Coverage for faststream / redis / subscriber / usecases / list_subscriber.py: 95%
91 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import AsyncIterator
2from typing import TYPE_CHECKING, Any, Optional, TypeAlias
4import anyio
5from typing_extensions import override
7from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin
8from faststream._internal.endpoint.utils import process_msg
9from faststream.redis.message import (
10 BatchListMessage,
11 DefaultListMessage,
12 RedisListMessage,
13)
14from faststream.redis.parser import (
15 RedisBatchListParser,
16 RedisListParser,
17)
19from .basic import LogicSubscriber
21if TYPE_CHECKING:
22 from redis.asyncio.client import Redis
24 from faststream._internal.endpoint.subscriber import SubscriberSpecification
25 from faststream._internal.endpoint.subscriber.call_item import (
26 CallsCollection,
27 )
28 from faststream.message import StreamMessage as BrokerStreamMessage
29 from faststream.redis.schemas import ListSub
30 from faststream.redis.subscriber.config import RedisSubscriberConfig
31 from faststream.redis.subscriber.specification import RedisSubscriberSpecification
# Module-level aliases for raw byte values.
# NOTE(review): neither alias is referenced in the visible part of this
# module — confirm external use before removing.
TopicName: TypeAlias = bytes
Offset: TypeAlias = bytes
class _ListHandlerMixin(LogicSubscriber):
    """Shared behaviour for subscribers that read from a Redis list.

    Stores the list subscription taken from the subscriber config, owns the
    lock that subclasses hold while reading, and provides the handler-less
    consumption helpers ``get_one`` and ``__aiter__``.
    """

    def __init__(
        self,
        config: "RedisSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Initialise the base subscriber and remember ``config.list_sub``.

        Raises:
            AssertionError: If ``config.list_sub`` is falsy.
        """
        super().__init__(config, specification, calls)
        # Subclasses hold this lock inside ``_get_msgs``; ``stop`` takes it
        # as well, so shutdown waits for an in-flight read.
        self._read_lock = anyio.Lock()
        assert config.list_sub
        self._list_sub = config.list_sub

    @property
    def list_sub(self) -> "ListSub":
        """Return the list subscription with the outer config prefix applied."""
        return self._list_sub.add_prefix(self._outer_config.prefix)

    def get_log_context(
        self,
        message: Optional["BrokerStreamMessage[Any]"],
    ) -> dict[str, str]:
        """Build the log context for ``message`` using the list name as channel."""
        return self.build_log_context(
            message=message,
            channel=self.list_sub.name,
        )

    @override
    async def _consume(  # type: ignore[override]
        self,
        client: "Redis[bytes]",
        *,
        start_signal: "anyio.Event",
    ) -> None:
        """Set ``start_signal`` once ``client`` answers a ping, then consume.

        The base ``_consume`` is awaited regardless of the ping result.
        """
        if await client.ping():
            start_signal.set()
        await super()._consume(client, start_signal=start_signal)

    @override
    async def start(self) -> None:
        """Start the subscriber using the stored client."""
        await super().start(self._client)

    @override
    async def stop(self) -> None:
        """Stop the subscriber while holding the read lock.

        The whole operation is bounded by the outer config's
        ``graceful_timeout``.
        """
        # NOTE(review): if the lock cannot be acquired before the timeout
        # elapses, the block is abandoned and ``super().stop()`` is never
        # reached — confirm this is the intended shutdown behaviour.
        with anyio.move_on_after(self._outer_config.graceful_timeout):
            async with self._read_lock:
                await super().stop()

    async def _lpop_until_timeout(self, timeout: float) -> Optional[bytes]:
        """Poll ``LPOP`` on the list until a value arrives or ``timeout`` elapses.

        Returns the popped value, or ``None`` when the timeout fired first.
        The polling pause is one tenth of ``timeout``.
        """
        # A fresh local per call: a cancelled first ``lpop`` can never leave
        # a value from an earlier poll bound to the result.
        popped: Optional[bytes] = None
        pause = timeout / 10

        with anyio.move_on_after(timeout):
            while (  # noqa: ASYNC110
                popped := await self._client.lpop(name=self.list_sub.name)
            ) is None:
                await anyio.sleep(pause)

        return popped

    async def _process_raw_list_message(
        self,
        raw_message: bytes,
        *,
        context: Any,
        parser: Any,
        decoder: Any,
    ) -> "RedisListMessage":
        """Wrap a popped value and run it through middlewares, parser and decoder."""
        redis_incoming_msg = DefaultListMessage(
            type="list",
            data=raw_message,
            channel=self.list_sub.name,
        )

        msg: RedisListMessage = await process_msg(  # type: ignore[assignment]
            msg=redis_incoming_msg,
            middlewares=(
                m(redis_incoming_msg, context=context) for m in self._broker_middlewares
            ),
            parser=parser,
            decoder=decoder,
        )
        return msg

    @override
    async def get_one(
        self,
        *,
        timeout: float = 5.0,
    ) -> "RedisListMessage | None":
        """Pop and process a single message from the list.

        Args:
            timeout: Maximum number of seconds to keep polling the list.

        Returns:
            The processed message, or ``None`` if nothing was popped before
            the timeout elapsed.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use `get_one` method if subscriber has registered handlers."
        )

        raw_message = await self._lpop_until_timeout(timeout)

        # Only ``None`` means "nothing popped". An empty payload is a real
        # list element that has already been removed from Redis, so it must
        # be delivered rather than dropped.
        if raw_message is None:
            return None

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        return await self._process_raw_list_message(
            raw_message,
            context=context,
            parser=async_parser,
            decoder=async_decoder,
        )

    @override
    async def __aiter__(self) -> AsyncIterator["RedisListMessage"]:  # type: ignore[override]
        """Yield processed messages popped from the list, indefinitely.

        Each polling round lasts up to five seconds; an empty round simply
        starts the next one.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use iterator if subscriber has registered handlers."
        )

        timeout = 5

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        while True:
            raw_message = await self._lpop_until_timeout(timeout)

            # ``None`` is the only "no message" marker; empty payloads are
            # still yielded.
            if raw_message is None:
                continue

            yield await self._process_raw_list_message(
                raw_message,
                context=context,
                parser=async_parser,
                decoder=async_decoder,
            )
class ListSubscriber(_ListHandlerMixin):
    """List subscriber that delivers one popped element per message."""

    def __init__(
        self,
        config: "RedisSubscriberConfig",
        specification: "RedisSubscriberSpecification",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Install the single-message list parser on ``config``, then initialise."""
        list_parser = RedisListParser(config)
        config.parser = list_parser.parse_message
        config.decoder = list_parser.decode_message
        super().__init__(config, specification, calls)

    async def _get_msgs(self, client: "Redis[bytes]") -> None:
        """Block-pop one element from the list and pass it to ``consume_one``.

        The pop waits up to the subscription's ``polling_interval``; nothing
        is consumed when it comes back empty.
        """
        # NOTE(review): lock scope reconstructed from a flattened listing —
        # confirm that ``consume_one`` is meant to run while the lock is held.
        async with self._read_lock:
            list_sub = self.list_sub
            popped = await client.blpop(
                list_sub.name,
                timeout=list_sub.polling_interval,
            )
            if not popped:
                return

            # The reply is unpacked as a pair; only the second item is the payload.
            _key, payload = popped

            await self.consume_one(
                DefaultListMessage(
                    type="list",
                    data=payload,
                    channel=list_sub.name,
                ),
            )
