Coverage for faststream / mqtt / publisher / producer.py: 82%
71 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from typing import TYPE_CHECKING, Any, Optional
4import zmqtt
5from typing_extensions import override
6from zmqtt import PublishProperties
8from faststream._internal.endpoint.utils import ParserComposition
9from faststream._internal.producer import ProducerProto
10from faststream.exceptions import FeatureNotSupportedException, IncorrectState
11from faststream.message import encode_message, gen_cor_id
12from faststream.mqtt.parser import MQTTParserV5, MQTTParserV311
13from faststream.mqtt.response import MQTTPublishCommand
15if TYPE_CHECKING:
16 from fast_depends.library.serializer import SerializerProto
18 from faststream._internal.types import AsyncCallable, CustomCallable
class ZmqttBaseProducer(ProducerProto[MQTTPublishCommand]):
    """Common state and helpers for zmqtt-backed MQTT producers.

    Stores the client and serializer supplied through ``connect()`` and the
    parser/decoder compositions built at construction time. ``publish()``
    and ``request()`` raise ``NotImplementedError`` here and are meant to be
    overridden; ``publish_batch()`` is rejected outright.
    """

    _parser: "AsyncCallable"
    _decoder: "AsyncCallable"

    def __init__(
        self,
        default_parser: Any,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Create a producer that is not yet attached to a client.

        Args:
            default_parser: Object exposing ``parse_message`` and
                ``decode_message``; both are handed to ``ParserComposition``.
            parser: Optional custom parser composed with the default one.
            decoder: Optional custom decoder composed with the default one.
        """
        # Nothing is attached until connect() is called.
        self.serializer: SerializerProto | None = None
        self._client: zmqtt.MQTTClient | None = None

        self._parser = ParserComposition(
            parser,
            default_parser.parse_message,
        )
        self._decoder = ParserComposition(
            decoder,
            default_parser.decode_message,
        )

    def connect(
        self,
        client: "zmqtt.MQTTClient",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Attach the MQTT client and the serializer used for outgoing bodies."""
        self._client = client
        self.serializer = serializer

    def disconnect(self) -> None:
        """Drop the references to the client and the serializer."""
        self._client = self.serializer = None

    @property
    def _connected_client(self) -> "zmqtt.MQTTClient":
        """Return the attached client.

        Raises:
            IncorrectState: If no client is currently attached.
        """
        client = self._client
        if client is not None:
            return client
        msg = "Producer is not connected. Call connect() first."
        raise IncorrectState(msg)

    @override
    async def publish(self, cmd: "MQTTPublishCommand") -> None:
        """Publish ``cmd``; must be provided by a subclass."""
        raise NotImplementedError

    @override
    async def request(self, cmd: "MQTTPublishCommand") -> Any:
        """Send ``cmd`` and await a reply; must be provided by a subclass."""
        raise NotImplementedError

    @override
    async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
        """Reject batch publishing.

        Raises:
            FeatureNotSupportedException: Always.
        """
        msg = "MQTT does not support batch publishing."
        raise FeatureNotSupportedException(msg)
class ZmqttProducerV311(ZmqttBaseProducer):
    """Producer for MQTT 3.1.1 — publishes raw bytes only.

    Headers, correlation_id, and other metadata are not supported.
    Use MQTT 5.0 for those features. Request/reply is supported via
    an explicit reply_to topic provided by the caller.
    """

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Build the producer around a fresh ``MQTTParserV311`` default parser.

        Args:
            parser: Optional custom parser composed with the default one.
            decoder: Optional custom decoder composed with the default one.
        """
        super().__init__(MQTTParserV311(), parser, decoder)

    @override
    async def publish(self, cmd: "MQTTPublishCommand") -> None:
        """Encode ``cmd.body`` and publish it to ``cmd.destination``.

        Raises:
            FeatureNotSupportedException: If ``cmd.headers`` is non-empty.
            IncorrectState: If the producer is not connected.
        """
        if cmd.headers:
            msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0."
            raise FeatureNotSupportedException(msg)
        # The second element returned by encode_message is not used here.
        payload, _ = encode_message(cmd.body, self.serializer)
        await self._connected_client.publish(
            cmd.destination,
            payload,
            qos=zmqtt.QoS(cmd.qos),
            retain=cmd.retain,
        )

    @override
    async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message":
        """Request/reply for MQTT 3.1.1 via explicit reply topic.

        The caller must supply ``cmd.reply_to``. FastStream subscribes to
        that topic, publishes the raw request payload, then waits for the
        first message on the reply topic. The handler side must publish
        its response to the same topic (e.g. via ``@broker.publisher``).

        The wait is bounded by ``cmd.timeout``; a falsy timeout falls back
        to 30 seconds. The subscription is stopped whether the exchange
        succeeds, fails, or times out.

        Raises:
            FeatureNotSupportedException: If ``cmd.reply_to`` is empty.
            IncorrectState: If the producer is not connected.
            asyncio.TimeoutError: If no reply arrives within the timeout.
        """
        if not cmd.reply_to:
            msg = "MQTT 3.1.1 request() requires an explicit reply_to topic."
            raise FeatureNotSupportedException(msg)

        # Subscribe before publishing so that a fast reply is not missed.
        sub = self._connected_client.subscribe(cmd.reply_to)
        await sub.start()

        try:
            payload, _ = encode_message(cmd.body, self.serializer)
            await self._connected_client.publish(
                cmd.destination,
                payload,
                # Coerce exactly as publish() does so both paths hand the
                # client the same QoS type.
                qos=zmqtt.QoS(cmd.qos),
                retain=cmd.retain,
            )
            return await asyncio.wait_for(
                sub.get_message(),
                timeout=cmd.timeout or 30.0,
            )
        finally:
            await sub.stop()
class ZmqttProducerV5(ZmqttBaseProducer):
    """Producer for MQTT 5.0 — publishes with PublishProperties."""

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Build the producer around a fresh ``MQTTParserV5`` default parser.

        Args:
            parser: Optional custom parser composed with the default one.
            decoder: Optional custom decoder composed with the default one.
        """
        super().__init__(MQTTParserV5(), parser, decoder)

    @staticmethod
    def _user_properties(headers: Any) -> "tuple[tuple[str, str], ...]":
        """Turn a headers mapping into ``(key, str(value))`` pairs.

        A falsy ``headers`` value yields an empty tuple.
        """
        return tuple((key, str(value)) for key, value in (headers or {}).items())

    @override
    async def publish(self, cmd: "MQTTPublishCommand") -> None:
        """Encode ``cmd.body`` and publish it with MQTT 5.0 properties.

        Content type, reply topic, correlation id, headers and message
        expiry interval from ``cmd`` are carried in ``PublishProperties``;
        empty content type, reply topic and correlation id are sent as
        ``None``.

        Raises:
            IncorrectState: If the producer is not connected.
        """
        body, mime = encode_message(cmd.body, self.serializer)
        header_pairs = self._user_properties(cmd.headers)

        if cmd.correlation_id:
            correlation = cmd.correlation_id.encode()
        else:
            correlation = None

        props = PublishProperties(
            content_type=mime or None,
            response_topic=cmd.reply_to or None,
            correlation_data=correlation,
            user_properties=header_pairs,
            message_expiry_interval=cmd.message_expiry_interval,
        )

        await self._connected_client.publish(
            cmd.destination,
            body,
            qos=cmd.qos,
            retain=cmd.retain,
            properties=props,
        )

    @override
    async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message":
        """Request/reply for MQTT 5.0 via zmqtt's native client.request().

        zmqtt auto-generates a unique reply topic. We pass our correlation
        ID explicitly so the responder echoes it back and the caller can
        verify it on the response StreamMessage. A correlation ID is
        generated with ``gen_cor_id()`` when ``cmd`` carries none, and a
        falsy ``cmd.timeout`` falls back to 30 seconds.

        Raises:
            IncorrectState: If the producer is not connected.
        """
        body, mime = encode_message(cmd.body, self.serializer)
        cor_id = cmd.correlation_id or gen_cor_id()
        header_pairs = self._user_properties(cmd.headers)

        # Pass correlation_data explicitly so the responder echoes it back.
        # Do NOT set response_topic — let zmqtt generate it.
        props = PublishProperties(
            content_type=mime or None,
            correlation_data=cor_id.encode(),
            user_properties=header_pairs,
            message_expiry_interval=cmd.message_expiry_interval,
        )

        reply = await self._connected_client.request(
            cmd.destination,
            body,
            qos=cmd.qos,
            timeout=cmd.timeout or 30.0,
            properties=props,
        )
        return reply
class ZmqttFakeProducer(ZmqttBaseProducer):
    """Falsy placeholder producer that can be neither connected nor disconnected.

    ``bool(instance)`` is ``False``; ``connect()`` and ``disconnect()`` both
    raise ``NotImplementedError``.
    """

    def __init__(self) -> None:
        """Do nothing: the base-class initialiser is intentionally not run."""

    def __bool__(self) -> bool:
        """Report the producer as falsy."""
        return False

    def connect(
        self,
        client: "zmqtt.MQTTClient",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Refuse to attach a client.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def disconnect(self) -> None:
        """Refuse to detach.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError