Coverage for faststream / redis / broker / broker.py: 92%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from collections.abc import Iterable, Mapping, Sequence
3from typing import (
4 TYPE_CHECKING,
5 Any,
6 Optional,
7 TypeAlias,
8 Union,
9)
10from urllib.parse import urlparse
12import anyio
13from anyio import move_on_after
14from fast_depends import Provider, dependency_provider
15from redis.asyncio.connection import (
16 Connection,
17 DefaultParser,
18 Encoder,
19 parse_url,
20)
21from redis.exceptions import ConnectionError
22from typing_extensions import overload, override
24from faststream._internal.broker import BrokerUsecase
25from faststream._internal.constants import EMPTY
26from faststream._internal.context.repository import ContextRepo
27from faststream._internal.di import FastDependsConfig
28from faststream.message import gen_cor_id
29from faststream.middlewares import AckPolicy
30from faststream.redis.configs import ConnectionState, RedisBrokerConfig
31from faststream.redis.message import UnifyRedisDict
32from faststream.redis.parser import BinaryMessageFormatV1, MessageFormat
33from faststream.redis.publisher.producer import RedisFastProducer
34from faststream.redis.response import RedisPublishCommand
35from faststream.redis.security import parse_security
36from faststream.response.publish_type import PublishType
37from faststream.specification.schema import BrokerSpec
39from .logging import make_redis_logger_state
40from .registrator import RedisRegistrator
42if TYPE_CHECKING:
43 from types import TracebackType
45 from fast_depends.dependencies import Dependant
46 from fast_depends.library.serializer import SerializerProto
47 from redis.asyncio.client import Pipeline, Redis
48 from redis.asyncio.connection import BaseParser
49 from typing_extensions import TypedDict
51 from faststream._internal.basic_types import LoggerProto, SendableMessage
52 from faststream._internal.parser import CodecProto
53 from faststream._internal.types import BrokerMiddleware, CustomCallable
54 from faststream.redis.message import RedisChannelMessage
55 from faststream.security import BaseSecurity
56 from faststream.specification.schema.extra import Tag, TagDict
58 class RedisInitKwargs(TypedDict, total=False):
59 host: str | None
60 port: str | int | None
61 db: str | int | None
62 client_name: str | None
63 health_check_interval: float | None
64 max_connections: int | None
65 socket_timeout: float | None
66 socket_connect_timeout: float | None
67 socket_read_size: int | None
68 socket_keepalive: bool | None
69 socket_keepalive_options: Mapping[int, int | bytes] | None
70 socket_type: int | None
71 retry_on_timeout: bool | None
72 encoding: str | None
73 encoding_errors: str | None
74 parser_class: type["BaseParser"] | None
75 connection_class: type["Connection"] | None
76 encoder_class: type["Encoder"] | None
79Channel: TypeAlias = str
82class RedisBroker(
83 RedisRegistrator,
84 BrokerUsecase[UnifyRedisDict, "Redis[bytes]"],
85):
86 """Redis broker."""
88 def __init__(
89 self,
90 url: str = "redis://localhost:6379",
91 *,
92 host: str = EMPTY,
93 port: str | int = EMPTY,
94 db: str | int = EMPTY,
95 connection_class: type["Connection"] = EMPTY,
96 client_name: str | None = None,
97 health_check_interval: float = 0,
98 max_connections: int | None = None,
99 socket_timeout: float | None = None,
100 socket_connect_timeout: float | None = None,
101 socket_read_size: int = 65536,
102 socket_keepalive: bool = False,
103 socket_keepalive_options: Mapping[int, int | bytes] | None = None,
104 socket_type: int = 0,
105 retry_on_timeout: bool = False,
106 encoding: str = "utf-8",
107 encoding_errors: str = "strict",
108 parser_class: type["BaseParser"] = DefaultParser,
109 encoder_class: type["Encoder"] = Encoder,
110 graceful_timeout: float | None = 15.0,
111 ack_policy: AckPolicy = EMPTY,
112 decoder: Optional["CustomCallable"] = None,
113 codec: Optional["CodecProto"] = None,
114 parser: Optional["CustomCallable"] = None,
115 dependencies: Iterable["Dependant"] = (),
116 middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
117 routers: Iterable[RedisRegistrator] = (),
118 message_format: type["MessageFormat"] = BinaryMessageFormatV1,
119 security: Optional["BaseSecurity"] = None,
120 specification_url: str | None = None,
121 protocol: str | None = None,
122 protocol_version: str | None = "custom",
123 description: str | None = None,
124 tags: Iterable[Union["Tag", "TagDict"]] = (),
125 logger: Optional["LoggerProto"] = EMPTY,
126 log_level: int = logging.INFO,
127 apply_types: bool = True,
128 serializer: Optional["SerializerProto"] = EMPTY,
129 provider: Optional["Provider"] = None,
130 context: Optional["ContextRepo"] = None,
131 ) -> None:
132 """Initialized the RedisBroker.
134 Args:
135 url:
136 The Redis connection URL. Defaults to "redis://localhost:6379".
137 host:
138 The Redis host to connect to. If not provided, it will be extracted from the URL.
139 port:
140 The Redis port to connect to. If not provided, it will be extracted from the URL.
141 db:
142 The Redis database to use. If not provided, it will be extracted from the URL.
143 connection_class:
144 The class to use for establishing connections. Defaults to EMPTY.
145 client_name:
146 The name of the Redis client. Defaults to None.
147 health_check_interval:
148 The interval at which to perform health checks on the broker. Defaults to 0.
149 max_connections:
150 The maximum number of connections to establish. Defaults to None.
151 socket_timeout:
152 The timeout for socket operations. Defaults to None.
153 socket_connect_timeout:
154 The timeout for connecting sockets. Defaults to None.
155 socket_read_size:
156 The size of the buffer used for reading from sockets. Defaults to 65536.
157 socket_keepalive:
158 Whether to enable keep-alive on sockets. Defaults to False.
159 socket_keepalive_options:
160 Options for keep-alive on sockets. Defaults to None.
161 socket_type:
162 The type of socket to use (if supported by your platform). Defaults to 0.
163 retry_on_timeout:
164 Whether to retry operations that timeout. Defaults to False.
165 encoding:
166 The encoding used for sending and receiving data. Defaults to "utf-8".
167 encoding_errors:
168 How to handle encoding errors. Defaults to "strict".
169 parser_class:
170 The class to use for parsing messages. Defaults to DefaultParser.
171 encoder_class:
172 The class to use for encoding messages. Defaults to Encoder.
173 graceful_timeout:
174 Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. Defaults to 15.0.
175 ack_policy:
176 Default acknowledgement policy for all subscribers. Individual subscribers can override.
177 decoder:
178 Custom decoder object. Defaults to None.
179 codec:
180 Custom codec object. Defaults to None.
181 parser:
182 Custom parser object. Defaults to None.
183 dependencies:
184 Dependencies to apply to all broker subscribers. Defaults to ().
185 middlewares:
186 Middlewares to apply to all broker publishers/subscribers. Defaults to ().
187 routers:
188 Routers to apply to broker. Defaults to ().
189 message_format:
190 What format to use when parsing messages. Defaults to BinaryMessageFormatV1.
191 security:
192 Security options to connect broker and generate AsyncAPI server security information. Defaults to None.
193 specification_url:
194 AsyncAPI hardcoded server addresses. Use `servers` if not specified. Defaults to None.
195 protocol:
196 AsyncAPI server protocol. Defaults to None.
197 protocol_version:
198 AsyncAPI server protocol version. Defaults to "custom".
199 description:
200 AsyncAPI server description. Defaults to None.
201 tags:
202 AsyncAPI server tags. Defaults to ().
203 logger:
204 User specified logger to pass into Context and log service messages. Defaults to EMPTY.
205 log_level:
206 Service messages log level. Defaults to logging.INFO.
207 apply_types:
208 Whether to use FastDepends or not. Defaults to True.
209 serializer:
210 Serializer object. Defaults to EMPTY.
211 provider:
212 Provider for FastDepends library. Defaults to None.
213 context:
214 Context repository for FastDepends library. Defaults to None.
215 """
216 self.message_format = message_format
218 if specification_url is None:
219 specification_url = url
221 if protocol is None:
222 url_kwargs = urlparse(specification_url)
223 protocol = url_kwargs.scheme
225 connection_options = _resolve_url_options(
226 url,
227 security=security,
228 host=host,
229 port=port,
230 db=db,
231 client_name=client_name,
232 health_check_interval=health_check_interval,
233 max_connections=max_connections,
234 socket_timeout=socket_timeout,
235 socket_connect_timeout=socket_connect_timeout,
236 socket_read_size=socket_read_size,
237 socket_keepalive=socket_keepalive,
238 socket_keepalive_options=socket_keepalive_options,
239 socket_type=socket_type,
240 retry_on_timeout=retry_on_timeout,
241 encoding=encoding,
242 encoding_errors=encoding_errors,
243 parser_class=parser_class,
244 connection_class=connection_class,
245 encoder_class=encoder_class,
246 )
248 connection_state = ConnectionState(connection_options)
250 super().__init__(
251 **connection_options,
252 routers=routers,
253 config=RedisBrokerConfig(
254 connection=connection_state,
255 producer=RedisFastProducer(
256 connection=connection_state,
257 parser=parser,
258 decoder=decoder,
259 message_format=self.message_format,
260 serializer=serializer,
261 ),
262 message_format=self.message_format,
263 # both args
264 broker_middlewares=middlewares,
265 broker_parser=parser,
266 broker_decoder=decoder,
267 broker_codec=codec,
268 logger=make_redis_logger_state(
269 logger=logger,
270 log_level=log_level,
271 ),
272 fd_config=FastDependsConfig(
273 use_fastdepends=apply_types,
274 serializer=serializer,
275 provider=provider or dependency_provider,
276 context=context or ContextRepo(),
277 ),
278 # subscriber args
279 broker_dependencies=dependencies,
280 graceful_timeout=graceful_timeout,
281 ack_policy=ack_policy,
282 extra_context={
283 "broker": self,
284 },
285 ),
286 specification=BrokerSpec(
287 description=description,
288 url=[specification_url],
289 protocol=protocol,
290 protocol_version=protocol_version,
291 security=security,
292 tags=tags,
293 ),
294 )
296 @override
297 async def _connect(self) -> "Redis[bytes]":
298 await self.config.connect()
299 return self.config.broker_config.connection.client
301 async def stop(
302 self,
303 exc_type: type[BaseException] | None = None,
304 exc_val: BaseException | None = None,
305 exc_tb: Optional["TracebackType"] = None,
306 ) -> None:
307 await super().stop(exc_type, exc_val, exc_tb)
308 await self.config.disconnect()
309 self._connection = None
311 async def start(self) -> None:
312 await self.connect()
313 await super().start()
315 @overload
316 async def publish(
317 self,
318 message: "SendableMessage" = None,
319 channel: str | None = None,
320 *,
321 reply_to: str = "",
322 headers: dict[str, Any] | None = None,
323 correlation_id: str | None = None,
324 list: str | None = None,
325 stream: None = None,
326 maxlen: int | None = None,
327 pipeline: Optional["Pipeline[bytes]"] = None,
328 ) -> int: ...
330 @overload
331 async def publish(
332 self,
333 message: "SendableMessage" = None,
334 channel: str | None = None,
335 *,
336 reply_to: str = "",
337 headers: dict[str, Any] | None = None,
338 correlation_id: str | None = None,
339 list: str | None = None,
340 stream: str = ...,
341 maxlen: int | None = None,
342 pipeline: Optional["Pipeline[bytes]"] = None,
343 ) -> bytes: ...
345 @override
346 async def publish(
347 self,
348 message: "SendableMessage" = None,
349 channel: str | None = None,
350 *,
351 reply_to: str = "",
352 headers: dict[str, Any] | None = None,
353 correlation_id: str | None = None,
354 list: str | None = None,
355 stream: str | None = None,
356 maxlen: int | None = None,
357 pipeline: Optional["Pipeline[bytes]"] = None,
358 ) -> int | bytes:
359 """Publish message directly.
361 This method allows you to publish a message in a non-AsyncAPI-documented way.
362 It can be used in other frameworks or to publish messages at specific intervals.
364 Args:
365 message:
366 Message body to send.
367 channel:
368 Redis PubSub object name to send message.
369 reply_to:
370 Reply message destination PubSub object name.
371 headers:
372 Message headers to store metainformation.
373 correlation_id:
374 Manual message correlation_id setter. correlation_id is a useful option to trace messages.
375 list:
376 Redis List object name to send message.
377 stream:
378 Redis Stream object name to send message.
379 maxlen:
380 Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.
381 pipeline:
382 Redis pipeline to use for publishing messages.
384 Returns:
385 int: The result of the publish operation, typically the number of messages published.
386 """
387 cmd = RedisPublishCommand(
388 message,
389 correlation_id=correlation_id or gen_cor_id(),
390 channel=channel,
391 list=list,
392 stream=stream,
393 maxlen=maxlen,
394 reply_to=reply_to,
395 headers=headers,
396 pipeline=pipeline,
397 _publish_type=PublishType.PUBLISH,
398 message_format=self.message_format,
399 )
401 result: int | bytes = await super()._basic_publish(
402 cmd,
403 producer=self.config.producer,
404 )
405 return result
407 @override
408 async def request( # type: ignore[override]
409 self,
410 message: "SendableMessage",
411 channel: str | None = None,
412 *,
413 list: str | None = None,
414 stream: str | None = None,
415 maxlen: int | None = None,
416 correlation_id: str | None = None,
417 headers: dict[str, Any] | None = None,
418 timeout: float | None = 30.0,
419 ) -> "RedisChannelMessage":
420 cmd = RedisPublishCommand(
421 message,
422 correlation_id=correlation_id or gen_cor_id(),
423 channel=channel,
424 list=list,
425 stream=stream,
426 maxlen=maxlen,
427 headers=headers,
428 timeout=timeout,
429 _publish_type=PublishType.REQUEST,
430 message_format=self.message_format,
431 )
432 msg: RedisChannelMessage = await super()._basic_request(
433 cmd,
434 producer=self.config.producer,
435 )
436 return msg
438 @override
439 async def publish_batch( # type: ignore[override]
440 self,
441 *messages: "SendableMessage",
442 list: str,
443 correlation_id: str | None = None,
444 reply_to: str = "",
445 headers: dict[str, Any] | None = None,
446 pipeline: Optional["Pipeline[bytes]"] = None,
447 ) -> int:
448 """Publish multiple messages to Redis List by one request.
450 Args:
451 *messages: Messages bodies to send.
452 list: Redis List object name to send messages.
453 correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
454 reply_to: Reply message destination PubSub object name.
455 headers: Message headers to store metainformation.
456 pipeline: Redis pipeline to use for publishing messages.
458 Returns:
459 int: The result of the batch publish operation.
460 """
461 cmd = RedisPublishCommand(
462 *messages,
463 list=list,
464 reply_to=reply_to,
465 headers=headers,
466 correlation_id=correlation_id or gen_cor_id(),
467 pipeline=pipeline,
468 _publish_type=PublishType.PUBLISH,
469 message_format=self.message_format,
470 )
472 result: int = await self._basic_publish_batch(
473 cmd,
474 producer=self.config.producer,
475 )
476 return result
478 @override
479 async def ping(self, timeout: float | None = 3) -> bool:
480 sleep_time = (timeout or 10) / 10
482 with move_on_after(timeout) as cancel_scope:
483 if self._connection is None: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 return False
486 while True:
487 if cancel_scope.cancel_called: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 return False
490 try:
491 if await self._connection.ping(): 491 ↛ 497line 491 didn't jump to line 497 because the condition on line 491 was always true
492 return True
494 except ConnectionError:
495 pass
497 await anyio.sleep(sleep_time)
499 return False
502def _resolve_url_options(
503 url: str,
504 *,
505 security: Optional["BaseSecurity"],
506 **kwargs: Any,
507) -> dict[str, Any]:
508 return {
509 **dict(parse_url(url)),
510 **parse_security(security),
511 **{k: v for k, v in kwargs.items() if v is not EMPTY},
512 }