Coverage for faststream / rabbit / publisher / usecase.py: 90%
61 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable
2from typing import TYPE_CHECKING, Any, Optional, Union
4from typing_extensions import Unpack, override
6from faststream._internal.endpoint.publisher import PublisherUsecase
7from faststream._internal.utils.data import filter_by_dict
8from faststream.message import gen_cor_id
9from faststream.rabbit.response import RabbitPublishCommand
10from faststream.rabbit.schemas import RabbitExchange, RabbitQueue
11from faststream.response.publish_type import PublishType
13from .options import BasicMessageOptions, PublishKwargs, PublishOptions
15if TYPE_CHECKING:
16 import aiormq
18 from faststream._internal.endpoint.publisher import PublisherSpecification
19 from faststream._internal.types import PublisherMiddleware
20 from faststream.rabbit.configs import RabbitBrokerConfig
21 from faststream.rabbit.message import RabbitMessage
22 from faststream.rabbit.types import AioPikaSendableMessage
23 from faststream.response.response import PublishCommand
25 from .config import RabbitPublisherConfig
28class RabbitPublisher(PublisherUsecase):
29 """A class to represent a RabbitMQ publisher."""
31 _outer_config: "RabbitBrokerConfig"
33 def __init__(
34 self,
35 config: "RabbitPublisherConfig",
36 specification: "PublisherSpecification[Any, Any]",
37 ) -> None:
38 super().__init__(config, specification)
40 self.queue = config.queue
41 self.routing_key = config.routing_key
43 self.exchange = config.exchange
45 self.headers = config.message_kwargs.pop("headers") or {}
46 self.reply_to = config.message_kwargs.pop("reply_to", None) or ""
47 self.timeout = config.message_kwargs.pop("timeout", None)
49 message_options, _ = filter_by_dict(
50 BasicMessageOptions,
51 dict(config.message_kwargs),
52 )
53 self._message_options = message_options
55 publish_options, _ = filter_by_dict(PublishOptions, dict(config.message_kwargs))
56 self.publish_options = publish_options
58 @property
59 def message_options(self) -> "BasicMessageOptions":
60 if self._outer_config.app_id and "app_id" not in self._message_options: 60 ↛ 65line 60 didn't jump to line 65 because the condition on line 60 was always true
61 message_options = self._message_options.copy()
62 message_options["app_id"] = self._outer_config.app_id
63 return message_options
65 return self._message_options
67 def routing(
68 self,
69 *,
70 queue: Union["RabbitQueue", str, None] = None,
71 routing_key: str = "",
72 ) -> str:
73 if not routing_key:
74 if q := RabbitQueue.validate(queue):
75 routing_key = q.routing()
76 else:
77 r = self.routing_key or self.queue.routing()
78 routing_key = f"{self._outer_config.prefix}{r}"
80 return routing_key
82 async def start(self) -> None:
83 if self.exchange is not None: 83 ↛ 85line 83 didn't jump to line 85 because the condition on line 83 was always true
84 await self._outer_config.declarer.declare_exchange(self.exchange)
85 return await super().start()
87 @override
88 async def publish(
89 self,
90 message: "AioPikaSendableMessage",
91 queue: Union["RabbitQueue", str, None] = None,
92 exchange: Union["RabbitExchange", str, None] = None,
93 *,
94 routing_key: str = "",
95 **publish_kwargs: "Unpack[PublishKwargs]",
96 ) -> Optional["aiormq.abc.ConfirmationFrameType"]:
97 if "headers" in publish_kwargs: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true
98 headers = self.headers | (publish_kwargs.pop("headers") or {})
99 else:
100 headers = self.headers
102 correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id())
104 cmd = RabbitPublishCommand(
105 message,
106 routing_key=self.routing(queue=queue, routing_key=routing_key),
107 exchange=RabbitExchange.validate(exchange or self.exchange),
108 headers=headers,
109 correlation_id=correlation_id,
110 _publish_type=PublishType.PUBLISH,
111 **(self.publish_options | self.message_options | publish_kwargs), # type: ignore[operator]
112 )
114 frame: aiormq.abc.ConfirmationFrameType | None = await self._basic_publish(
115 cmd,
116 producer=self._outer_config.producer,
117 _extra_middlewares=(),
118 )
119 return frame
121 @override
122 async def _publish(
123 self,
124 cmd: Union["RabbitPublishCommand", "PublishCommand"],
125 *,
126 _extra_middlewares: Iterable["PublisherMiddleware"],
127 ) -> None:
128 """This method should be called in subscriber flow only."""
129 cmd = RabbitPublishCommand.from_cmd(cmd)
131 cmd.exchange = RabbitExchange.validate(cmd._exchange or self.exchange)
133 cmd.destination = self.routing()
134 cmd.reply_to = cmd.reply_to or self.reply_to
135 cmd.add_headers(self.headers, override=False)
137 cmd.timeout = cmd.timeout or self.timeout
139 cmd.message_options = {**self.message_options, **cmd.message_options}
140 cmd.publish_options = {**self.publish_options, **cmd.publish_options}
142 await self._basic_publish(
143 cmd,
144 producer=self._outer_config.producer,
145 _extra_middlewares=_extra_middlewares,
146 )
148 @override
149 async def request(
150 self,
151 message: "AioPikaSendableMessage",
152 queue: Union["RabbitQueue", str, None] = None,
153 exchange: Union["RabbitExchange", str, None] = None,
154 *,
155 routing_key: str = "",
156 **publish_kwargs: "Unpack[PublishKwargs]",
157 ) -> "RabbitMessage":
158 if "headers" in publish_kwargs: 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true
159 headers = self.headers | (publish_kwargs.pop("headers") or {})
160 else:
161 headers = self.headers
163 correlation_id = publish_kwargs.pop("correlation_id", gen_cor_id())
165 cmd = RabbitPublishCommand(
166 message,
167 routing_key=self.routing(queue=queue, routing_key=routing_key),
168 exchange=RabbitExchange.validate(exchange or self.exchange),
169 correlation_id=correlation_id,
170 headers=headers,
171 _publish_type=PublishType.PUBLISH,
172 **(self.publish_options | self.message_options | publish_kwargs), # type: ignore[operator]
173 )
175 msg: RabbitMessage = await self._basic_request(
176 cmd,
177 producer=self._outer_config.producer,
178 )
179 return msg