Coverage for faststream / rabbit / response.py: 95%
34 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Union
3from typing_extensions import Unpack, override
5from faststream.rabbit.schemas.exchange import RabbitExchange
6from faststream.response import PublishCommand, Response
7from faststream.response.publish_type import PublishType
9if TYPE_CHECKING:
10 from aio_pika.abc import TimeoutType
12 from faststream.rabbit.publisher.options import (
13 BasicMessageOptions,
14 MessageOptions,
15 PublishOptions,
16 )
17 from faststream.rabbit.types import AioPikaSendableMessage
20class RabbitResponse(Response):
21 def __init__(
22 self,
23 body: "AioPikaSendableMessage",
24 *,
25 timeout: "TimeoutType" = None,
26 mandatory: bool = True,
27 immediate: bool = False,
28 exchange: RabbitExchange | str | None = None,
29 **message_options: Unpack["MessageOptions"],
30 ) -> None:
31 headers = message_options.pop("headers", {})
32 correlation_id = message_options.pop("correlation_id", None)
34 super().__init__(
35 body=body,
36 headers=headers,
37 correlation_id=correlation_id,
38 )
40 self.exchange = None if exchange is None else RabbitExchange.validate(exchange)
41 self.message_options: BasicMessageOptions = message_options
42 self.publish_options: PublishOptions = {
43 "mandatory": mandatory,
44 "immediate": immediate,
45 "timeout": timeout,
46 }
48 @override
49 def as_publish_command(self) -> "RabbitPublishCommand":
50 return RabbitPublishCommand(
51 message=self.body,
52 _publish_type=PublishType.PUBLISH,
53 routing_key="",
54 exchange=self.exchange,
55 **self.publish_options,
56 headers=self.headers,
57 correlation_id=self.correlation_id,
58 **self.message_options,
59 )
62class RabbitPublishCommand(PublishCommand):
63 def __init__(
64 self,
65 message: "AioPikaSendableMessage",
66 *,
67 _publish_type: PublishType,
68 routing_key: str = "",
69 exchange: RabbitExchange | None = None,
70 # publish kwargs
71 mandatory: bool = True,
72 immediate: bool = False,
73 timeout: "TimeoutType" = None,
74 **message_options: Unpack["MessageOptions"],
75 ) -> None:
76 headers = message_options.pop("headers", {})
77 reply_to = message_options.pop("reply_to", None) or ""
78 correlation_id = message_options.pop("correlation_id", None)
80 super().__init__(
81 body=message,
82 destination=routing_key,
83 correlation_id=correlation_id,
84 headers=headers,
85 reply_to=reply_to,
86 _publish_type=_publish_type,
87 )
89 self._exchange = exchange
91 self.timeout = timeout
93 self.message_options: BasicMessageOptions = message_options
94 self.publish_options: PublishOptions = {
95 "mandatory": mandatory,
96 "immediate": immediate,
97 }
99 @property
100 def exchange(self) -> RabbitExchange:
101 if self._exchange is not None: 101 ↛ 103line 101 didn't jump to line 103 because the condition on line 101 was always true
102 return self._exchange
103 return RabbitExchange()
105 @exchange.setter
106 def exchange(self, value: RabbitExchange) -> None:
107 self._exchange = value
109 @classmethod
110 def from_cmd(
111 cls,
112 cmd: Union["PublishCommand", "RabbitPublishCommand"],
113 ) -> "RabbitPublishCommand":
114 if isinstance(cmd, RabbitPublishCommand):
115 # NOTE: Should return a copy probably.
116 return cmd
118 return cls(
119 message=cmd.body,
120 routing_key=cmd.destination,
121 correlation_id=cmd.correlation_id,
122 headers=cmd.headers,
123 reply_to=cmd.reply_to,
124 _publish_type=cmd.publish_type,
125 )