Coverage for faststream / rabbit / broker / broker.py: 89%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from collections.abc import Iterable, Sequence 

3from typing import ( 

4 TYPE_CHECKING, 

5 Any, 

6 Optional, 

7 Union, 

8 cast, 

9) 

10from urllib.parse import urlparse 

11 

12import anyio 

13from aio_pika import IncomingMessage, RobustConnection, connect_robust 

14from fast_depends import Provider, dependency_provider 

15from typing_extensions import override 

16 

17from faststream.__about__ import SERVICE_NAME 

18from faststream._internal.broker import BrokerUsecase 

19from faststream._internal.constants import EMPTY 

20from faststream._internal.context.repository import ContextRepo 

21from faststream._internal.di import FastDependsConfig 

22from faststream.message import gen_cor_id 

23from faststream.middlewares import AckPolicy 

24from faststream.rabbit.configs import RabbitBrokerConfig 

25from faststream.rabbit.helpers.channel_manager import ChannelManagerImpl 

26from faststream.rabbit.helpers.declarer import RabbitDeclarerImpl 

27from faststream.rabbit.publisher.producer import ( 

28 AioPikaFastProducerImpl, 

29) 

30from faststream.rabbit.response import RabbitPublishCommand 

31from faststream.rabbit.schemas import ( 

32 RABBIT_REPLY, 

33 Channel, 

34 RabbitExchange, 

35 RabbitQueue, 

36) 

37from faststream.rabbit.security import parse_security 

38from faststream.rabbit.utils import build_url 

39from faststream.response.publish_type import PublishType 

40from faststream.specification.schema import BrokerSpec 

41 

42from .logging import make_rabbit_logger_state 

43from .registrator import RabbitRegistrator 

44 

45if TYPE_CHECKING: 

46 from types import TracebackType 

47 

48 import aiormq 

49 from aio_pika import ( 

50 RobustChannel, 

51 RobustExchange, 

52 RobustQueue, 

53 ) 

54 from aio_pika.abc import DateType, HeadersType, SSLOptions, TimeoutType 

55 from fast_depends.dependencies import Dependant 

56 from fast_depends.library.serializer import SerializerProto 

57 from yarl import URL 

58 

59 from faststream._internal.basic_types import LoggerProto 

60 from faststream._internal.parser import CodecProto 

61 from faststream._internal.types import ( 

62 BrokerMiddleware, 

63 CustomCallable, 

64 ) 

65 from faststream.rabbit.helpers import RabbitDeclarer 

66 from faststream.rabbit.message import RabbitMessage 

67 from faststream.rabbit.types import AioPikaSendableMessage 

68 from faststream.rabbit.utils import RabbitClientProperties 

69 from faststream.security import BaseSecurity 

70 from faststream.specification.schema.extra import Tag, TagDict 

71 

72 

73class RabbitBroker( 

74 RabbitRegistrator, 

75 BrokerUsecase[IncomingMessage, RobustConnection], 

76): 

77 """A class to represent a RabbitMQ broker.""" 

78 

79 def __init__( 

80 self, 

81 url: Union[str, "URL", None] = "amqp://guest:guest@localhost:5672/", 

82 *, 

83 host: str | None = None, 

84 port: int | None = None, 

85 virtualhost: str | None = None, 

86 ssl_options: Optional["SSLOptions"] = None, 

87 client_properties: Optional["RabbitClientProperties"] = None, 

88 timeout: "TimeoutType" = None, 

89 fail_fast: bool = True, 

90 reconnect_interval: "TimeoutType" = 5.0, 

91 default_channel: Optional["Channel"] = None, 

92 app_id: str | None = SERVICE_NAME, 

93 # broker base args 

94 graceful_timeout: float | None = None, 

95 ack_policy: AckPolicy = EMPTY, 

96 decoder: Optional["CustomCallable"] = None, 

97 codec: Optional["CodecProto"] = None, 

98 parser: Optional["CustomCallable"] = None, 

99 dependencies: Iterable["Dependant"] = (), 

100 middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (), 

101 routers: Iterable[RabbitRegistrator] = (), 

102 # AsyncAPI args 

103 security: Optional["BaseSecurity"] = None, 

104 specification_url: str | None = None, 

105 protocol: str | None = None, 

106 protocol_version: str | None = "0.9.1", 

107 description: str | None = None, 

108 tags: Iterable[Union["Tag", "TagDict"]] = (), 

109 # logging args 

110 logger: Optional["LoggerProto"] = EMPTY, 

111 log_level: int = logging.INFO, 

112 # FastDepends args 

113 apply_types: bool = True, 

114 serializer: Optional["SerializerProto"] = EMPTY, 

115 provider: Optional["Provider"] = None, 

116 context: Optional["ContextRepo"] = None, 

117 ) -> None: 

