Coverage for faststream / nats / subscriber / usecases / key_value_subscriber.py: 87%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import AsyncIterator, Iterable 

2from contextlib import suppress 

3from typing import TYPE_CHECKING, Any, Optional, cast 

4 

5import anyio 

6from nats.errors import ConnectionClosedError, TimeoutError 

7from typing_extensions import override 

8 

9from faststream._internal.endpoint.subscriber.mixins import TasksMixin 

10from faststream._internal.endpoint.utils import process_msg 

11from faststream.nats.parser import KvParser 

12from faststream.nats.subscriber.adapters import UnsubscribeAdapter 

13 

14from .basic import LogicSubscriber 

15 

16if TYPE_CHECKING: 

17 from nats.js.kv import KeyValue 

18 

19 from faststream._internal.endpoint.publisher import PublisherProto 

20 from faststream._internal.endpoint.subscriber import SubscriberSpecification 

21 from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

22 from faststream.message import StreamMessage 

23 from faststream.nats.message import NatsKvMessage 

24 from faststream.nats.schemas import KvWatch 

25 from faststream.nats.subscriber.config import NatsSubscriberConfig 

26 

27 

28class KeyValueWatchSubscriber( 

29 TasksMixin, 

30 LogicSubscriber["KeyValue.Entry"], 

31): 

32 subscription: Optional["UnsubscribeAdapter[KeyValue.KeyWatcher]"] 

33 _fetch_sub: UnsubscribeAdapter["KeyValue.KeyWatcher"] | None 

34 

35 def __init__( 

36 self, 

37 config: "NatsSubscriberConfig", 

38 specification: "SubscriberSpecification[Any, Any]", 

39 calls: "CallsCollection[KeyValue.Entry]", 

40 *, 

41 kv_watch: "KvWatch", 

42 ) -> None: 

43 parser = KvParser(pattern=config.subject) 

44 config.decoder = parser.decode_message 

45 config.parser = parser.parse_message 

46 super().__init__(config, specification, calls) 

47 

48 self.kv_watch = kv_watch 

49 

50 @override 

51 async def get_one( 

52 self, 

53 *, 

54 timeout: float = 5, 

55 ) -> Optional["NatsKvMessage"]: 

56 assert not self.calls, ( 

57 "You can't use `get_one` method if subscriber has registered handlers." 

58 ) 

59 

60 if not self._fetch_sub: 60 ↛ 76line 60 didn't jump to line 76 because the condition on line 60 was always true

61 bucket = await self._outer_config.kv_declarer.create_key_value( 

62 bucket=self.kv_watch.name, 

63 declare=self.kv_watch.declare, 

64 ) 

65 

66 fetch_sub = self._fetch_sub = UnsubscribeAdapter["KeyValue.KeyWatcher"]( 

67 await bucket.watch( 

68 keys=self.clear_subject, 

69 headers_only=self.kv_watch.headers_only, 

70 include_history=self.kv_watch.include_history, 

71 ignore_deletes=self.kv_watch.ignore_deletes, 

72 meta_only=self.kv_watch.meta_only, 

73 ), 

74 ) 

75 else: 

76 fetch_sub = self._fetch_sub 

77 

78 msg = None 

79 sleep_interval = timeout / 10 

80 with anyio.move_on_after(timeout): 

81 while ( # noqa: ASYNC110 

82 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call] 

83 ) is None: 

84 await anyio.sleep(sleep_interval) 

85 

86 context = self._outer_config.fd_config.context 

87 async_parser, async_decoder = self._get_parser_and_decoder() 

88 

89 return cast( 

90 "NatsKvMessage", 

91 await process_msg( 

92 msg=msg, 

93 middlewares=(m(msg, context=context) for m in self._broker_middlewares), 

94 parser=async_parser, 

95 decoder=async_decoder, 

96 ), 

97 ) 

98 

99 @override 

100 async def __aiter__(self) -> AsyncIterator["NatsKvMessage"]: # type: ignore[override] 

