Coverage for faststream / mqtt / broker / registrator.py: 87%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable, Sequence 

2from typing import TYPE_CHECKING, Any, Optional, cast 

3 

4from typing_extensions import override 

5from zmqtt import QoS 

6 

7from faststream._internal.broker.registrator import Registrator 

8from faststream._internal.constants import EMPTY 

9from faststream.middlewares import AckPolicy 

10from faststream.mqtt.broker.config import MQTTBrokerConfig 

11from faststream.mqtt.publisher.factory import create_publisher 

12from faststream.mqtt.subscriber.factory import create_subscriber 

13 

14if TYPE_CHECKING: 

15 import zmqtt # noqa: F401 

16 from fast_depends.dependencies import Dependant 

17 

18 from faststream._internal.parser import CodecProto 

19 from faststream._internal.types import ( 

20 BrokerMiddleware, 

21 CustomCallable, 

22 ) 

23 from faststream.mqtt.publisher.usecase import MQTTPublisher 

24 from faststream.mqtt.subscriber.usecase import ( 

25 MQTTConcurrentSubscriber, 

26 MQTTDefaultSubscriber, 

27 ) 

28 

29 

30class MQTTRegistrator(Registrator["zmqtt.Message", MQTTBrokerConfig]): 

31 """Includable to MQTTBroker router.""" 

32 

33 @override 

34 def subscriber( # type: ignore[override] 

35 self, 

36 topic: str, 

37 *, 

38 qos: QoS = QoS.AT_MOST_ONCE, 

39 shared: str | None = None, 

40 # broker arguments 

41 ack_policy: AckPolicy = EMPTY, 

42 no_reply: bool = False, 

43 dependencies: Iterable["Dependant"] = (), 

44 parser: Optional["CustomCallable"] = None, 

45 decoder: Optional["CustomCallable"] = None, 

46 codec: Optional["CodecProto"] = None, 

47 max_workers: int = 1, 

48 persistent: bool = True, 

49 # AsyncAPI information 

50 title: str | None = None, 

51 description: str | None = None, 

52 include_in_schema: bool = True, 

53 ) -> "MQTTDefaultSubscriber | MQTTConcurrentSubscriber": 

54 """Subscribe a handler to an MQTT topic. 

55 

56 Args: 

57 topic: MQTT topic filter. Wildcards ``+`` (single level) and 

58 ``#`` (multi-level) are supported. 

59 qos: QoS level for the subscription (0, 1, or 2). 

60 shared: Optional shared subscription group name. When set, 

61 subscribes as ``$share/<group>/<topic>``. 

62 ack_policy: Acknowledgement policy for message processing. 

63 no_reply: Whether to disable FastStream RPC / reply-to responses. 

64 dependencies: Dependencies list to apply to the subscriber. 

65 parser: Custom parser to map raw messages to FastStream ones. 

66 decoder: Function to decode FastStream message bytes to Python objects. 

67 codec: Custom codec object. 

68 max_workers: Number of workers to process messages concurrently. 

69 persistent: Whether to retain the subscriber across broker restarts. 

70 title: AsyncAPI subscriber object title. 

71 description: AsyncAPI subscriber object description. 

72 include_in_schema: Whether to include operation in AsyncAPI schema. 

73 """ 

74 subscriber = create_subscriber( 

75 topic=topic, 

76 qos=qos, 

77 shared=shared, 

78 ack_policy=ack_policy, 

79 no_reply=no_reply, 

80 config=cast("MQTTBrokerConfig", self.config), 

81 max_workers=max_workers, 

82 title_=title, 

83 description_=description, 

84 include_in_schema=include_in_schema, 

85 ) 

86 

87 super().subscriber(subscriber, persistent=persistent) 

88 

89 return subscriber.add_call( 

90 parser_=parser or self._parser, 

91 decoder_=decoder or self._decoder, 

92 codec_=codec, 

93 dependencies_=dependencies, 

94 ) 

95 

96 @override 

97 def publisher( # type: ignore[override] 

98 self, 

99 topic: str, 

100 *, 

101 qos: QoS = QoS.AT_MOST_ONCE, 

102 retain: bool = False, 

103 headers: dict[str, str] | None = None, 

104 persistent: bool = True, 

105 # AsyncAPI information 

106 title: str | None = None, 

107 description: str | None = None, 

108 schema: Any | None = None, 

109 include_in_schema: bool = True, 

110 ) -> "MQTTPublisher": 

111 """Create a persistent publisher object for the given MQTT topic. 

112 

113 Args: 

114 topic: MQTT topic to publish to. Must not contain wildcards. 

115 qos: QoS level for published messages (0, 1, or 2). 

116 retain: Whether the broker should retain the last message. 

117 headers: Default headers to include in every published message. 

118 persistent: Whether to retain the publisher across broker restarts. 

119 title: AsyncAPI publisher object title. 

120 description: AsyncAPI publisher object description. 

121 schema: AsyncAPI publishing message type. 

122 include_in_schema: Whether to include operation in AsyncAPI schema. 

123 """ 

124 publisher = create_publisher( 

125 topic=topic, 

126 qos=qos, 

127 retain=retain, 

128 headers=headers, 

129 broker_config=cast("MQTTBrokerConfig", self.config), 

130 title_=title, 

131 description_=description, 

132 schema_=schema, 

133 include_in_schema=include_in_schema, 

134 ) 

135 super().publisher(publisher, persistent=persistent) 

136 return publisher 

137 

138 @override 

139 def include_router( 

140 self, 

141 router: "MQTTRegistrator", # type: ignore[override] 

142 *, 

143 prefix: str = "", 

144 dependencies: Iterable["Dependant"] = (), 

145 middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (), 

146 include_in_schema: bool | None = None, 

147 ) -> None: 

148 if not isinstance(router, MQTTRegistrator): 148 ↛ 149line 148 didn't jump to line 149 because the condition on line 148 was never true

149 msg = ( 

150 f"Router must be an instance of MQTTRegistrator, " 

151 f"got {type(router).__name__} instead." 

152 ) 

153 raise TypeError(msg) 

154 

155 super().include_router( 

156 router, 

157 prefix=prefix, 

158 dependencies=dependencies, 

159 middlewares=middlewares, 

160 include_in_schema=include_in_schema, 

161 ) 

162 

163 for m in router.config.broker_middlewares: 

164 router.config._validate_middleware(m)