Coverage for faststream / rabbit / schemas / queue.py: 91%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from copy import deepcopy 

2from enum import Enum 

3from typing import TYPE_CHECKING, Any, Literal, Optional, TypedDict, Union, overload 

4 

5from faststream._internal.constants import EMPTY 

6from faststream._internal.proto import NameRequired 

7from faststream._internal.utils.path import compile_path 

8from faststream.exceptions import SetupError 

9 

10if TYPE_CHECKING: 

11 from aio_pika.abc import TimeoutType 

12 

13 

14class QueueType(str, Enum): 

15 """Queue types for RabbitMQ. 

16 

17 Enum should be lowercase to match RabbitMQ API. 

18 """ 

19 

20 CLASSIC = "classic" 

21 QUORUM = "quorum" 

22 STREAM = "stream" 

23 

24 

25class RabbitQueue(NameRequired): 

26 """A class to represent a RabbitMQ queue. 

27 

28 You can find information about all options in the official RabbitMQ documentation: 

29 

30 https://www.rabbitmq.com/docs/queues 

31 """ 

32 

33 __slots__ = ( 

34 "arguments", 

35 "auto_delete", 

36 "bind_arguments", 

37 "durable", 

38 "exclusive", 

39 "name", 

40 "path_regex", 

41 "robust", 

42 "routing_key", 

43 "timeout", 

44 ) 

45 

46 def __repr__(self) -> str: 

47 if self.declare: 

48 body = f", robust={self.robust}, durable={self.durable}, exclusive={self.exclusive}, auto_delete={self.auto_delete})" 

49 else: 

50 body = "" 

51 

52 if (r := self.routing()) != self.name: 

53 body = f", routing_key='{r}'{body}" 

54 

55 return f"{self.__class__.__name__}({self.name}{body})" 

56 

57 def __hash__(self) -> int: 

58 """Supports hash to store real objects in declarer.""" 

59 return sum( 

60 ( 

61 hash(self.name), 

62 int(self.durable), 

63 int(self.exclusive), 

64 int(self.auto_delete), 

65 ), 

66 ) 

67 

68 def routing(self) -> str: 

69 """Return real routing_key of object.""" 

70 return self.routing_key or self.name 

71 

72 def add_prefix(self, prefix: str) -> "RabbitQueue": 

73 new_q: RabbitQueue = deepcopy(self) 

74 

75 new_q.name = f"{prefix}{new_q.name}" 

76 

77 if new_q.routing_key: 

78 new_q.routing_key = f"{prefix}{new_q.routing_key}" 

79 

80 return new_q 

81 

82 @overload 

83 def __init__( 

84 self, 

85 name: str, 

86 queue_type: Literal[QueueType.CLASSIC] = QueueType.CLASSIC, 

87 durable: bool = EMPTY, 

88 exclusive: bool = False, 

89 declare: bool = True, 

90 auto_delete: bool = False, 

91 arguments: Optional["ClassicQueueArgs"] = None, 

92 timeout: "TimeoutType" = None, 

93 robust: bool = True, 

94 bind_arguments: dict[str, Any] | None = None, 

95 routing_key: str = "", 

96 ) -> None: ... 

97 

98 @overload 

99 def __init__( 

100 self, 

101 name: str, 

102 queue_type: Literal[QueueType.QUORUM], 

103 durable: Literal[True] = EMPTY, 

104 exclusive: bool = False, 

105 declare: bool = True, 

106 auto_delete: bool = False, 

107 arguments: Optional["QuorumQueueArgs"] = None, 

108 timeout: "TimeoutType" = None, 

109 robust: bool = True, 

110 bind_arguments: dict[str, Any] | None = None, 

111 routing_key: str = "", 

112 ) -> None: ... 

113 

114 @overload 

115 def __init__( 

116 self, 

117 name: str, 

118 queue_type: Literal[QueueType.STREAM], 

119 durable: Literal[True] = EMPTY, 

120 exclusive: bool = False, 

121 declare: bool = True, 

122 auto_delete: bool = False, 

123 arguments: Optional["StreamQueueArgs"] = None, 

124 timeout: "TimeoutType" = None, 

125 robust: bool = True, 

126 bind_arguments: dict[str, Any] | None = None, 

127 routing_key: str = "", 

128 ) -> None: ... 

129 

130 def __init__( 

131 self, 

132 name: str, 

133 queue_type: QueueType = QueueType.CLASSIC, 

134 durable: bool = EMPTY, 

135 exclusive: bool = False, 

136 declare: bool = True, 

137 auto_delete: bool = False, 

138 arguments: Union[ 

139 "QuorumQueueArgs", 

140 "ClassicQueueArgs", 

141 "StreamQueueArgs", 

142 dict[str, Any], 

143 None, 

144 ] = None, 

145 timeout: "TimeoutType" = None, 

146 robust: bool = True, 

147 bind_arguments: dict[str, Any] | None = None, 

148 routing_key: str = "", 

149 ) -> None: 

