Coverage for faststream / rabbit / helpers / declarer.py: 98%

33 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Optional, Protocol, cast 

2 

3from faststream._internal.constants import EMPTY 

4 

5if TYPE_CHECKING: 

6 import aio_pika 

7 

8 from faststream.rabbit.schemas import Channel, RabbitExchange, RabbitQueue 

9 

10 from .channel_manager import ChannelManager 

11 

12 

13class RabbitDeclarer(Protocol): 

14 """An utility class to declare RabbitMQ queues and exchanges.""" 

15 

16 def disconnect(self) -> None: ... 

17 

18 async def declare_queue( 

19 self, 

20 queue: "RabbitQueue", 

21 declare: bool = EMPTY, 

22 *, 

23 channel: Optional["Channel"] = None, 

24 ) -> "aio_pika.RobustQueue": 

25 """Declare a queue.""" 

26 ... 

27 

28 async def declare_exchange( 

29 self, 

30 exchange: "RabbitExchange", 

31 declare: bool = EMPTY, 

32 *, 

33 channel: Optional["Channel"] = None, 

34 ) -> "aio_pika.RobustExchange": 

35 """Declare an exchange, parent exchanges and bind them each other.""" 

36 ... 

37 

38 

39class FakeRabbitDeclarer(RabbitDeclarer): 

40 def disconnect(self) -> None: 

41 raise NotImplementedError 

42 

43 async def declare_queue( 

44 self, 

45 queue: "RabbitQueue", 

46 declare: bool = EMPTY, 

47 *, 

48 channel: Optional["Channel"] = None, 

49 ) -> "aio_pika.RobustQueue": 

50 raise NotImplementedError 

51 

52 async def declare_exchange( 

53 self, 

54 exchange: "RabbitExchange", 

55 declare: bool = EMPTY, 

56 *, 

57 channel: Optional["Channel"] = None, 

58 ) -> "aio_pika.RobustExchange": 

59 raise NotImplementedError 

60 

61 

62class RabbitDeclarerImpl(RabbitDeclarer): 

63 __slots__ = ("__channel_manager", "__exchanges", "__queues") 

64 

65 def __init__(self, channel_manager: "ChannelManager") -> None: 

66 self.__channel_manager = channel_manager 

67 self._queues: dict[RabbitQueue, aio_pika.RobustQueue] = {} 

68 self._exchanges: dict[RabbitExchange, aio_pika.RobustExchange] = {} 

69 

70 def __repr__(self) -> str: 

71 return f"{self.__class__.__name__}(queues={list(self._queues.keys())}, exchanges={list(self._exchanges.keys())})" 

72 

73 def disconnect(self) -> None: 

74 self._queues.clear() 

75 self._exchanges.clear() 

76 

77 async def declare_queue( 

78 self, 

79 queue: "RabbitQueue", 

80 declare: bool = EMPTY, 

81 *, 

82 channel: Optional["Channel"] = None, 

83 ) -> "aio_pika.RobustQueue": 

84 if (q := self._queues.get(queue)) is None: 

85 if declare is EMPTY: 85 ↛ 88line 85 didn't jump to line 88 because the condition on line 85 was always true

86 declare = queue.declare 

87 

88 channel_obj = await self.__channel_manager.get_channel(channel) 

89 

90 self._queues[queue] = q = cast( 

91 "aio_pika.RobustQueue", 

92 await channel_obj.declare_queue( 

93 name=queue.name, 

94 durable=queue.durable, 

95 exclusive=queue.exclusive, 

96 passive=not declare, 

97 auto_delete=queue.auto_delete, 

98 arguments=queue.arguments, 

99 timeout=queue.timeout, 

100 robust=queue.robust, 

101 ), 

102 ) 

103 

104 return q 

105 

106 async def declare_exchange( 

107 self, 

108 exchange: "RabbitExchange", 

109 declare: bool = EMPTY, 

110 *, 

111 channel: Optional["Channel"] = None, 

112 ) -> "aio_pika.RobustExchange": 

113 channel_obj = await self.__channel_manager.get_channel(channel) 

114 

115 if not exchange.name: 

116 return channel_obj.default_exchange 

117 

118 if (exch := self._exchanges.get(exchange)) is None: 

119 if declare is EMPTY: 

120 declare = exchange.declare 

121 

122 self._exchanges[exchange] = exch = cast( 

123 "aio_pika.RobustExchange", 

124 await channel_obj.declare_exchange( 

125 name=exchange.name, 

126 type=exchange.type.value, 

127 durable=exchange.durable, 

128 auto_delete=exchange.auto_delete, 

129 passive=not declare, 

130 arguments=exchange.arguments, 

131 timeout=exchange.timeout, 

132 robust=exchange.robust, 

133 internal=False, # deprecated RMQ option 

134 ), 

135 ) 

136 

137 if exchange.bind_to is not None: 

138 parent = await self.declare_exchange(exchange.bind_to) 

139 await exch.bind( 

140 exchange=parent, 

141 routing_key=exchange.routing(), 

142 arguments=exchange.bind_arguments, 

143 timeout=exchange.timeout, 

144 robust=exchange.robust, 

145 ) 

146 

147 return exch