118 """Initialize the RabbitBroker. 

119 

120 Args: 

121 url: RabbitMQ destination location to connect. 

122 host: Destination host. This option overrides `url` option host. 

123 port: Destination port. This option overrides `url` option port. 

124 virtualhost: RabbitMQ virtual host to use in the current broker connection. 

125 ssl_options: Extra ssl options to establish connection. 

126 client_properties: Add custom client capability. 

127 timeout: Connection establishment timeout. 

128 fail_fast: Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable. 

129 reconnect_interval: Time to sleep between reconnection attempts. 

130 default_channel: Default channel settings to use. 

131 app_id: Application name to mark outgoing messages by. 

132 graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. 

133 ack_policy: Default acknowledgement policy for all subscribers. Individual subscribers can override. 

134 decoder: Custom decoder object. 

135 codec: Custom codec object. 

136 parser: Custom parser object. 

137 dependencies: Dependencies to apply to all broker subscribers. 

138 middlewares: Middlewares to apply to all broker publishers/subscribers. 

139 routers: RabbitRouters to build a broker with. 

140 security: Security options to connect broker and generate AsyncAPI server security information. 

141 specification_url: AsyncAPI hardcoded server addresses. Use `servers` if not specified. 

142 protocol: AsyncAPI server protocol. 

143 protocol_version: AsyncAPI server protocol version. 

144 description: AsyncAPI server description. 

145 tags: AsyncAPI server tags. 

146 logger: User-specified logger to pass into Context and log service messages. 

147 log_level: Service messages log level. 

148 apply_types: Whether to use FastDepends or not. 

149 serializer: FastDepends-compatible serializer to validate incoming messages. 

150 provider: Provider for FastDepends. 

151 context: Context for FastDepends. 

152 """ 

153 security_args = parse_security(security) 

154 

155 amqp_url = build_url( 

156 url, 

157 host=host, 

158 port=port, 

159 virtualhost=virtualhost, 

160 ssl_options=ssl_options, 

161 client_properties=client_properties, 

162 login=security_args.get("login"), 

163 password=security_args.get("password"), 

164 ssl=security_args.get("ssl"), 

165 ) 

166 

167 if specification_url is None: 

168 specification_url = str(amqp_url) 

169 

170 # respect ascynapi_url argument scheme 

171 built_asyncapi_url = urlparse(specification_url) 

172 if protocol is None: 

173 protocol = built_asyncapi_url.scheme 

174 

175 cm = ChannelManagerImpl(default_channel) 

176 declarer = RabbitDeclarerImpl(cm) 

177 

178 producer = AioPikaFastProducerImpl( 

179 declarer=declarer, 

180 decoder=decoder, 

181 parser=parser, 

182 ) 

183 

184 super().__init__( 

185 # connection args 

186 url=str(amqp_url), 

187 ssl_context=security_args.get("ssl_context"), 

188 timeout=timeout, 

189 fail_fast=fail_fast, 

190 reconnect_interval=reconnect_interval, 

191 # Basic args 

192 routers=routers, 

193 config=RabbitBrokerConfig( 

194 channel_manager=cm, 

195 producer=producer, 

196 declarer=declarer, 

197 app_id=app_id, 

198 virtual_host=built_asyncapi_url.path, 

199 # both args 

200 broker_middlewares=middlewares, 

201 broker_parser=parser, 

202 broker_decoder=decoder, 

203 broker_codec=codec, 

204 logger=make_rabbit_logger_state( 

205 logger=logger, 

206 log_level=log_level, 

207 ), 

208 fd_config=FastDependsConfig( 

209 use_fastdepends=apply_types, 

210 serializer=serializer, 

211 provider=provider or dependency_provider, 

212 context=context or ContextRepo(), 

213 ), 

214 # subscriber args 

215 broker_dependencies=dependencies, 

216 graceful_timeout=graceful_timeout, 

217 ack_policy=ack_policy, 

218 extra_context={ 

219 "broker": self, 

220 }, 

221 ), 

222 specification=BrokerSpec( 

223 description=description, 

224 url=[specification_url], 

225 protocol=protocol or built_asyncapi_url.scheme, 

226 protocol_version=protocol_version, 

227 security=security, 

228 tags=tags, 

229 ), 

230 ) 

231 

232 self._channel: RobustChannel | None = None 

233 

234 @override 

235 async def _connect(self) -> "RobustConnection": 

236 connection = cast( 

237 "RobustConnection", 

238 await connect_robust(**self._connection_kwargs), 

239 ) 

