Coverage for faststream / redis / subscriber / usecases / list_subscriber.py: 95%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import AsyncIterator 

2from typing import TYPE_CHECKING, Any, Optional, TypeAlias 

3 

4import anyio 

5from typing_extensions import override 

6 

7from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin 

8from faststream._internal.endpoint.utils import process_msg 

9from faststream.redis.message import ( 

10 BatchListMessage, 

11 DefaultListMessage, 

12 RedisListMessage, 

13) 

14from faststream.redis.parser import ( 

15 RedisBatchListParser, 

16 RedisListParser, 

17) 

18 

19from .basic import LogicSubscriber 

20 

21if TYPE_CHECKING: 

22 from redis.asyncio.client import Redis 

23 

24 from faststream._internal.endpoint.subscriber import SubscriberSpecification 

25 from faststream._internal.endpoint.subscriber.call_item import ( 

26 CallsCollection, 

27 ) 

28 from faststream.message import StreamMessage as BrokerStreamMessage 

29 from faststream.redis.schemas import ListSub 

30 from faststream.redis.subscriber.config import RedisSubscriberConfig 

31 from faststream.redis.subscriber.specification import RedisSubscriberSpecification 

32 

33TopicName: TypeAlias = bytes 

34Offset: TypeAlias = bytes 

35 

36 

37class _ListHandlerMixin(LogicSubscriber): 

38 def __init__( 

39 self, 

40 config: "RedisSubscriberConfig", 

41 specification: "SubscriberSpecification[Any, Any]", 

42 calls: "CallsCollection[Any]", 

43 ) -> None: 

44 super().__init__(config, specification, calls) 

45 self._read_lock = anyio.Lock() 

46 assert config.list_sub 

47 self._list_sub = config.list_sub 

48 

49 @property 

50 def list_sub(self) -> "ListSub": 

51 return self._list_sub.add_prefix(self._outer_config.prefix) 

52 

53 def get_log_context( 

54 self, 

55 message: Optional["BrokerStreamMessage[Any]"], 

56 ) -> dict[str, str]: 

57 return self.build_log_context( 

58 message=message, 

59 channel=self.list_sub.name, 

60 ) 

61 

62 @override 

63 async def _consume( # type: ignore[override] 

64 self, 

65 client: "Redis[bytes]", 

66 *, 

67 start_signal: "anyio.Event", 

68 ) -> None: 

69 if await client.ping(): 69 ↛ 71line 69 didn't jump to line 71 because the condition on line 69 was always true

70 start_signal.set() 

71 await super()._consume(client, start_signal=start_signal) 

72 

73 @override 

74 async def start(self) -> None: 

75 await super().start(self._client) 

76 

77 @override 

78 async def stop(self) -> None: 

79 with anyio.move_on_after(self._outer_config.graceful_timeout): 

80 async with self._read_lock: 

81 await super().stop() 

82 

83 @override 

84 async def get_one( 

85 self, 

86 *, 

87 timeout: float = 5.0, 

88 ) -> "RedisListMessage | None": 

89 assert not self.calls, ( 

90 "You can't use `get_one` method if subscriber has registered handlers." 

91 ) 

92 

93 sleep_interval = timeout / 10 

94 raw_message = None 

95 

96 with anyio.move_on_after(timeout): 

97 while ( # noqa: ASYNC110 

98 raw_message := await self._client.lpop(name=self.list_sub.name) 

99 ) is None: 

100 await anyio.sleep(sleep_interval) 

101 

102 if not raw_message: 

103 return None 

104 

105 redis_incoming_msg = DefaultListMessage( 

106 type="list", 

107 data=raw_message, 

108 channel=self.list_sub.name, 

109 ) 

110 

111 context = self._outer_config.fd_config.context 

112 async_parser, async_decoder = self._get_parser_and_decoder() 

113 

114 msg: RedisListMessage = await process_msg( # type: ignore[assignment] 

115 msg=redis_incoming_msg, 

116 middlewares=( 

117 m(redis_incoming_msg, context=context) for m in self._broker_middlewares 

118 ), 

119 parser=async_parser, 

120 decoder=async_decoder, 

121 ) 

