Coverage for faststream / mqtt / response.py: 73%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Union 

2 

3from typing_extensions import override 

4from zmqtt import QoS 

5 

6from faststream.response.publish_type import PublishType 

7from faststream.response.response import PublishCommand, Response 

8 

9if TYPE_CHECKING: 

10 from faststream._internal.basic_types import SendableMessage 

11 

12 

13class MQTTResponse(Response): 

14 def __init__( 

15 self, 

16 body: "SendableMessage", 

17 *, 

18 headers: dict[str, str] | None = None, 

19 correlation_id: str | None = None, 

20 qos: QoS = QoS.AT_MOST_ONCE, 

21 retain: bool = False, 

22 ) -> None: 

23 super().__init__(body=body, headers=headers, correlation_id=correlation_id) 

24 self.qos = qos 

25 self.retain = retain 

26 

27 @override 

28 def as_publish_command(self) -> "MQTTPublishCommand": 

29 return MQTTPublishCommand( 

30 message=self.body, 

31 headers=self.headers, 

32 correlation_id=self.correlation_id, 

33 _publish_type=PublishType.PUBLISH, 

34 topic="", 

35 qos=self.qos, 

36 retain=self.retain, 

37 ) 

38 

39 

40class MQTTPublishCommand(PublishCommand): 

41 def __init__( 

42 self, 

43 message: "SendableMessage", 

44 *, 

45 topic: str = "", 

46 correlation_id: str | None = None, 

47 headers: dict[str, str] | None = None, 

48 reply_to: str = "", 

49 qos: QoS = QoS.AT_MOST_ONCE, 

50 retain: bool = False, 

51 message_expiry_interval: int | None = None, 

52 timeout: float | None = 30.0, 

53 _publish_type: PublishType, 

54 ) -> None: 

55 super().__init__( 

56 body=message, 

57 destination=topic, 

58 correlation_id=correlation_id, 

59 headers=headers, 

60 reply_to=reply_to, 

61 _publish_type=_publish_type, 

62 ) 

63 self.qos = qos 

64 self.retain = retain 

65 self.message_expiry_interval = message_expiry_interval 

66 self.timeout = timeout 

67 

68 @classmethod 

69 def from_cmd( 

70 cls, 

71 cmd: Union["PublishCommand", "MQTTPublishCommand"], 

72 ) -> "MQTTPublishCommand": 

73 if isinstance(cmd, MQTTPublishCommand): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true

74 return cmd 

75 

76 return cls( 

77 message=cmd.body, 

78 topic=cmd.destination, 

79 correlation_id=cmd.correlation_id, 

80 headers=cmd.headers, 

81 reply_to=cmd.reply_to, 

82 timeout=getattr(cmd, "timeout", None), 

83 _publish_type=cmd.publish_type, 

84 ) 

85 

86 def __repr__(self) -> str: 

87 body = [ 

88 f"body='{self.body}'", 

89 f"topic='{self.destination}'", 

90 f"qos={self.qos}", 

91 ] 

92 if self.retain: 

93 body.append("retain=True") 

94 if self.reply_to: 

95 body.append(f"reply_to='{self.reply_to}'") 

96 body.extend(( 

97 f"headers={self.headers}", 

98 f"correlation_id='{self.correlation_id}'", 

99 )) 

100 return f"{self.__class__.__name__}({', '.join(body)})"