Coverage for faststream / nats / subscriber / usecases / object_storage_subscriber.py: 88%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import AsyncIterator, Iterable 

2from contextlib import suppress 

3from typing import TYPE_CHECKING, Any, Optional, cast 

4 

5import anyio 

6from nats.errors import TimeoutError 

7from nats.js.api import ObjectInfo 

8from typing_extensions import override 

9 

10from faststream._internal.endpoint.subscriber.mixins import TasksMixin 

11from faststream._internal.endpoint.utils import process_msg 

12from faststream.nats.parser import ( 

13 ObjParser, 

14) 

15from faststream.nats.subscriber.adapters import ( 

16 UnsubscribeAdapter, 

17) 

18 

19from .basic import LogicSubscriber 

20 

21if TYPE_CHECKING: 

22 from nats.js.object_store import ObjectStore 

23 

24 from faststream._internal.endpoint.publisher import PublisherProto 

25 from faststream._internal.endpoint.subscriber import SubscriberSpecification 

26 from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

27 from faststream.message import StreamMessage 

28 from faststream.nats.message import NatsObjMessage 

29 from faststream.nats.schemas import ObjWatch 

30 from faststream.nats.subscriber.config import NatsSubscriberConfig 

31 

32 

33OBJECT_STORAGE_CONTEXT_KEY = "__object_storage" 

34 

35 

36class ObjStoreWatchSubscriber( 

37 TasksMixin, 

38 LogicSubscriber[ObjectInfo], 

39): 

40 subscription: Optional["UnsubscribeAdapter[ObjectStore.ObjectWatcher]"] 

41 _fetch_sub: UnsubscribeAdapter["ObjectStore.ObjectWatcher"] | None 

42 

43 def __init__( 

44 self, 

45 config: "NatsSubscriberConfig", 

46 specification: "SubscriberSpecification[Any, Any]", 

47 calls: "CallsCollection[ObjectInfo]", 

48 *, 

49 obj_watch: "ObjWatch", 

50 ) -> None: 

51 parser = ObjParser(pattern="") 

52 config.parser = parser.parse_message 

53 config.decoder = parser.decode_message 

54 super().__init__(config, specification, calls) 

55 

56 self.obj_watch = obj_watch 

57 self.obj_watch_conn = None 

58 

59 @override 

60 async def get_one(self, *, timeout: float = 5) -> Optional["NatsObjMessage"]: 

61 assert not self.calls, ( 

62 "You can't use `get_one` method if subscriber has registered handlers." 

63 ) 

64 

65 if not self._fetch_sub: 65 ↛ 80line 65 didn't jump to line 80 because the condition on line 65 was always true

66 self.bucket = await self._outer_config.os_declarer.create_object_store( 

67 bucket=self.subject, 

68 declare=self.obj_watch.declare, 

69 ) 

70 

71 obj_watch = await self.bucket.watch( 

72 ignore_deletes=self.obj_watch.ignore_deletes, 

73 include_history=self.obj_watch.include_history, 

74 meta_only=self.obj_watch.meta_only, 

75 ) 

76 fetch_sub = self._fetch_sub = UnsubscribeAdapter["ObjectStore.ObjectWatcher"]( 

77 obj_watch 

78 ) 

79 else: 

80 fetch_sub = self._fetch_sub 

81 

82 msg = None 

83 sleep_interval = timeout / 10 

84 with anyio.move_on_after(timeout): 

85 while ( # noqa: ASYNC110 

86 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call] 

87 ) is None: 

88 await anyio.sleep(sleep_interval) 

89 

90 context = self._outer_config.fd_config.context 

91 async_parser, async_decoder = self._get_parser_and_decoder() 

92 

93 return cast( 

94 "NatsObjMessage", 

95 await process_msg( 

96 msg=msg, 

97 middlewares=(m(msg, context=context) for m in self._broker_middlewares), 

98 parser=async_parser, 

99 decoder=async_decoder, 

100 ), 

101 ) 

