Coverage for faststream / nats / subscriber / usecases / object_storage_subscriber.py: 88%
67 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import AsyncIterator, Iterable
2from contextlib import suppress
3from typing import TYPE_CHECKING, Any, Optional, cast
5import anyio
6from nats.errors import TimeoutError
7from nats.js.api import ObjectInfo
8from typing_extensions import override
10from faststream._internal.endpoint.subscriber.mixins import TasksMixin
11from faststream._internal.endpoint.utils import process_msg
12from faststream.nats.parser import (
13 ObjParser,
14)
15from faststream.nats.subscriber.adapters import (
16 UnsubscribeAdapter,
17)
19from .basic import LogicSubscriber
21if TYPE_CHECKING:
22 from nats.js.object_store import ObjectStore
24 from faststream._internal.endpoint.publisher import PublisherProto
25 from faststream._internal.endpoint.subscriber import SubscriberSpecification
26 from faststream._internal.endpoint.subscriber.call_item import CallsCollection
27 from faststream.message import StreamMessage
28 from faststream.nats.message import NatsObjMessage
29 from faststream.nats.schemas import ObjWatch
30 from faststream.nats.subscriber.config import NatsSubscriberConfig
33OBJECT_STORAGE_CONTEXT_KEY = "__object_storage"
36class ObjStoreWatchSubscriber(
37 TasksMixin,
38 LogicSubscriber[ObjectInfo],
39):
40 subscription: Optional["UnsubscribeAdapter[ObjectStore.ObjectWatcher]"]
41 _fetch_sub: UnsubscribeAdapter["ObjectStore.ObjectWatcher"] | None
43 def __init__(
44 self,
45 config: "NatsSubscriberConfig",
46 specification: "SubscriberSpecification[Any, Any]",
47 calls: "CallsCollection[ObjectInfo]",
48 *,
49 obj_watch: "ObjWatch",
50 ) -> None:
51 parser = ObjParser(pattern="")
52 config.parser = parser.parse_message
53 config.decoder = parser.decode_message
54 super().__init__(config, specification, calls)
56 self.obj_watch = obj_watch
57 self.obj_watch_conn = None
59 @override
60 async def get_one(self, *, timeout: float = 5) -> Optional["NatsObjMessage"]:
61 assert not self.calls, (
62 "You can't use `get_one` method if subscriber has registered handlers."
63 )
65 if not self._fetch_sub: 65 ↛ 80line 65 didn't jump to line 80 because the condition on line 65 was always true
66 self.bucket = await self._outer_config.os_declarer.create_object_store(
67 bucket=self.subject,
68 declare=self.obj_watch.declare,
69 )
71 obj_watch = await self.bucket.watch(
72 ignore_deletes=self.obj_watch.ignore_deletes,
73 include_history=self.obj_watch.include_history,
74 meta_only=self.obj_watch.meta_only,
75 )
76 fetch_sub = self._fetch_sub = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](
77 obj_watch
78 )
79 else:
80 fetch_sub = self._fetch_sub
82 msg = None
83 sleep_interval = timeout / 10
84 with anyio.move_on_after(timeout):
85 while ( # noqa: ASYNC110
86 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call]
87 ) is None:
88 await anyio.sleep(sleep_interval)
90 context = self._outer_config.fd_config.context
91 async_parser, async_decoder = self._get_parser_and_decoder()
93 return cast(
94 "NatsObjMessage",
95 await process_msg(
96 msg=msg,
97 middlewares=(m(msg, context=context) for m in self._broker_middlewares),
98 parser=async_parser,
99 decoder=async_decoder,
100 ),
101 )
103 @override
104 async def __aiter__(self) -> AsyncIterator["NatsObjMessage"]: # type: ignore[override]
105 assert not self.calls, (
106 "You can't use iterator if subscriber has registered handlers."
107 )
109 if not self._fetch_sub: 109 ↛ 124line 109 didn't jump to line 124 because the condition on line 109 was always true
110 self.bucket = await self._outer_config.os_declarer.create_object_store(
111 bucket=self.subject,
112 declare=self.obj_watch.declare,
113 )
115 obj_watch = await self.bucket.watch(
116 ignore_deletes=self.obj_watch.ignore_deletes,
117 include_history=self.obj_watch.include_history,
118 meta_only=self.obj_watch.meta_only,
119 )
120 fetch_sub = self._fetch_sub = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](
121 obj_watch
122 )
123 else:
124 fetch_sub = self._fetch_sub
126 timeout = 5
127 sleep_interval = timeout / 10
129 context = self._outer_config.fd_config.context
130 async_parser, async_decoder = self._get_parser_and_decoder()
132 while True:
133 msg = None
134 with anyio.move_on_after(timeout):
135 while ( # noqa: ASYNC110
136 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call]
137 ) is None:
138 await anyio.sleep(sleep_interval)
140 if msg is None: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true
141 continue
143 yield cast(
144 "NatsObjMessage",
145 await process_msg(
146 msg=msg,
147 middlewares=(
148 m(msg, context=context) for m in self._broker_middlewares
149 ),
150 parser=async_parser,
151 decoder=async_decoder,
152 ),
153 )
155 @override
156 async def _create_subscription(self) -> None:
157 if self.subscription: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true
158 return
160 self.bucket = await self._outer_config.os_declarer.create_object_store(
161 bucket=self.subject,
162 declare=self.obj_watch.declare,
163 )
165 self.add_task(self.__consume_watch)
167 async def __consume_watch(self) -> None:
168 # Should be created inside task to avoid nats-py lock
169 obj_watch = await self.bucket.watch(
170 ignore_deletes=self.obj_watch.ignore_deletes,
171 include_history=self.obj_watch.include_history,
172 meta_only=self.obj_watch.meta_only,
173 )
175 self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch)
177 context = self._outer_config.fd_config.context
179 while self.running: 179 ↛ exitline 179 didn't return from function '__consume_watch' because the condition on line 179 was always true
180 with suppress(TimeoutError):
181 message = cast(
182 "ObjectInfo | None",
183 await obj_watch.updates(self.obj_watch.timeout), # type: ignore[no-untyped-call]
184 )
186 if message:
187 with context.scope(OBJECT_STORAGE_CONTEXT_KEY, self.bucket):
188 await self.consume(message)
190 def _make_response_publisher(
191 self,
192 message: "StreamMessage[ObjectInfo]",
193 ) -> Iterable["PublisherProto"]:
194 """Create Publisher objects to use it as one of `publishers` in `self.consume` scope.
196 Args:
197 message: Message requiring reply
198 """
199 return ()
201 def get_log_context(
202 self,
203 message: Optional["StreamMessage[ObjectInfo]"],
204 ) -> dict[str, str]:
205 """Log context factory using in `self.consume` scope.
207 Args:
208 message: Message which we are building context for
209 """
210 return self.build_log_context(
211 message=message,
212 subject=self.subject,
213 )