122 return msg 

123 

124 @override 

125 async def __aiter__(self) -> AsyncIterator["RedisListMessage"]: # type: ignore[override] 

126 assert not self.calls, ( 

127 "You can't use iterator if subscriber has registered handlers." 

128 ) 

129 

130 timeout = 5 

131 sleep_interval = timeout / 10 

132 raw_message = None 

133 

134 context = self._outer_config.fd_config.context 

135 async_parser, async_decoder = self._get_parser_and_decoder() 

136 

137 while True: 

138 with anyio.move_on_after(timeout): 

139 while ( # noqa: ASYNC110 139 ↛ 142line 139 didn't jump to line 142 because the condition on line 139 was never true

140 raw_message := await self._client.lpop(name=self.list_sub.name) 

141 ) is None: 

142 await anyio.sleep(sleep_interval) 

143 

144 if not raw_message: 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true

145 continue 

146 

147 redis_incoming_msg = DefaultListMessage( 

148 type="list", 

149 data=raw_message, 

150 channel=self.list_sub.name, 

151 ) 

152 

153 msg: RedisListMessage = await process_msg( # type: ignore[assignment] 

154 msg=redis_incoming_msg, 

155 middlewares=( 

156 m(redis_incoming_msg, context=context) 

157 for m in self._broker_middlewares 

158 ), 

159 parser=async_parser, 

160 decoder=async_decoder, 

161 ) 

162 yield msg 

163 

164 

165class ListSubscriber(_ListHandlerMixin): 

166 def __init__( 

167 self, 

168 config: "RedisSubscriberConfig", 

169 specification: "RedisSubscriberSpecification", 

170 calls: "CallsCollection[Any]", 

171 ) -> None: 

172 parser = RedisListParser(config) 

173 config.parser = parser.parse_message 

174 config.decoder = parser.decode_message 

175 super().__init__(config, specification, calls) 

176 

177 async def _get_msgs(self, client: "Redis[bytes]") -> None: 

178 async with self._read_lock: 

179 raw_msg = await client.blpop( 

180 self.list_sub.name, 

181 timeout=self.list_sub.polling_interval, 

182 ) 

183 

184 if raw_msg: 

185 _, msg_data = raw_msg 

186 

187 msg = DefaultListMessage( 

188 type="list", 

189 data=msg_data, 

190 channel=self.list_sub.name, 

191 ) 

192 

193 await self.consume_one(msg) 

194 

195 

196class ListBatchSubscriber(_ListHandlerMixin): 

197 def __init__( 

198 self, 

199 config: "RedisSubscriberConfig", 

200 specification: "RedisSubscriberSpecification", 

201 calls: "CallsCollection[Any]", 

202 ) -> None: 

203 parser = RedisBatchListParser(config) 

204 config.parser = parser.parse_message 

205 config.decoder = parser.decode_message 

206 super().__init__(config, specification, calls) 

207 

208 async def _get_msgs(self, client: "Redis[bytes]") -> None: 

209 async with self._read_lock: 

210 raw_msgs = await client.lpop( 

211 name=self.list_sub.name, 

212 count=self.list_sub.max_records, 

213 ) 

214 

215 if raw_msgs: 

216 msg = BatchListMessage( 

217 type="blist", 

218 channel=self.list_sub.name, 

219 data=raw_msgs, 

220 ) 

221 

222 await self.consume_one(msg) 

223 

224 if not raw_msgs: 

225 await anyio.sleep(self.list_sub.polling_interval) 

226 

227 

228class ListConcurrentSubscriber( 

229 ConcurrentMixin["BrokerStreamMessage[Any]"], 

230 ListSubscriber, 

231): 

232 async def start(self) -> None: 

233 await super().start() 

234 self.start_consume_task() 

235 

236 async def consume_one(self, msg: "BrokerStreamMessage[Any]") -> None: 

237 await self._put_msg(msg)