102 

103 @override 

104 async def __aiter__(self) -> AsyncIterator["NatsObjMessage"]: # type: ignore[override] 

105 assert not self.calls, ( 

106 "You can't use iterator if subscriber has registered handlers." 

107 ) 

108 

109 if not self._fetch_sub: 109 ↛ 124line 109 didn't jump to line 124 because the condition on line 109 was always true

110 self.bucket = await self._outer_config.os_declarer.create_object_store( 

111 bucket=self.subject, 

112 declare=self.obj_watch.declare, 

113 ) 

114 

115 obj_watch = await self.bucket.watch( 

116 ignore_deletes=self.obj_watch.ignore_deletes, 

117 include_history=self.obj_watch.include_history, 

118 meta_only=self.obj_watch.meta_only, 

119 ) 

120 fetch_sub = self._fetch_sub = UnsubscribeAdapter["ObjectStore.ObjectWatcher"]( 

121 obj_watch 

122 ) 

123 else: 

124 fetch_sub = self._fetch_sub 

125 

126 timeout = 5 

127 sleep_interval = timeout / 10 

128 

129 context = self._outer_config.fd_config.context 

130 async_parser, async_decoder = self._get_parser_and_decoder() 

131 

132 while True: 

133 msg = None 

134 with anyio.move_on_after(timeout): 

135 while ( # noqa: ASYNC110 

136 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call] 

137 ) is None: 

138 await anyio.sleep(sleep_interval) 

139 

140 if msg is None: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true

141 continue 

142 

143 yield cast( 

144 "NatsObjMessage", 

145 await process_msg( 

146 msg=msg, 

147 middlewares=( 

148 m(msg, context=context) for m in self._broker_middlewares 

149 ), 

150 parser=async_parser, 

151 decoder=async_decoder, 

152 ), 

153 ) 

154 

155 @override 

156 async def _create_subscription(self) -> None: 

157 if self.subscription: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true

158 return 

159 

160 self.bucket = await self._outer_config.os_declarer.create_object_store( 

161 bucket=self.subject, 

162 declare=self.obj_watch.declare, 

163 ) 

164 

165 self.add_task(self.__consume_watch) 

166 

167 async def __consume_watch(self) -> None: 

168 # Should be created inside task to avoid nats-py lock 

169 obj_watch = await self.bucket.watch( 

170 ignore_deletes=self.obj_watch.ignore_deletes, 

171 include_history=self.obj_watch.include_history, 

172 meta_only=self.obj_watch.meta_only, 

173 ) 

174 

175 self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch) 

176 

177 context = self._outer_config.fd_config.context 

178 

179 while self.running: 179 ↛ exitline 179 didn't return from function '__consume_watch' because the condition on line 179 was always true

180 with suppress(TimeoutError): 

181 message = cast( 

182 "ObjectInfo | None", 

183 await obj_watch.updates(self.obj_watch.timeout), # type: ignore[no-untyped-call] 

184 ) 

185 

186 if message: 

187 with context.scope(OBJECT_STORAGE_CONTEXT_KEY, self.bucket): 

188 await self.consume(message) 

189 

190 def _make_response_publisher( 

191 self, 

192 message: "StreamMessage[ObjectInfo]", 

193 ) -> Iterable["PublisherProto"]: 

194 """Create Publisher objects to use it as one of `publishers` in `self.consume` scope. 

195 

196 Args: 

197 message: Message requiring reply 

198 """ 

199 return () 

200 

201 def get_log_context( 

202 self, 

203 message: Optional["StreamMessage[ObjectInfo]"], 

204 ) -> dict[str, str]: 

205 """Log context factory using in `self.consume` scope. 

206 

207 Args: 

208 message: Message which we are building context for 

209 """ 

210 return self.build_log_context( 

211 message=message, 

212 subject=self.subject, 

213 )