240 

241 if self._channel is None: 241 ↛ 245line 241 didn't jump to line 245 because the condition on line 241 was always true

242 self.config.connect(connection) 

243 self._channel = await self.config.channel_manager.get_channel() 

244 

245 return connection 

246 

247 async def stop( 

248 self, 

249 exc_type: type[BaseException] | None = None, 

250 exc_val: BaseException | None = None, 

251 exc_tb: Optional["TracebackType"] = None, 

252 ) -> None: 

253 await super().stop(exc_type, exc_val, exc_tb) 

254 

255 if self._channel is not None: 

256 if not self._channel.is_closed: 256 ↛ 259line 256 didn't jump to line 259 because the condition on line 256 was always true

257 await self._channel.close() 

258 

259 self._channel = None 

260 

261 if self._connection is not None: 

262 await self._connection.close() 

263 self._connection = None 

264 

265 self.config.disconnect() 

266 

267 async def start(self) -> None: 

268 """Connect broker to RabbitMQ and startup all subscribers.""" 

269 await self.connect() 

270 await self.declare_queue(RABBIT_REPLY) 

271 await super().start() 

272 

273 @override 

274 async def publish( 

275 self, 

276 message: "AioPikaSendableMessage" = None, 

277 queue: Union["RabbitQueue", str] = "", 

278 exchange: Union["RabbitExchange", str, None] = None, 

279 *, 

280 routing_key: str = "", 

281 # publish options 

282 mandatory: bool = True, 

283 immediate: bool = False, 

284 timeout: "TimeoutType" = None, 

285 persist: bool = False, 

286 reply_to: str | None = None, 

287 correlation_id: str | None = None, 

288 # message options 

289 headers: Optional["HeadersType"] = None, 

290 content_type: str | None = None, 

291 content_encoding: str | None = None, 

292 expiration: Optional["DateType"] = None, 

293 message_id: str | None = None, 

294 timestamp: Optional["DateType"] = None, 

295 message_type: str | None = None, 

296 user_id: str | None = None, 

297 priority: int | None = None, 

298 ) -> Optional["aiormq.abc.ConfirmationFrameType"]: 

299 """Publish message directly. 

300 

301 This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks 

302 applications or to publish messages from time to time. 

303 

304 Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way. 

305 

306 Args: 

307 message: 

308 Message body to send. 

309 queue: 

310 Message routing key to publish with. 

311 exchange: 

312 Target exchange to publish message to. 

313 routing_key: 

314 Message routing key to publish with. Overrides `queue` option if presented. 

315 mandatory: 

316 Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue. 

317 immediate: 

318 Client expects that there is consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer. 

319 timeout: 

320 Send confirmation time from RabbitMQ. 

321 persist: 

322 Restore the message on RabbitMQ reboot. 

323 reply_to: 

324 Reply message routing key to send with (always sending to default exchange). 

325 correlation_id: 

326 Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages. 

327 headers: 

328 Message headers to store metainformation. 

329 content_type: 

330 Message **content-type** header. Used by application, not core RabbitMQ. Will be set automatically if not specified. 

331 content_encoding: 

332 Message body content encoding, e.g. **gzip**. 

333 expiration: 

334 Message expiration (lifetime) in seconds (or datetime or timedelta). 

335 message_id: 

336 Arbitrary message id. Generated automatically if not present. 

337 timestamp: 

338 Message publish timestamp. Generated automatically if not presented. 

339 message_type: 

340 Application-specific message type, e.g. **orders.created**. 

341 user_id: 

342 Publisher connection User ID, validated if set. 

343 priority: 

344 The message priority (0 by default). 

345 

346 Returns: 

347 An optional `aiormq.abc.ConfirmationFrameType` representing the confirmation frame if RabbitMQ is configured to send confirmations. 

348 """ 

349 cmd = RabbitPublishCommand( 

350 message, 

351 routing_key=routing_key or RabbitQueue.validate(queue).routing(), 

352 exchange=RabbitExchange.validate(exchange), 

353 correlation_id=correlation_id or gen_cor_id(), 

354 app_id=self.config.app_id, 

355 mandatory=mandatory, 

356 immediate=immediate, 

357 persist=persist, 

358 reply_to=reply_to, 

359 headers=headers, 

360 content_type=content_type, 

361 content_encoding=content_encoding, 

362 expiration=expiration, 

363 message_id=message_id, 

364 message_type=message_type, 

365 timestamp=timestamp, 

366 user_id=user_id, 

367 timeout=timeout, 

368 priority=priority, 

369 _publish_type=PublishType.PUBLISH, 

370 ) 

