Coverage for faststream / mqtt / parser.py: 91%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from contextlib import suppress 

2from typing import TYPE_CHECKING, Any 

3 

4import zmqtt 

5 

6from faststream._internal._compat import json_loads 

7from faststream.message import StreamMessage, decode_message 

8 

9from .message import MQTTMessage 

10 

11if TYPE_CHECKING: 

12 from faststream._internal.basic_types import DecodedMessage 

13 

14 

class MQTTBaseParser:
    """Common base for the MQTT parsers.

    Subclasses provide ``parse_message``; ``decode_message`` falls back to
    the shared ``decode_message`` helper imported from ``faststream.message``.
    """

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Turn a raw ``zmqtt.Message`` into an ``MQTTMessage``.

        Raises:
            NotImplementedError: always; concrete parsers must override this.
        """
        raise NotImplementedError

    async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
        """Decode ``msg`` by delegating to the module-level ``decode_message``."""
        # Inside the method body the bare name resolves to the imported
        # function, not to this method (class scope is not searched).
        decoded: DecodedMessage = decode_message(msg)
        return decoded

23 

24 

class MQTTParserV311(MQTTBaseParser):
    """MQTT 3.1.1 parser: forwards the raw payload with no metadata."""

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Wrap ``msg`` in an ``MQTTMessage`` with empty/absent metadata.

        The body is ``msg.payload``; headers are empty, ``reply_to`` is the
        empty string, and content type / correlation id are ``None``.
        """
        return MQTTMessage(
            raw_message=msg,
            body=msg.payload,
            headers={},
            reply_to="",
            content_type=None,
            correlation_id=None,
        )

    async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
        """Decode the body as JSON, else as text, else return it unchanged."""
        payload: bytes = msg.body

        # First choice: JSON. Any failure here is deliberately ignored so the
        # text / raw fallbacks below get a chance.
        with suppress(Exception):
            parsed: DecodedMessage = json_loads(payload)
            return parsed

        # Second choice: text via ``bytes.decode()``; if that fails, hand the
        # body back untouched.
        try:
            return payload.decode()
        except UnicodeDecodeError:
            return payload

46 

47 

class MQTTParserV5(MQTTBaseParser):
    """MQTT 5.0 parser.

    When the incoming message carries properties, their ``content_type``,
    ``response_topic``, ``correlation_data`` and ``user_properties`` are
    mapped onto the resulting ``MQTTMessage``.
    """

    async def parse_message(self, msg: zmqtt.Message) -> MQTTMessage:
        """Build an ``MQTTMessage`` from ``msg`` and its optional properties.

        Without properties the result has empty headers, an empty
        ``reply_to`` and ``None`` for content type and correlation id.
        """
        properties = msg.properties

        # Defaults for a message that has no properties attached.
        user_headers: dict[str, Any] = {}
        mime_type: str | None = None
        response_topic: str = ""
        correlation: str | None = None

        if properties is not None:
            mime_type = properties.content_type
            # A missing response topic is normalised to the empty string.
            response_topic = properties.response_topic or ""

            raw_correlation = properties.correlation_data
            if raw_correlation is not None:
                # Undecodable bytes are replaced rather than raising.
                correlation = raw_correlation.decode(errors="replace")

            user_headers.update(properties.user_properties)

        return MQTTMessage(
            raw_message=msg,
            body=msg.payload,
            headers=user_headers,
            content_type=mime_type,
            reply_to=response_topic,
            correlation_id=correlation,
        )