Coverage for faststream / rabbit / schemas / queue.py: 91%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from copy import deepcopy
2from enum import Enum
3from typing import TYPE_CHECKING, Any, Literal, Optional, TypedDict, Union, overload
5from faststream._internal.constants import EMPTY
6from faststream._internal.proto import NameRequired
7from faststream._internal.utils.path import compile_path
8from faststream.exceptions import SetupError
10if TYPE_CHECKING:
11 from aio_pika.abc import TimeoutType
14class QueueType(str, Enum):
15 """Queue types for RabbitMQ.
17 Enum should be lowercase to match RabbitMQ API.
18 """
20 CLASSIC = "classic"
21 QUORUM = "quorum"
22 STREAM = "stream"
25class RabbitQueue(NameRequired):
26 """A class to represent a RabbitMQ queue.
28 You can find information about all options in the official RabbitMQ documentation:
30 https://www.rabbitmq.com/docs/queues
31 """
33 __slots__ = (
34 "arguments",
35 "auto_delete",
36 "bind_arguments",
37 "durable",
38 "exclusive",
39 "name",
40 "path_regex",
41 "robust",
42 "routing_key",
43 "timeout",
44 )
46 def __repr__(self) -> str:
47 if self.declare:
48 body = f", robust={self.robust}, durable={self.durable}, exclusive={self.exclusive}, auto_delete={self.auto_delete})"
49 else:
50 body = ""
52 if (r := self.routing()) != self.name:
53 body = f", routing_key='{r}'{body}"
55 return f"{self.__class__.__name__}({self.name}{body})"
57 def __hash__(self) -> int:
58 """Supports hash to store real objects in declarer."""
59 return sum(
60 (
61 hash(self.name),
62 int(self.durable),
63 int(self.exclusive),
64 int(self.auto_delete),
65 ),
66 )
68 def routing(self) -> str:
69 """Return real routing_key of object."""
70 return self.routing_key or self.name
72 def add_prefix(self, prefix: str) -> "RabbitQueue":
73 new_q: RabbitQueue = deepcopy(self)
75 new_q.name = f"{prefix}{new_q.name}"
77 if new_q.routing_key:
78 new_q.routing_key = f"{prefix}{new_q.routing_key}"
80 return new_q
82 @overload
83 def __init__(
84 self,
85 name: str,
86 queue_type: Literal[QueueType.CLASSIC] = QueueType.CLASSIC,
87 durable: bool = EMPTY,
88 exclusive: bool = False,
89 declare: bool = True,
90 auto_delete: bool = False,
91 arguments: Optional["ClassicQueueArgs"] = None,
92 timeout: "TimeoutType" = None,
93 robust: bool = True,
94 bind_arguments: dict[str, Any] | None = None,
95 routing_key: str = "",
96 ) -> None: ...
98 @overload
99 def __init__(
100 self,
101 name: str,
102 queue_type: Literal[QueueType.QUORUM],
103 durable: Literal[True] = EMPTY,
104 exclusive: bool = False,
105 declare: bool = True,
106 auto_delete: bool = False,
107 arguments: Optional["QuorumQueueArgs"] = None,
108 timeout: "TimeoutType" = None,
109 robust: bool = True,
110 bind_arguments: dict[str, Any] | None = None,
111 routing_key: str = "",
112 ) -> None: ...
114 @overload
115 def __init__(
116 self,
117 name: str,
118 queue_type: Literal[QueueType.STREAM],
119 durable: Literal[True] = EMPTY,
120 exclusive: bool = False,
121 declare: bool = True,
122 auto_delete: bool = False,
123 arguments: Optional["StreamQueueArgs"] = None,
124 timeout: "TimeoutType" = None,
125 robust: bool = True,
126 bind_arguments: dict[str, Any] | None = None,
127 routing_key: str = "",
128 ) -> None: ...
130 def __init__(
131 self,
132 name: str,
133 queue_type: QueueType = QueueType.CLASSIC,
134 durable: bool = EMPTY,
135 exclusive: bool = False,
136 declare: bool = True,
137 auto_delete: bool = False,
138 arguments: Union[
139 "QuorumQueueArgs",
140 "ClassicQueueArgs",
141 "StreamQueueArgs",
142 dict[str, Any],
143 None,
144 ] = None,
145 timeout: "TimeoutType" = None,
146 robust: bool = True,
147 bind_arguments: dict[str, Any] | None = None,
148 routing_key: str = "",
149 ) -> None:
150 """Initialize the RabbitMQ queue.
152 :param name: RabbitMQ queue name.
153 :param durable: Whether the object is durable.
154 :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed.
155 :param declare: Whether to queue automatically or just connect to it.
156 If you want to connect to an existing queue, set this to `False`.
157 Copy of `passive` aio-pike option.
158 :param auto_delete: The queue will be deleted after connection closed.
159 :param arguments: Queue declaration arguments.
160 You can find information about them in the official RabbitMQ documentation:
161 https://www.rabbitmq.com/docs/queues#optional-arguments
162 :param timeout: Send confirmation time from RabbitMQ.
163 :param robust: Whether to declare queue object as restorable.
164 :param bind_arguments: Queue-exchange binding options.
165 :param routing_key: Explicit binding routing key. Uses name if not present.
166 """
167 re, routing_key = compile_path(
168 routing_key,
169 replace_symbol="*",
170 patch_regex=lambda x: x.replace(r"\#", ".+"),
171 )
173 if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM:
174 if durable is EMPTY: 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true
175 durable = True
176 elif not durable: 176 ↛ 177line 176 didn't jump to line 177 because the condition on line 176 was never true
177 error_msg = "Quorum and Stream queues must be durable"
178 raise SetupError(error_msg)
179 elif durable is EMPTY:
180 durable = False
182 super().__init__(name)
184 self.path_regex = re
185 self.durable = durable
186 self.exclusive = exclusive
187 self.bind_arguments = bind_arguments
188 self.routing_key = routing_key
189 self.robust = robust
190 self.auto_delete = auto_delete
191 self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})}
192 self.timeout = timeout
193 self.declare = declare
196CommonQueueArgs = TypedDict(
197 "CommonQueueArgs",
198 {
199 "x-queue-leader-locator": Literal["client-local", "balanced"],
200 "x-max-length-bytes": int,
201 },
202 total=False,
203)
206SharedClassicAndQuorumQueueArgs = TypedDict(
207 "SharedClassicAndQuorumQueueArgs",
208 {
209 "x-expires": int,
210 "x-message-ttl": int,
211 "x-single-active-consumer": bool,
212 "x-dead-letter-exchange": str,
213 "x-dead-letter-routing-key": str,
214 "x-max-length": int,
215 },
216 total=False,
217)
220ClassicQueueSpecificArgs = TypedDict(
221 "ClassicQueueSpecificArgs",
222 {
223 "x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"],
224 "x-queue-master-locator": Literal["client-local", "balanced"],
225 "x-max-priority": int,
226 "x-queue-mode": Literal["default", "lazy"],
227 "x-queue-version": int,
228 },
229 total=False,
230)
233QuorumQueueSpecificArgs = TypedDict(
234 "QuorumQueueSpecificArgs",
235 {
236 "x-overflow": Literal["drop-head", "reject-publish"],
237 "x-delivery-limit": int,
238 "x-quorum-initial-group-size": int,
239 "x-quorum-target-group-size": int,
240 "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"],
241 "x-max-in-memory-length": int,
242 "x-max-in-memory-bytes": int,
243 },
244 total=False,
245)
248StreamQueueSpecificArgs = TypedDict(
249 "StreamQueueSpecificArgs",
250 {
251 "x-max-age": str,
252 "x-stream-max-segment-size-bytes": int,
253 "x-stream-filter-size-bytes": int,
254 "x-initial-cluster-size": int,
255 },
256 total=False,
257)
260class ClassicQueueArgs(
261 CommonQueueArgs,
262 SharedClassicAndQuorumQueueArgs,
263 ClassicQueueSpecificArgs,
264):
265 """rabbitmq-server/deps/rabbit/src/rabbit_classic_queue.erl."""
268class QuorumQueueArgs(
269 CommonQueueArgs,
270 SharedClassicAndQuorumQueueArgs,
271 QuorumQueueSpecificArgs,
272):
273 """rabbitmq-server/deps/rabbit/src/rabbit_quorum_queue.erl."""
276class StreamQueueArgs(CommonQueueArgs, StreamQueueSpecificArgs):
277 """rabbitmq-server/deps/rabbit/src/rabbit_stream_queue.erl."""