Coverage for faststream / mqtt / response.py: 73%
20 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Union
3from typing_extensions import override
4from zmqtt import QoS
6from faststream.response.publish_type import PublishType
7from faststream.response.response import PublishCommand, Response
9if TYPE_CHECKING:
10 from faststream._internal.basic_types import SendableMessage
13class MQTTResponse(Response):
14 def __init__(
15 self,
16 body: "SendableMessage",
17 *,
18 headers: dict[str, str] | None = None,
19 correlation_id: str | None = None,
20 qos: QoS = QoS.AT_MOST_ONCE,
21 retain: bool = False,
22 ) -> None:
23 super().__init__(body=body, headers=headers, correlation_id=correlation_id)
24 self.qos = qos
25 self.retain = retain
27 @override
28 def as_publish_command(self) -> "MQTTPublishCommand":
29 return MQTTPublishCommand(
30 message=self.body,
31 headers=self.headers,
32 correlation_id=self.correlation_id,
33 _publish_type=PublishType.PUBLISH,
34 topic="",
35 qos=self.qos,
36 retain=self.retain,
37 )
40class MQTTPublishCommand(PublishCommand):
41 def __init__(
42 self,
43 message: "SendableMessage",
44 *,
45 topic: str = "",
46 correlation_id: str | None = None,
47 headers: dict[str, str] | None = None,
48 reply_to: str = "",
49 qos: QoS = QoS.AT_MOST_ONCE,
50 retain: bool = False,
51 message_expiry_interval: int | None = None,
52 timeout: float | None = 30.0,
53 _publish_type: PublishType,
54 ) -> None:
55 super().__init__(
56 body=message,
57 destination=topic,
58 correlation_id=correlation_id,
59 headers=headers,
60 reply_to=reply_to,
61 _publish_type=_publish_type,
62 )
63 self.qos = qos
64 self.retain = retain
65 self.message_expiry_interval = message_expiry_interval
66 self.timeout = timeout
68 @classmethod
69 def from_cmd(
70 cls,
71 cmd: Union["PublishCommand", "MQTTPublishCommand"],
72 ) -> "MQTTPublishCommand":
73 if isinstance(cmd, MQTTPublishCommand): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true
74 return cmd
76 return cls(
77 message=cmd.body,
78 topic=cmd.destination,
79 correlation_id=cmd.correlation_id,
80 headers=cmd.headers,
81 reply_to=cmd.reply_to,
82 timeout=getattr(cmd, "timeout", None),
83 _publish_type=cmd.publish_type,
84 )
86 def __repr__(self) -> str:
87 body = [
88 f"body='{self.body}'",
89 f"topic='{self.destination}'",
90 f"qos={self.qos}",
91 ]
92 if self.retain:
93 body.append("retain=True")
94 if self.reply_to:
95 body.append(f"reply_to='{self.reply_to}'")
96 body.extend((
97 f"headers={self.headers}",
98 f"correlation_id='{self.correlation_id}'",
99 ))
100 return f"{self.__class__.__name__}({', '.join(body)})"