Coverage for faststream / rabbit / helpers / declarer.py: 98%
33 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Optional, Protocol, cast
3from faststream._internal.constants import EMPTY
5if TYPE_CHECKING:
6 import aio_pika
8 from faststream.rabbit.schemas import Channel, RabbitExchange, RabbitQueue
10 from .channel_manager import ChannelManager
class RabbitDeclarer(Protocol):
    """A utility interface for declaring RabbitMQ queues and exchanges."""

    def disconnect(self) -> None:
        """React to a disconnect; the behaviour is implementation-defined."""

    async def declare_queue(
        self,
        queue: "RabbitQueue",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustQueue":
        """Declare a queue."""

    async def declare_exchange(
        self,
        exchange: "RabbitExchange",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustExchange":
        """Declare an exchange and its parent exchanges, binding them to each other."""
class FakeRabbitDeclarer(RabbitDeclarer):
    """Stand-in declarer: every operation raises ``NotImplementedError``."""

    def disconnect(self) -> None:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError()

    async def declare_queue(
        self,
        queue: "RabbitQueue",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustQueue":
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError()

    async def declare_exchange(
        self,
        exchange: "RabbitExchange",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustExchange":
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError()
class RabbitDeclarerImpl(RabbitDeclarer):
    """Declare RabbitMQ queues and exchanges and cache the declared objects.

    Declared queues and exchanges are stored in per-instance dictionaries
    keyed by the schema object passed in, so a repeated request for the same
    schema object returns the cached result without declaring it again.
    """

    # Slot names must match the attributes actually assigned in ``__init__``.
    # ``__channel_manager`` is name-mangled identically here and at its use
    # sites; the caches are single-underscore attributes, so they are listed
    # under those exact names (previously the slots named ``__exchanges`` /
    # ``__queues``, which were never assigned).
    __slots__ = ("__channel_manager", "_exchanges", "_queues")

    def __init__(self, channel_manager: "ChannelManager") -> None:
        """Store the channel manager and start with empty caches."""
        self.__channel_manager = channel_manager
        self._queues: dict[RabbitQueue, aio_pika.RobustQueue] = {}
        self._exchanges: dict[RabbitExchange, aio_pika.RobustExchange] = {}

    def __repr__(self) -> str:
        """Return the class name together with the cached queue/exchange keys."""
        return f"{self.__class__.__name__}(queues={list(self._queues)}, exchanges={list(self._exchanges)})"

    def disconnect(self) -> None:
        """Drop every cached queue and exchange."""
        self._queues.clear()
        self._exchanges.clear()

    async def declare_queue(
        self,
        queue: "RabbitQueue",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustQueue":
        """Return the declared queue for ``queue``, declaring it on a cache miss.

        Args:
            queue: Queue schema; also used as the cache key.
            declare: Whether to actively declare the queue. When left as
                ``EMPTY`` the value of ``queue.declare`` is used. The broker
                call is made with ``passive=not declare``.
            channel: Passed to the channel manager's ``get_channel``.

        Returns:
            The cached queue if one exists for ``queue`` (in which case
            ``declare`` and ``channel`` are not consulted), otherwise the
            newly declared queue.
        """
        if (q := self._queues.get(queue)) is None:
            if declare is EMPTY:
                declare = queue.declare

            channel_obj = await self.__channel_manager.get_channel(channel)

            self._queues[queue] = q = cast(
                "aio_pika.RobustQueue",
                await channel_obj.declare_queue(
                    name=queue.name,
                    durable=queue.durable,
                    exclusive=queue.exclusive,
                    passive=not declare,
                    auto_delete=queue.auto_delete,
                    arguments=queue.arguments,
                    timeout=queue.timeout,
                    robust=queue.robust,
                ),
            )

        return q

    async def declare_exchange(
        self,
        exchange: "RabbitExchange",
        declare: bool = EMPTY,
        *,
        channel: Optional["Channel"] = None,
    ) -> "aio_pika.RobustExchange":
        """Return the declared exchange for ``exchange``, declaring it on a cache miss.

        Args:
            exchange: Exchange schema; also used as the cache key.
            declare: Whether to actively declare the exchange. When left as
                ``EMPTY`` the value of ``exchange.declare`` is used. The
                broker call is made with ``passive=not declare``.
            channel: Passed to the channel manager's ``get_channel``.

        Returns:
            The channel's ``default_exchange`` when ``exchange.name`` is
            falsy (this result is not cached); otherwise the cached or newly
            declared exchange. On a fresh declaration with ``bind_to`` set,
            the parent exchange is declared recursively and the new exchange
            is bound to it.
        """
        # The channel is fetched before the cache lookup because the
        # default-exchange shortcut below needs it.
        channel_obj = await self.__channel_manager.get_channel(channel)

        if not exchange.name:
            return channel_obj.default_exchange

        if (exch := self._exchanges.get(exchange)) is None:
            if declare is EMPTY:
                declare = exchange.declare

            self._exchanges[exchange] = exch = cast(
                "aio_pika.RobustExchange",
                await channel_obj.declare_exchange(
                    name=exchange.name,
                    type=exchange.type.value,
                    durable=exchange.durable,
                    auto_delete=exchange.auto_delete,
                    passive=not declare,
                    arguments=exchange.arguments,
                    timeout=exchange.timeout,
                    robust=exchange.robust,
                    internal=False,  # deprecated RMQ option
                ),
            )

            if exchange.bind_to is not None:
                # NOTE(review): the parent is declared without forwarding
                # ``channel`` or ``declare`` — confirm this is intended.
                parent = await self.declare_exchange(exchange.bind_to)
                await exch.bind(
                    exchange=parent,
                    routing_key=exchange.routing(),
                    arguments=exchange.bind_arguments,
                    timeout=exchange.timeout,
                    robust=exchange.robust,
                )

        return exch