371 

372 result: aiormq.abc.ConfirmationFrameType | None = await super()._basic_publish( 

373 cmd, 

374 producer=self._producer, 

375 ) 

376 return result 

377 

378 @override 

379 async def request( # type: ignore[override] 

380 self, 

381 message: "AioPikaSendableMessage" = None, 

382 queue: Union["RabbitQueue", str] = "", 

383 exchange: Union["RabbitExchange", str, None] = None, 

384 *, 

385 routing_key: str = "", 

386 mandatory: bool = True, 

387 immediate: bool = False, 

388 timeout: "TimeoutType" = None, 

389 persist: bool = False, 

390 # message args 

391 correlation_id: str | None = None, 

392 headers: Optional["HeadersType"] = None, 

393 content_type: str | None = None, 

394 content_encoding: str | None = None, 

395 expiration: Optional["DateType"] = None, 

396 message_id: str | None = None, 

397 timestamp: Optional["DateType"] = None, 

398 message_type: str | None = None, 

399 user_id: str | None = None, 

400 priority: int | None = None, 

401 ) -> "RabbitMessage": 

402 """Make a synchronous request to RabbitMQ. 

403 

404 This method uses Direct Reply-To pattern to send a message and wait for a reply. 

405 It is a blocking call and will wait for a reply until timeout. 

406 

407 Args: 

408 message: Message body to send. 

409 queue: Message routing key to publish with. 

410 exchange: Target exchange to publish message to. 

411 routing_key: Message routing key to publish with. Overrides `queue` option if presented. 

412 mandatory: Client waits for confirmation that the message is placed to some queue. 

413 RabbitMQ returns message to client if there is no suitable queue. 

414 immediate: Client expects that there is a consumer ready to take the message to work. 

415 RabbitMQ returns message to client if there is no suitable consumer. 

416 timeout: Send confirmation time from RabbitMQ. 

417 persist: Restore the message on RabbitMQ reboot. 

418 correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages. 

419 headers: Message headers to store metainformation. 

420 content_type: Message **content-type** header. Used by application, not core RabbitMQ. 

421 Will be set automatically if not specified. 

422 content_encoding: Message body content encoding, e.g. **gzip**. 

423 expiration: Message expiration (lifetime) in seconds (or datetime or timedelta). 

424 message_id: Arbitrary message id. Generated automatically if not present. 

425 timestamp: Message publish timestamp. Generated automatically if not present. 

426 message_type: Application-specific message type, e.g. **orders.created**. 

427 user_id: Publisher connection User ID, validated if set. 

428 priority: The message priority (0 by default). 

429 """ 

430 cmd = RabbitPublishCommand( 

431 message, 

432 routing_key=routing_key or RabbitQueue.validate(queue).routing(), 

433 exchange=RabbitExchange.validate(exchange), 

434 correlation_id=correlation_id or gen_cor_id(), 

435 app_id=self.config.app_id, 

436 mandatory=mandatory, 

437 immediate=immediate, 

438 persist=persist, 

439 headers=headers, 

440 content_type=content_type, 

441 content_encoding=content_encoding, 

442 expiration=expiration, 

443 message_id=message_id, 

444 message_type=message_type, 

445 timestamp=timestamp, 

446 user_id=user_id, 

447 timeout=timeout, 

448 priority=priority, 

449 _publish_type=PublishType.REQUEST, 

450 ) 

451 

452 msg: RabbitMessage = await super()._basic_request(cmd, producer=self._producer) 

453 return msg 

454 

455 async def declare_queue(self, queue: "RabbitQueue") -> "RobustQueue": 

456 """Declares queue object in **RabbitMQ**.""" 

457 declarer: RabbitDeclarer = self.config.declarer 

458 return await declarer.declare_queue(queue) 

459 

460 async def declare_exchange(self, exchange: "RabbitExchange") -> "RobustExchange": 

461 """Declares exchange object in **RabbitMQ**.""" 

462 declarer: RabbitDeclarer = self.config.declarer 

463 return await declarer.declare_exchange(exchange) 

464 

465 @override 

466 async def ping(self, timeout: float | None) -> bool: 

467 sleep_time = (timeout or 10) / 10 

468 

469 with anyio.move_on_after(timeout) as cancel_scope: 

470 if self._connection is None: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 return False 

472 

473 while True: 

474 if cancel_scope.cancel_called: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true

475 return False 

476 

477 if self._connection.connected.is_set(): 477 ↛ 480line 477 didn't jump to line 480 because the condition on line 477 was always true

478 return True 

479 

480 await anyio.sleep(sleep_time) 

481 

482 return False