class ListBatchSubscriber(_ListHandlerMixin):
    """List subscriber that delivers several popped elements as one batch message."""

    def __init__(
        self,
        config: "RedisSubscriberConfig",
        specification: "RedisSubscriberSpecification",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Install the batch list parser on ``config``, then initialise."""
        batch_parser = RedisBatchListParser(config)
        config.parser = batch_parser.parse_message
        config.decoder = batch_parser.decode_message
        super().__init__(config, specification, calls)

    async def _get_msgs(self, client: "Redis[bytes]") -> None:
        """Pop up to ``max_records`` elements and consume them as one batch.

        When the pop returns nothing, sleep for the subscription's
        ``polling_interval`` before returning.
        """
        async with self._read_lock:
            list_sub = self.list_sub
            batch = await client.lpop(
                name=list_sub.name,
                count=list_sub.max_records,
            )

            if batch:
                await self.consume_one(
                    BatchListMessage(
                        type="blist",
                        channel=list_sub.name,
                        data=batch,
                    ),
                )

        if batch:
            return

        # NOTE(review): lock scope reconstructed from a flattened listing —
        # confirm the idle pause is meant to run after the lock is released.
        await anyio.sleep(list_sub.polling_interval)
class ListConcurrentSubscriber(
    ConcurrentMixin["BrokerStreamMessage[Any]"],
    ListSubscriber,
):
    """Single-message list subscriber that queues messages via ``ConcurrentMixin``."""

    @override
    async def start(self) -> None:
        """Start the underlying list subscriber, then launch the consume task."""
        await super().start()
        self.start_consume_task()

    @override
    async def consume_one(self, msg: "BrokerStreamMessage[Any]") -> None:
        """Hand ``msg`` to ``_put_msg`` instead of consuming it inline."""
        await self._put_msg(msg)