Coverage for faststream / rabbit / publisher / usecase.py: 90%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable 

2from typing import TYPE_CHECKING, Any, Optional, Union 

3 

4from typing_extensions import Unpack, override 

5 

6from faststream._internal.endpoint.publisher import PublisherUsecase 

7from faststream._internal.utils.data import filter_by_dict 

8from faststream.message import gen_cor_id 

9from faststream.rabbit.response import RabbitPublishCommand 

10from faststream.rabbit.schemas import RabbitExchange, RabbitQueue 

11from faststream.response.publish_type import PublishType 

12 

13from .options import BasicMessageOptions, PublishKwargs, PublishOptions 

14 

15if TYPE_CHECKING: 

16 import aiormq 

17 

18 from faststream._internal.endpoint.publisher import PublisherSpecification 

19 from faststream._internal.types import PublisherMiddleware 

20 from faststream.rabbit.configs import RabbitBrokerConfig 

21 from faststream.rabbit.message import RabbitMessage 

22 from faststream.rabbit.types import AioPikaSendableMessage 

23 from faststream.response.response import PublishCommand 

24 

25 from .config import RabbitPublisherConfig 

26 

27 

28class RabbitPublisher(PublisherUsecase): 

29 """A class to represent a RabbitMQ publisher.""" 

30 

31 _outer_config: "RabbitBrokerConfig" 

32 

33 def __init__( 

34 self, 

35 config: "RabbitPublisherConfig", 

36 specification: "PublisherSpecification[Any, Any]", 

37 ) -> None: 

38 super().__init__(config, specification) 

39 

40 self.queue = config.queue 

41 self.routing_key = config.routing_key 

42 

43 self.exchange = config.exchange 

44 

45 self.headers = config.message_kwargs.pop("headers") or {} 

46 self.reply_to = config.message_kwargs.pop("reply_to", None) or "" 

47 self.timeout = config.message_kwargs.pop("timeout", None) 

48 

49 message_options, _ = filter_by_dict( 

50 BasicMessageOptions, 

51 dict(config.message_kwargs), 

52 ) 

53 self._message_options = message_options 

54 

55 publish_options, _ = filter_by_dict(PublishOptions, dict(config.message_kwargs)) 

56 self.publish_options = publish_options 

57 

58 @property 

59 def message_options(self) -> "BasicMessageOptions": 

60 if self._outer_config.app_id and "app_id" not in self._message_options: 60 ↛ 65line 60 didn't jump to line 65 because the condition on line 60 was always true

61 message_options = self._message_options.copy() 

62 message_options["app_id"] = self._outer_config.app_id 

63 return message_options 

64 

65 return self._message_options 

66 

67 def routing( 

68 self, 

69 *, 

70 queue: Union["RabbitQueue", str, None] = None, 

71 routing_key: str = "", 

72 ) -> str: 

73 if not routing_key: 

74 if q := RabbitQueue.validate(queue): 

75 routing_key = q.routing() 

76 else: 

77 r = self.routing_key or self.queue.routing() 

78 routing_key = f"{self._outer_config.prefix}{r}" 

79 

80 return routing_key 

81 

82 async def start(self) -> None: 

83 if self.exchange is not None: 83 ↛ 85line 83 didn't jump to line 85 because the condition on line 83 was always true

84 await self._outer_config.declarer.declare_exchange(self.exchange) 

85 return await super().start() 

86 

87 @override 

88 async def publish( 

89 self, 

90 message: "AioPikaSendableMessage", 

91 queue: Union["RabbitQueue", str, None] = None, 

92 exchange: Union["RabbitExchange", str, None] = None, 

93 *, 

94 routing_key: str = "", 

95 **publish_kwargs: "Unpack[PublishKwargs]", 

96 ) -> Optional["aiormq.abc.ConfirmationFrameType"]: 

97 if "headers" in publish_kwargs: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true

98 headers = self.headers | (publish_kwargs.pop("headers") or {}) 

99 else: 

100 headers = self.headers 

101 

102 correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id()) 

103 

104 cmd = RabbitPublishCommand( 

105 message, 

106 routing_key=self.routing(queue=queue, routing_key=routing_key), 

107 exchange=RabbitExchange.validate(exchange or self.exchange), 

108 headers=headers, 

109 correlation_id=correlation_id, 

110 _publish_type=PublishType.PUBLISH, 

111 **(self.publish_options | self.message_options | publish_kwargs), # type: ignore[operator] 

112 ) 

113 

114 frame: aiormq.abc.ConfirmationFrameType | None = await self._basic_publish( 

115 cmd, 

116 producer=self._outer_config.producer, 

117 _extra_middlewares=(), 

118 ) 

119 return frame 

120 

121 @override 

122 async def _publish( 

123 self, 

124 cmd: Union["RabbitPublishCommand", "PublishCommand"], 

125 *, 

126 _extra_middlewares: Iterable["PublisherMiddleware"], 

127 ) -> None: 

128 """This method should be called in subscriber flow only.""" 

129 cmd = RabbitPublishCommand.from_cmd(cmd) 

130 

131 cmd.exchange = RabbitExchange.validate(cmd._exchange or self.exchange) 

132 

133 cmd.destination = self.routing() 

134 cmd.reply_to = cmd.reply_to or self.reply_to 

135 cmd.add_headers(self.headers, override=False) 

136 

137 cmd.timeout = cmd.timeout or self.timeout 

138 

139 cmd.message_options = {**self.message_options, **cmd.message_options} 

140 cmd.publish_options = {**self.publish_options, **cmd.publish_options} 

141 

142 await self._basic_publish( 

143 cmd, 

144 producer=self._outer_config.producer, 

145 _extra_middlewares=_extra_middlewares, 

146 ) 

147 

148 @override 

149 async def request( 

150 self, 

151 message: "AioPikaSendableMessage", 

152 queue: Union["RabbitQueue", str, None] = None, 

153 exchange: Union["RabbitExchange", str, None] = None, 

154 *, 

155 routing_key: str = "", 

156 **publish_kwargs: "Unpack[PublishKwargs]", 

157 ) -> "RabbitMessage": 

158 if "headers" in publish_kwargs: 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true

159 headers = self.headers | (publish_kwargs.pop("headers") or {}) 

160 else: 

161 headers = self.headers 

162 

163 correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id()) 

164 

165 cmd = RabbitPublishCommand( 

166 message, 

167 routing_key=self.routing(queue=queue, routing_key=routing_key), 

168 exchange=RabbitExchange.validate(exchange or self.exchange), 

169 correlation_id=correlation_id, 

170 headers=headers, 

171 _publish_type=PublishType.PUBLISH, 

172 **(self.publish_options | self.message_options | publish_kwargs), # type: ignore[operator] 

173 ) 

174 

175 msg: RabbitMessage = await self._basic_request( 

176 cmd, 

177 producer=self._outer_config.producer, 

178 ) 

179 return msg