Coverage for faststream / mqtt / parser.py: 91%
29 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from contextlib import suppress
2from typing import TYPE_CHECKING, Any
4import zmqtt
6from faststream._internal._compat import json_loads
7from faststream.message import StreamMessage, decode_message
9from .message import MQTTMessage
11if TYPE_CHECKING:
12 from faststream._internal.basic_types import DecodedMessage
class MQTTBaseParser:
    """Common base for the MQTT parsers: defines the parse/decode pair."""

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Turn a raw ``zmqtt.Message`` into an ``MQTTMessage``.

        The base class provides no implementation; calling it raises
        ``NotImplementedError``.
        """
        raise NotImplementedError

    async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
        """Decode ``msg`` via the ``decode_message`` helper from ``faststream.message``."""
        # Inside a method body this name resolves to the module-level import,
        # not to this method.
        decoded: DecodedMessage = decode_message(msg)
        return decoded
class MQTTParserV311(MQTTBaseParser):
    """MQTT 3.1.1 parser: the payload is passed through with empty metadata."""

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Wrap ``msg`` in an ``MQTTMessage`` carrying only its payload.

        Headers are empty, ``content_type`` and ``correlation_id`` are
        ``None`` and ``reply_to`` is the empty string.
        """
        return MQTTMessage(
            raw_message=msg,
            body=msg.payload,
            headers={},
            content_type=None,
            reply_to="",
            correlation_id=None,
        )

    async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
        """Decode the body by trying JSON, then text, then raw bytes.

        Returns the first of these that succeeds:

        1. the result of ``json_loads`` on the body;
        2. the body decoded with ``bytes.decode()`` defaults;
        3. the body unchanged.
        """
        payload: bytes = msg.body

        # Best effort: any failure while parsing JSON falls through to the
        # text fallback below.
        with suppress(Exception):
            parsed: DecodedMessage = json_loads(payload)
            return parsed

        try:
            return payload.decode()
        except UnicodeDecodeError:
            # The payload could not be decoded as text: hand back the raw bytes.
            return payload
class MQTTParserV5(MQTTBaseParser):
    """Parser for MQTT 5.0 messages.

    When the message carries properties, ``content_type``,
    ``response_topic``, ``correlation_data`` and ``user_properties`` are
    read from them; otherwise empty defaults are used.
    """

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Build an ``MQTTMessage`` from ``msg`` and its optional properties.

        ``response_topic`` becomes ``reply_to`` (empty string when falsy),
        ``correlation_data`` is decoded to text with ``errors="replace"``,
        and ``user_properties`` populate the headers.
        """
        properties = msg.properties

        if properties is None:
            # No properties at all: same shape as a metadata-free message.
            return MQTTMessage(
                raw_message=msg,
                body=msg.payload,
                headers={},
                content_type=None,
                reply_to="",
                correlation_id=None,
            )

        content_type: str | None = properties.content_type
        reply_to: str = properties.response_topic or ""

        raw_correlation = properties.correlation_data
        correlation_id: str | None = (
            None
            if raw_correlation is None
            else raw_correlation.decode(errors="replace")
        )

        headers: dict[str, Any] = dict(properties.user_properties)

        return MQTTMessage(
            raw_message=msg,
            body=msg.payload,
            headers=headers,
            content_type=content_type,
            reply_to=reply_to,
            correlation_id=correlation_id,
        )