150 """Initialize the RabbitMQ queue. 

151 

152 :param name: RabbitMQ queue name. 

153 :param durable: Whether the object is durable. 

154 :param exclusive: The queue can be used only in the current connection and will be deleted after connection closed. 

155 :param declare: Whether to queue automatically or just connect to it. 

156 If you want to connect to an existing queue, set this to `False`. 

157 Copy of `passive` aio-pike option. 

158 :param auto_delete: The queue will be deleted after connection closed. 

159 :param arguments: Queue declaration arguments. 

160 You can find information about them in the official RabbitMQ documentation: 

161 https://www.rabbitmq.com/docs/queues#optional-arguments 

162 :param timeout: Send confirmation time from RabbitMQ. 

163 :param robust: Whether to declare queue object as restorable. 

164 :param bind_arguments: Queue-exchange binding options. 

165 :param routing_key: Explicit binding routing key. Uses name if not present. 

166 """ 

167 re, routing_key = compile_path( 

168 routing_key, 

169 replace_symbol="*", 

170 patch_regex=lambda x: x.replace(r"\#", ".+"), 

171 ) 

172 

173 if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM: 

174 if durable is EMPTY: 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true

175 durable = True 

176 elif not durable: 176 ↛ 177line 176 didn't jump to line 177 because the condition on line 176 was never true

177 error_msg = "Quorum and Stream queues must be durable" 

178 raise SetupError(error_msg) 

179 elif durable is EMPTY: 

180 durable = False 

181 

182 super().__init__(name) 

183 

184 self.path_regex = re 

185 self.durable = durable 

186 self.exclusive = exclusive 

187 self.bind_arguments = bind_arguments 

188 self.routing_key = routing_key 

189 self.robust = robust 

190 self.auto_delete = auto_delete 

191 self.arguments = {"x-queue-type": queue_type.value, **(arguments or {})} 

192 self.timeout = timeout 

193 self.declare = declare 

194 

195 

196CommonQueueArgs = TypedDict( 

197 "CommonQueueArgs", 

198 { 

199 "x-queue-leader-locator": Literal["client-local", "balanced"], 

200 "x-max-length-bytes": int, 

201 }, 

202 total=False, 

203) 

204 

205 

206SharedClassicAndQuorumQueueArgs = TypedDict( 

207 "SharedClassicAndQuorumQueueArgs", 

208 { 

209 "x-expires": int, 

210 "x-message-ttl": int, 

211 "x-single-active-consumer": bool, 

212 "x-dead-letter-exchange": str, 

213 "x-dead-letter-routing-key": str, 

214 "x-max-length": int, 

215 }, 

216 total=False, 

217) 

218 

219 

220ClassicQueueSpecificArgs = TypedDict( 

221 "ClassicQueueSpecificArgs", 

222 { 

223 "x-overflow": Literal["drop-head", "reject-publish", "reject-publish-dlx"], 

224 "x-queue-master-locator": Literal["client-local", "balanced"], 

225 "x-max-priority": int, 

226 "x-queue-mode": Literal["default", "lazy"], 

227 "x-queue-version": int, 

228 }, 

229 total=False, 

230) 

231 

232 

233QuorumQueueSpecificArgs = TypedDict( 

234 "QuorumQueueSpecificArgs", 

235 { 

236 "x-overflow": Literal["drop-head", "reject-publish"], 

237 "x-delivery-limit": int, 

238 "x-quorum-initial-group-size": int, 

239 "x-quorum-target-group-size": int, 

240 "x-dead-letter-strategy": Literal["at-most-once", "at-least-once"], 

241 "x-max-in-memory-length": int, 

242 "x-max-in-memory-bytes": int, 

243 }, 

244 total=False, 

245) 

246 

247 

248StreamQueueSpecificArgs = TypedDict( 

249 "StreamQueueSpecificArgs", 

250 { 

251 "x-max-age": str, 

252 "x-stream-max-segment-size-bytes": int, 

253 "x-stream-filter-size-bytes": int, 

254 "x-initial-cluster-size": int, 

255 }, 

256 total=False, 

257) 

258 

259 

260class ClassicQueueArgs( 

261 CommonQueueArgs, 

262 SharedClassicAndQuorumQueueArgs, 

263 ClassicQueueSpecificArgs, 

264): 

265 """rabbitmq-server/deps/rabbit/src/rabbit_classic_queue.erl.""" 

266 

267 

268class QuorumQueueArgs( 

269 CommonQueueArgs, 

270 SharedClassicAndQuorumQueueArgs, 

271 QuorumQueueSpecificArgs, 

272): 

273 """rabbitmq-server/deps/rabbit/src/rabbit_quorum_queue.erl.""" 

274 

275 

276class StreamQueueArgs(CommonQueueArgs, StreamQueueSpecificArgs): 

277 """rabbitmq-server/deps/rabbit/src/rabbit_stream_queue.erl."""