Coverage for faststream / mqtt / publisher / producer.py: 82%

71 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4import zmqtt 

5from typing_extensions import override 

6from zmqtt import PublishProperties 

7 

8from faststream._internal.endpoint.utils import ParserComposition 

9from faststream._internal.producer import ProducerProto 

10from faststream.exceptions import FeatureNotSupportedException, IncorrectState 

11from faststream.message import encode_message, gen_cor_id 

12from faststream.mqtt.parser import MQTTParserV5, MQTTParserV311 

13from faststream.mqtt.response import MQTTPublishCommand 

14 

15if TYPE_CHECKING: 

16 from fast_depends.library.serializer import SerializerProto 

17 

18 from faststream._internal.types import AsyncCallable, CustomCallable 

19 

20 

21class ZmqttBaseProducer(ProducerProto[MQTTPublishCommand]): 

22 _parser: "AsyncCallable" 

23 _decoder: "AsyncCallable" 

24 

25 def __init__( 

26 self, 

27 default_parser: Any, 

28 parser: Optional["CustomCallable"], 

29 decoder: Optional["CustomCallable"], 

30 ) -> None: 

31 self.serializer: SerializerProto | None = None 

32 self._client: zmqtt.MQTTClient | None = None 

33 

34 self._parser = ParserComposition(parser, default_parser.parse_message) 

35 self._decoder = ParserComposition(decoder, default_parser.decode_message) 

36 

37 def connect( 

38 self, 

39 client: "zmqtt.MQTTClient", 

40 serializer: Optional["SerializerProto"], 

41 ) -> None: 

42 self._client = client 

43 self.serializer = serializer 

44 

45 def disconnect(self) -> None: 

46 self._client = None 

47 self.serializer = None 

48 

49 @property 

50 def _connected_client(self) -> "zmqtt.MQTTClient": 

51 if self._client is None: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true

52 msg = "Producer is not connected. Call connect() first." 

53 raise IncorrectState(msg) 

54 return self._client 

55 

56 @override 

57 async def publish(self, cmd: "MQTTPublishCommand") -> None: 

58 raise NotImplementedError 

59 

60 @override 

61 async def request(self, cmd: "MQTTPublishCommand") -> Any: 

62 raise NotImplementedError 

63 

64 @override 

65 async def publish_batch(self, cmd: "MQTTPublishCommand") -> None: 

66 msg = "MQTT does not support batch publishing." 

67 raise FeatureNotSupportedException(msg) 

68 

69 

70class ZmqttProducerV311(ZmqttBaseProducer): 

71 """Producer for MQTT 3.1.1 — publishes raw bytes only. 

72 

73 Headers, correlation_id, and other metadata are not supported. 

74 Use MQTT 5.0 for those features. Request/reply is supported via 

75 an explicit reply_to topic provided by the caller. 

76 """ 

77 

78 def __init__( 

79 self, 

80 parser: Optional["CustomCallable"], 

81 decoder: Optional["CustomCallable"], 

82 ) -> None: 

83 super().__init__(MQTTParserV311(), parser, decoder) 

84 

85 @override 

86 async def publish(self, cmd: "MQTTPublishCommand") -> None: 

87 if cmd.headers: 

88 msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0." 

89 raise FeatureNotSupportedException(msg) 

90 payload, _ = encode_message(cmd.body, self.serializer) 

91 await self._connected_client.publish( 

92 cmd.destination, 

93 payload, 

94 qos=zmqtt.QoS(cmd.qos), 

95 retain=cmd.retain, 

96 ) 

97 

98 @override 

99 async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": 

100 """Request/reply for MQTT 3.1.1 via explicit reply topic. 

101 

102 The caller must supply ``cmd.reply_to``. FastStream subscribes to 

103 that topic, publishes the raw request payload, then waits for the 

104 first message on the reply topic. The handler side must publish 

105 its response to the same topic (e.g. via ``@broker.publisher``). 

106 """ 

107 if not cmd.reply_to: 107 ↛ 111line 107 didn't jump to line 111 because the condition on line 107 was always true

108 msg = "MQTT 3.1.1 request() requires an explicit reply_to topic." 

109 raise FeatureNotSupportedException(msg) 

110 

111 sub = self._connected_client.subscribe(cmd.reply_to) 

112 await sub.start() 

113 

114 try: 

115 payload, _ = encode_message(cmd.body, self.serializer) 

116 await self._connected_client.publish( 

117 cmd.destination, 

118 payload, 

119 qos=cmd.qos, 

120 retain=cmd.retain, 

121 ) 

122 return await asyncio.wait_for( 

123 sub.get_message(), 

124 timeout=cmd.timeout or 30.0, 

125 ) 

126 finally: 

127 await sub.stop() 

128 

129 

130class ZmqttProducerV5(ZmqttBaseProducer): 

131 """Producer for MQTT 5.0 — publishes with PublishProperties.""" 

132 

133 def __init__( 

134 self, 

135 parser: Optional["CustomCallable"], 

136 decoder: Optional["CustomCallable"], 

137 ) -> None: 

138 super().__init__(MQTTParserV5(), parser, decoder) 

139 

140 @override 

141 async def publish(self, cmd: "MQTTPublishCommand") -> None: 

142 payload, content_type = encode_message(cmd.body, self.serializer) 

143 

144 user_props: list[tuple[str, str]] = [ 

145 (k, str(v)) for k, v in (cmd.headers or {}).items() 

146 ] 

147 

148 properties = PublishProperties( 

149 content_type=content_type or None, 

150 response_topic=cmd.reply_to or None, 

151 correlation_data=cmd.correlation_id.encode() if cmd.correlation_id else None, 

152 user_properties=tuple(user_props), 

153 message_expiry_interval=cmd.message_expiry_interval, 

154 ) 

155 

156 await self._connected_client.publish( 

157 cmd.destination, 

158 payload, 

159 qos=cmd.qos, 

160 retain=cmd.retain, 

161 properties=properties, 

162 ) 

163 

164 @override 

165 async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": 

166 """Request/reply for MQTT 5.0 via zmqtt's native client.request(). 

167 

168 zmqtt auto-generates a unique reply topic. We pass our correlation 

169 ID explicitly so the responder echoes it back and the caller can 

170 verify it on the response StreamMessage. 

171 """ 

172 payload, content_type = encode_message(cmd.body, self.serializer) 

173 correlation_id = cmd.correlation_id or gen_cor_id() 

174 

175 user_props: list[tuple[str, str]] = [ 

176 (k, str(v)) for k, v in (cmd.headers or {}).items() 

177 ] 

178 

179 # Pass correlation_data explicitly so the responder echoes it back. 

180 # Do NOT set response_topic — let zmqtt generate it. 

181 properties = PublishProperties( 

182 content_type=content_type or None, 

183 correlation_data=correlation_id.encode(), 

184 user_properties=tuple(user_props), 

185 message_expiry_interval=cmd.message_expiry_interval, 

186 ) 

187 

188 return await self._connected_client.request( 

189 cmd.destination, 

190 payload, 

191 qos=cmd.qos, 

192 timeout=cmd.timeout or 30.0, 

193 properties=properties, 

194 ) 

195 

196 

197class ZmqttFakeProducer(ZmqttBaseProducer): 

198 def __init__(self) -> None: ... 

199 def __bool__(self) -> bool: 

200 return False 

201 

202 def connect( 

203 self, 

204 client: "zmqtt.MQTTClient", 

205 serializer: Optional["SerializerProto"], 

206 ) -> None: 

207 raise NotImplementedError 

208 

209 def disconnect(self) -> None: 

210 raise NotImplementedError