Coverage for faststream / nats / subscriber / usecases / key_value_subscriber.py: 87%
62 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import AsyncIterator, Iterable
2from contextlib import suppress
3from typing import TYPE_CHECKING, Any, Optional, cast
5import anyio
6from nats.errors import ConnectionClosedError, TimeoutError
7from typing_extensions import override
9from faststream._internal.endpoint.subscriber.mixins import TasksMixin
10from faststream._internal.endpoint.utils import process_msg
11from faststream.nats.parser import KvParser
12from faststream.nats.subscriber.adapters import UnsubscribeAdapter
14from .basic import LogicSubscriber
16if TYPE_CHECKING:
17 from nats.js.kv import KeyValue
19 from faststream._internal.endpoint.publisher import PublisherProto
20 from faststream._internal.endpoint.subscriber import SubscriberSpecification
21 from faststream._internal.endpoint.subscriber.call_item import CallsCollection
22 from faststream.message import StreamMessage
23 from faststream.nats.message import NatsKvMessage
24 from faststream.nats.schemas import KvWatch
25 from faststream.nats.subscriber.config import NatsSubscriberConfig
class KeyValueWatchSubscriber(
    TasksMixin,
    LogicSubscriber["KeyValue.Entry"],
):
    """Subscriber that reads entries from a NATS Key-Value bucket watcher.

    Messages can be consumed in three ways, each backed by a key watcher:

    * a background task started by ``_create_subscription`` that passes
      every entry to ``self.consume``;
    * ``get_one`` to fetch a single entry with a timeout;
    * ``async for`` iteration via ``__aiter__``.

    The last two share the lazily created ``_fetch_sub`` watcher and are only
    allowed when no handlers are registered on the subscriber.
    """

    # Watcher used by the background consuming task.
    subscription: Optional["UnsubscribeAdapter[KeyValue.KeyWatcher]"]
    # Watcher used by `get_one` / `__aiter__`; created on first use.
    _fetch_sub: UnsubscribeAdapter["KeyValue.KeyWatcher"] | None

    def __init__(
        self,
        config: "NatsSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[KeyValue.Entry]",
        *,
        kv_watch: "KvWatch",
    ) -> None:
        """Set up the KV parser/decoder on the config and store watch options.

        Args:
            config: Subscriber config; its ``parser`` and ``decoder`` are
                overwritten with the ``KvParser`` callables built from
                ``config.subject``.
            specification: Subscriber specification forwarded to the base class.
            calls: Handlers collection forwarded to the base class.
            kv_watch: Bucket name and watch options used for every watcher
                this subscriber opens.
        """
        parser = KvParser(pattern=config.subject)
        config.decoder = parser.decode_message
        config.parser = parser.parse_message
        super().__init__(config, specification, calls)

        self.kv_watch = kv_watch

    async def _create_kv_watcher(self) -> "UnsubscribeAdapter[KeyValue.KeyWatcher]":
        """Declare the bucket and open a new key watcher using ``self.kv_watch``.

        Single place where watch options are applied, so the background
        subscription, ``get_one`` and iteration all watch the bucket the
        same way.

        Returns:
            The new key watcher wrapped in an ``UnsubscribeAdapter``.
        """
        bucket = await self._outer_config.kv_declarer.create_key_value(
            bucket=self.kv_watch.name,
            declare=self.kv_watch.declare,
        )

        return UnsubscribeAdapter["KeyValue.KeyWatcher"](
            await bucket.watch(
                keys=self.clear_subject,
                headers_only=self.kv_watch.headers_only,
                include_history=self.kv_watch.include_history,
                ignore_deletes=self.kv_watch.ignore_deletes,
                meta_only=self.kv_watch.meta_only,
            ),
        )

    @override
    async def get_one(
        self,
        *,
        timeout: float = 5,
    ) -> Optional["NatsKvMessage"]:
        """Fetch a single entry from the bucket watcher.

        Args:
            timeout: Maximum number of seconds to wait for an entry.

        Returns:
            The result of ``process_msg`` for the received entry; when nothing
            arrives within ``timeout``, ``process_msg`` is called with
            ``msg=None`` and its result is returned.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use `get_one` method if subscriber has registered handlers."
        )

        fetch_sub = self._fetch_sub
        if not fetch_sub:
            fetch_sub = self._fetch_sub = await self._create_kv_watcher()

        msg = None
        sleep_interval = timeout / 10
        # `updates` is given the same timeout as the cancel scope. Its own
        # TimeoutError (the one `__consume_watch` also suppresses) is treated
        # like the scope expiring, so the caller sees "no message" rather
        # than an exception.
        with anyio.move_on_after(timeout), suppress(TimeoutError):
            # `updates` may yield `None` instead of an entry; keep polling
            # until a real entry arrives or the time budget runs out.
            while (  # noqa: ASYNC110
                msg := await fetch_sub.obj.updates(timeout)  # type: ignore[no-untyped-call]
            ) is None:
                await anyio.sleep(sleep_interval)

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        return cast(
            "NatsKvMessage",
            await process_msg(
                msg=msg,
                middlewares=(m(msg, context=context) for m in self._broker_middlewares),
                parser=async_parser,
                decoder=async_decoder,
            ),
        )

    @override
    async def __aiter__(self) -> AsyncIterator["NatsKvMessage"]:  # type: ignore[override]
        """Iterate over bucket entries indefinitely.

        Yields:
            The result of ``process_msg`` for every entry received.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use iterator if subscriber has registered handlers."
        )

        fetch_sub = self._fetch_sub
        if not fetch_sub:
            fetch_sub = self._fetch_sub = await self._create_kv_watcher()

        # Length of one polling window; an empty window is simply retried.
        timeout = 5
        sleep_interval = timeout / 10

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        while True:
            msg = None
            # A TimeoutError from `updates` must not end the iteration:
            # treat it like an empty polling window and try again.
            with anyio.move_on_after(timeout), suppress(TimeoutError):
                while (  # noqa: ASYNC110
                    msg := await fetch_sub.obj.updates(timeout)  # type: ignore[no-untyped-call]
                ) is None:
                    await anyio.sleep(sleep_interval)

            if msg is None:
                continue

            yield cast(
                "NatsKvMessage",
                await process_msg(
                    msg=msg,
                    middlewares=(
                        m(msg, context=context) for m in self._broker_middlewares
                    ),
                    parser=async_parser,
                    decoder=async_decoder,
                ),
            )

    @override
    async def _create_subscription(self) -> None:
        """Open the background watcher and start the consuming task.

        Does nothing when a subscription already exists.
        """
        if self.subscription:
            return

        self.subscription = await self._create_kv_watcher()

        self.add_task(self.__consume_watch)

    async def __consume_watch(self) -> None:
        """Pass watcher entries to ``self.consume`` while the subscriber runs."""
        assert self.subscription, "You should call `create_subscription` at first."

        key_watcher = self.subscription.obj

        while self.running:
            # Closed connections and empty polling windows are expected here;
            # keep looping until `self.running` turns false.
            with suppress(ConnectionClosedError, TimeoutError):
                message = cast(
                    "KeyValue.Entry | None",
                    await key_watcher.updates(self.kv_watch.timeout),  # type: ignore[no-untyped-call]
                )

                if message:
                    await self.consume(message)

    def _make_response_publisher(
        self,
        message: "StreamMessage[KeyValue.Entry]",
    ) -> Iterable["PublisherProto"]:
        """Create Publisher objects to use as `publishers` in `self.consume` scope.

        Always returns an empty tuple for this subscriber.

        Args:
            message: Message requiring reply
        """
        return ()

    def get_log_context(
        self,
        message: Optional["StreamMessage[KeyValue.Entry]"],
    ) -> dict[str, str]:
        """Log context factory used in `self.consume` scope.

        The bucket name is passed as the ``stream`` value.

        Args:
            message: Message which we are building context for
        """
        return self.build_log_context(
            message=message,
            subject=self.subject,
            stream=self.kv_watch.name,
        )