Coverage for faststream / rabbit / broker / broker.py: 89%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from collections.abc import Iterable, Sequence
3from typing import (
4 TYPE_CHECKING,
5 Any,
6 Optional,
7 Union,
8 cast,
9)
10from urllib.parse import urlparse
12import anyio
13from aio_pika import IncomingMessage, RobustConnection, connect_robust
14from fast_depends import Provider, dependency_provider
15from typing_extensions import override
17from faststream.__about__ import SERVICE_NAME
18from faststream._internal.broker import BrokerUsecase
19from faststream._internal.constants import EMPTY
20from faststream._internal.context.repository import ContextRepo
21from faststream._internal.di import FastDependsConfig
22from faststream.message import gen_cor_id
23from faststream.middlewares import AckPolicy
24from faststream.rabbit.configs import RabbitBrokerConfig
25from faststream.rabbit.helpers.channel_manager import ChannelManagerImpl
26from faststream.rabbit.helpers.declarer import RabbitDeclarerImpl
27from faststream.rabbit.publisher.producer import (
28 AioPikaFastProducerImpl,
29)
30from faststream.rabbit.response import RabbitPublishCommand
31from faststream.rabbit.schemas import (
32 RABBIT_REPLY,
33 Channel,
34 RabbitExchange,
35 RabbitQueue,
36)
37from faststream.rabbit.security import parse_security
38from faststream.rabbit.utils import build_url
39from faststream.response.publish_type import PublishType
40from faststream.specification.schema import BrokerSpec
42from .logging import make_rabbit_logger_state
43from .registrator import RabbitRegistrator
45if TYPE_CHECKING:
46 from types import TracebackType
48 import aiormq
49 from aio_pika import (
50 RobustChannel,
51 RobustExchange,
52 RobustQueue,
53 )
54 from aio_pika.abc import DateType, HeadersType, SSLOptions, TimeoutType
55 from fast_depends.dependencies import Dependant
56 from fast_depends.library.serializer import SerializerProto
57 from yarl import URL
59 from faststream._internal.basic_types import LoggerProto
60 from faststream._internal.parser import CodecProto
61 from faststream._internal.types import (
62 BrokerMiddleware,
63 CustomCallable,
64 )
65 from faststream.rabbit.helpers import RabbitDeclarer
66 from faststream.rabbit.message import RabbitMessage
67 from faststream.rabbit.types import AioPikaSendableMessage
68 from faststream.rabbit.utils import RabbitClientProperties
69 from faststream.security import BaseSecurity
70 from faststream.specification.schema.extra import Tag, TagDict
73class RabbitBroker(
74 RabbitRegistrator,
75 BrokerUsecase[IncomingMessage, RobustConnection],
76):
77 """A class to represent a RabbitMQ broker."""
79 def __init__(
80 self,
81 url: Union[str, "URL", None] = "amqp://guest:guest@localhost:5672/",
82 *,
83 host: str | None = None,
84 port: int | None = None,
85 virtualhost: str | None = None,
86 ssl_options: Optional["SSLOptions"] = None,
87 client_properties: Optional["RabbitClientProperties"] = None,
88 timeout: "TimeoutType" = None,
89 fail_fast: bool = True,
90 reconnect_interval: "TimeoutType" = 5.0,
91 default_channel: Optional["Channel"] = None,
92 app_id: str | None = SERVICE_NAME,
93 # broker base args
94 graceful_timeout: float | None = None,
95 ack_policy: AckPolicy = EMPTY,
96 decoder: Optional["CustomCallable"] = None,
97 codec: Optional["CodecProto"] = None,
98 parser: Optional["CustomCallable"] = None,
99 dependencies: Iterable["Dependant"] = (),
100 middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
101 routers: Iterable[RabbitRegistrator] = (),
102 # AsyncAPI args
103 security: Optional["BaseSecurity"] = None,
104 specification_url: str | None = None,
105 protocol: str | None = None,
106 protocol_version: str | None = "0.9.1",
107 description: str | None = None,
108 tags: Iterable[Union["Tag", "TagDict"]] = (),
109 # logging args
110 logger: Optional["LoggerProto"] = EMPTY,
111 log_level: int = logging.INFO,
112 # FastDepends args
113 apply_types: bool = True,
114 serializer: Optional["SerializerProto"] = EMPTY,
115 provider: Optional["Provider"] = None,
116 context: Optional["ContextRepo"] = None,
117 ) -> None:
118 """Initialize the RabbitBroker.
120 Args:
121 url: RabbitMQ destination location to connect.
122 host: Destination host. This option overrides `url` option host.
123 port: Destination port. This option overrides `url` option port.
124 virtualhost: RabbitMQ virtual host to use in the current broker connection.
125 ssl_options: Extra ssl options to establish connection.
126 client_properties: Add custom client capability.
127 timeout: Connection establishment timeout.
128 fail_fast: Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable.
129 reconnect_interval: Time to sleep between reconnection attempts.
130 default_channel: Default channel settings to use.
131 app_id: Application name to mark outgoing messages by.
132 graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
133 ack_policy: Default acknowledgement policy for all subscribers. Individual subscribers can override.
134 decoder: Custom decoder object.
135 codec: Custom codec object.
136 parser: Custom parser object.
137 dependencies: Dependencies to apply to all broker subscribers.
138 middlewares: Middlewares to apply to all broker publishers/subscribers.
139 routers: RabbitRouters to build a broker with.
140 security: Security options to connect broker and generate AsyncAPI server security information.
141 specification_url: AsyncAPI hardcoded server addresses. Use `servers` if not specified.
142 protocol: AsyncAPI server protocol.
143 protocol_version: AsyncAPI server protocol version.
144 description: AsyncAPI server description.
145 tags: AsyncAPI server tags.
146 logger: User-specified logger to pass into Context and log service messages.
147 log_level: Service messages log level.
148 apply_types: Whether to use FastDepends or not.
149 serializer: FastDepends-compatible serializer to validate incoming messages.
150 provider: Provider for FastDepends.
151 context: Context for FastDepends.
152 """
153 security_args = parse_security(security)
155 amqp_url = build_url(
156 url,
157 host=host,
158 port=port,
159 virtualhost=virtualhost,
160 ssl_options=ssl_options,
161 client_properties=client_properties,
162 login=security_args.get("login"),
163 password=security_args.get("password"),
164 ssl=security_args.get("ssl"),
165 )
167 if specification_url is None:
168 specification_url = str(amqp_url)
170 # respect ascynapi_url argument scheme
171 built_asyncapi_url = urlparse(specification_url)
172 if protocol is None:
173 protocol = built_asyncapi_url.scheme
175 cm = ChannelManagerImpl(default_channel)
176 declarer = RabbitDeclarerImpl(cm)
178 producer = AioPikaFastProducerImpl(
179 declarer=declarer,
180 decoder=decoder,
181 parser=parser,
182 )
184 super().__init__(
185 # connection args
186 url=str(amqp_url),
187 ssl_context=security_args.get("ssl_context"),
188 timeout=timeout,
189 fail_fast=fail_fast,
190 reconnect_interval=reconnect_interval,
191 # Basic args
192 routers=routers,
193 config=RabbitBrokerConfig(
194 channel_manager=cm,
195 producer=producer,
196 declarer=declarer,
197 app_id=app_id,
198 virtual_host=built_asyncapi_url.path,
199 # both args
200 broker_middlewares=middlewares,
201 broker_parser=parser,
202 broker_decoder=decoder,
203 broker_codec=codec,
204 logger=make_rabbit_logger_state(
205 logger=logger,
206 log_level=log_level,
207 ),
208 fd_config=FastDependsConfig(
209 use_fastdepends=apply_types,
210 serializer=serializer,
211 provider=provider or dependency_provider,
212 context=context or ContextRepo(),
213 ),
214 # subscriber args
215 broker_dependencies=dependencies,
216 graceful_timeout=graceful_timeout,
217 ack_policy=ack_policy,
218 extra_context={
219 "broker": self,
220 },
221 ),
222 specification=BrokerSpec(
223 description=description,
224 url=[specification_url],
225 protocol=protocol or built_asyncapi_url.scheme,
226 protocol_version=protocol_version,
227 security=security,
228 tags=tags,
229 ),
230 )
232 self._channel: RobustChannel | None = None
234 @override
235 async def _connect(self) -> "RobustConnection":
236 connection = cast(
237 "RobustConnection",
238 await connect_robust(**self._connection_kwargs),
239 )
241 if self._channel is None: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true
242 self.config.connect(connection)
243 self._channel = await self.config.channel_manager.get_channel()
245 return connection
247 async def stop(
248 self,
249 exc_type: type[BaseException] | None = None,
250 exc_val: BaseException | None = None,
251 exc_tb: Optional["TracebackType"] = None,
252 ) -> None:
253 await super().stop(exc_type, exc_val, exc_tb)
255 if self._channel is not None:
256 if not self._channel.is_closed: 256 ↛ 259line 256 didn't jump to line 259 because the condition on line 256 was always true
257 await self._channel.close()
259 self._channel = None
261 if self._connection is not None:
262 await self._connection.close()
263 self._connection = None
265 self.config.disconnect()
267 async def start(self) -> None:
268 """Connect broker to RabbitMQ and startup all subscribers."""
269 await self.connect()
270 await self.declare_queue(RABBIT_REPLY)
271 await super().start()
273 @override
274 async def publish(
275 self,
276 message: "AioPikaSendableMessage" = None,
277 queue: Union["RabbitQueue", str] = "",
278 exchange: Union["RabbitExchange", str, None] = None,
279 *,
280 routing_key: str = "",
281 # publish options
282 mandatory: bool = True,
283 immediate: bool = False,
284 timeout: "TimeoutType" = None,
285 persist: bool = False,
286 reply_to: str | None = None,
287 correlation_id: str | None = None,
288 # message options
289 headers: Optional["HeadersType"] = None,
290 content_type: str | None = None,
291 content_encoding: str | None = None,
292 expiration: Optional["DateType"] = None,
293 message_id: str | None = None,
294 timestamp: Optional["DateType"] = None,
295 message_type: str | None = None,
296 user_id: str | None = None,
297 priority: int | None = None,
298 ) -> Optional["aiormq.abc.ConfirmationFrameType"]:
299 """Publish message directly.
301 This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
302 applications or to publish messages from time to time.
304 Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.
306 Args:
307 message:
308 Message body to send.
309 queue:
310 Message routing key to publish with.
311 exchange:
312 Target exchange to publish message to.
313 routing_key:
314 Message routing key to publish with. Overrides `queue` option if presented.
315 mandatory:
316 Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.
317 immediate:
318 Client expects that there is consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.
319 timeout:
320 Send confirmation time from RabbitMQ.
321 persist:
322 Restore the message on RabbitMQ reboot.
323 reply_to:
324 Reply message routing key to send with (always sending to default exchange).
325 correlation_id:
326 Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
327 headers:
328 Message headers to store metainformation.
329 content_type:
330 Message **content-type** header. Used by application, not core RabbitMQ. Will be set automatically if not specified.
331 content_encoding:
332 Message body content encoding, e.g. **gzip**.
333 expiration:
334 Message expiration (lifetime) in seconds (or datetime or timedelta).
335 message_id:
336 Arbitrary message id. Generated automatically if not present.
337 timestamp:
338 Message publish timestamp. Generated automatically if not presented.
339 message_type:
340 Application-specific message type, e.g. **orders.created**.
341 user_id:
342 Publisher connection User ID, validated if set.
343 priority:
344 The message priority (0 by default).
346 Returns:
347 An optional `aiormq.abc.ConfirmationFrameType` representing the confirmation frame if RabbitMQ is configured to send confirmations.
348 """
349 cmd = RabbitPublishCommand(
350 message,
351 routing_key=routing_key or RabbitQueue.validate(queue).routing(),
352 exchange=RabbitExchange.validate(exchange),
353 correlation_id=correlation_id or gen_cor_id(),
354 app_id=self.config.app_id,
355 mandatory=mandatory,
356 immediate=immediate,
357 persist=persist,
358 reply_to=reply_to,
359 headers=headers,
360 content_type=content_type,
361 content_encoding=content_encoding,
362 expiration=expiration,
363 message_id=message_id,
364 message_type=message_type,
365 timestamp=timestamp,
366 user_id=user_id,
367 timeout=timeout,
368 priority=priority,
369 _publish_type=PublishType.PUBLISH,
370 )
372 result: aiormq.abc.ConfirmationFrameType | None = await super()._basic_publish(
373 cmd,
374 producer=self._producer,
375 )
376 return result
378 @override
379 async def request( # type: ignore[override]
380 self,
381 message: "AioPikaSendableMessage" = None,
382 queue: Union["RabbitQueue", str] = "",
383 exchange: Union["RabbitExchange", str, None] = None,
384 *,
385 routing_key: str = "",
386 mandatory: bool = True,
387 immediate: bool = False,
388 timeout: "TimeoutType" = None,
389 persist: bool = False,
390 # message args
391 correlation_id: str | None = None,
392 headers: Optional["HeadersType"] = None,
393 content_type: str | None = None,
394 content_encoding: str | None = None,
395 expiration: Optional["DateType"] = None,
396 message_id: str | None = None,
397 timestamp: Optional["DateType"] = None,
398 message_type: str | None = None,
399 user_id: str | None = None,
400 priority: int | None = None,
401 ) -> "RabbitMessage":
402 """Make a synchronous request to RabbitMQ.
404 This method uses Direct Reply-To pattern to send a message and wait for a reply.
405 It is a blocking call and will wait for a reply until timeout.
407 Args:
408 message: Message body to send.
409 queue: Message routing key to publish with.
410 exchange: Target exchange to publish message to.
411 routing_key: Message routing key to publish with. Overrides `queue` option if presented.
412 mandatory: Client waits for confirmation that the message is placed to some queue.
413 RabbitMQ returns message to client if there is no suitable queue.
414 immediate: Client expects that there is a consumer ready to take the message to work.
415 RabbitMQ returns message to client if there is no suitable consumer.
416 timeout: Send confirmation time from RabbitMQ.
417 persist: Restore the message on RabbitMQ reboot.
418 correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
419 headers: Message headers to store metainformation.
420 content_type: Message **content-type** header. Used by application, not core RabbitMQ.
421 Will be set automatically if not specified.
422 content_encoding: Message body content encoding, e.g. **gzip**.
423 expiration: Message expiration (lifetime) in seconds (or datetime or timedelta).
424 message_id: Arbitrary message id. Generated automatically if not present.
425 timestamp: Message publish timestamp. Generated automatically if not present.
426 message_type: Application-specific message type, e.g. **orders.created**.
427 user_id: Publisher connection User ID, validated if set.
428 priority: The message priority (0 by default).
429 """
430 cmd = RabbitPublishCommand(
431 message,
432 routing_key=routing_key or RabbitQueue.validate(queue).routing(),
433 exchange=RabbitExchange.validate(exchange),
434 correlation_id=correlation_id or gen_cor_id(),
435 app_id=self.config.app_id,
436 mandatory=mandatory,
437 immediate=immediate,
438 persist=persist,
439 headers=headers,
440 content_type=content_type,
441 content_encoding=content_encoding,
442 expiration=expiration,
443 message_id=message_id,
444 message_type=message_type,
445 timestamp=timestamp,
446 user_id=user_id,
447 timeout=timeout,
448 priority=priority,
449 _publish_type=PublishType.REQUEST,
450 )
452 msg: RabbitMessage = await super()._basic_request(cmd, producer=self._producer)
453 return msg
455 async def declare_queue(self, queue: "RabbitQueue") -> "RobustQueue":
456 """Declares queue object in **RabbitMQ**."""
457 declarer: RabbitDeclarer = self.config.declarer
458 return await declarer.declare_queue(queue)
460 async def declare_exchange(self, exchange: "RabbitExchange") -> "RobustExchange":
461 """Declares exchange object in **RabbitMQ**."""
462 declarer: RabbitDeclarer = self.config.declarer
463 return await declarer.declare_exchange(exchange)
465 @override
466 async def ping(self, timeout: float | None) -> bool:
467 sleep_time = (timeout or 10) / 10
469 with anyio.move_on_after(timeout) as cancel_scope:
470 if self._connection is None: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true
471 return False
473 while True:
474 if cancel_scope.cancel_called: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true
475 return False
477 if self._connection.connected.is_set(): 477 ↛ 480line 477 didn't jump to line 480 because the condition on line 477 was always true
478 return True
480 await anyio.sleep(sleep_time)
482 return False