101 assert not self.calls, ( 

102 "You can't use iterator if subscriber has registered handlers." 

103 ) 

104 

105 if not self._fetch_sub: 105 ↛ 121line 105 didn't jump to line 121 because the condition on line 105 was always true

106 bucket = await self._outer_config.kv_declarer.create_key_value( 

107 bucket=self.kv_watch.name, 

108 declare=self.kv_watch.declare, 

109 ) 

110 

111 fetch_sub = self._fetch_sub = UnsubscribeAdapter["KeyValue.KeyWatcher"]( 

112 await bucket.watch( 

113 keys=self.clear_subject, 

114 headers_only=self.kv_watch.headers_only, 

115 include_history=self.kv_watch.include_history, 

116 ignore_deletes=self.kv_watch.ignore_deletes, 

117 meta_only=self.kv_watch.meta_only, 

118 ), 

119 ) 

120 else: 

121 fetch_sub = self._fetch_sub 

122 

123 timeout = 5 

124 sleep_interval = timeout / 10 

125 

126 context = self._outer_config.fd_config.context 

127 async_parser, async_decoder = self._get_parser_and_decoder() 

128 

129 while True: 

130 msg = None 

131 with anyio.move_on_after(timeout): 

132 while ( # noqa: ASYNC110 

133 msg := await fetch_sub.obj.updates(timeout) # type: ignore[no-untyped-call] 

134 ) is None: 

135 await anyio.sleep(sleep_interval) 

136 

137 if msg is None: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 continue 

139 

140 yield cast( 

141 "NatsKvMessage", 

142 await process_msg( 

143 msg=msg, 

144 middlewares=( 

145 m(msg, context=context) for m in self._broker_middlewares 

146 ), 

147 parser=async_parser, 

148 decoder=async_decoder, 

149 ), 

150 ) 

151 

152 @override 

153 async def _create_subscription(self) -> None: 

154 if self.subscription: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 return 

156 

157 bucket = await self._outer_config.kv_declarer.create_key_value( 

158 bucket=self.kv_watch.name, 

159 declare=self.kv_watch.declare, 

160 ) 

161 

162 self.subscription = UnsubscribeAdapter["KeyValue.KeyWatcher"]( 

163 await bucket.watch( 

164 keys=self.clear_subject, 

165 headers_only=self.kv_watch.headers_only, 

166 include_history=self.kv_watch.include_history, 

167 ignore_deletes=self.kv_watch.ignore_deletes, 

168 meta_only=self.kv_watch.meta_only, 

169 ), 

170 ) 

171 

172 self.add_task(self.__consume_watch) 

173 

174 async def __consume_watch(self) -> None: 

175 assert self.subscription, "You should call `create_subscription` at first." 

176 

177 key_watcher = self.subscription.obj 

178 

179 while self.running: 179 ↛ exitline 179 didn't return from function '__consume_watch' because the condition on line 179 was always true

180 with suppress(ConnectionClosedError, TimeoutError): 

181 message = cast( 

182 "KeyValue.Entry | None", 

183 await key_watcher.updates(self.kv_watch.timeout), # type: ignore[no-untyped-call] 

184 ) 

185 

186 if message: 

187 await self.consume(message) 

188 

189 def _make_response_publisher( 

190 self, 

191 message: "StreamMessage[KeyValue.Entry]", 

192 ) -> Iterable["PublisherProto"]: 

193 """Create Publisher objects to use it as one of `publishers` in `self.consume` scope. 

194 

195 Args: 

196 message: Message requiring reply 

197 """ 

198 return () 

199 

200 def get_log_context( 

201 self, 

202 message: Optional["StreamMessage[KeyValue.Entry]"], 

203 ) -> dict[str, str]: 

204 """Log context factory using in `self.consume` scope. 

205 

206 Args: 

207 message: Message which we are building context for 

208 """ 

209 return self.build_log_context( 

210 message=message, 

211 subject=self.subject, 

212 stream=self.kv_watch.name, 

213 )