Coverage for faststream / mqtt / broker / registrator.py: 87%
19 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable, Sequence
2from typing import TYPE_CHECKING, Any, Optional, cast
4from typing_extensions import override
5from zmqtt import QoS
7from faststream._internal.broker.registrator import Registrator
8from faststream._internal.constants import EMPTY
9from faststream.middlewares import AckPolicy
10from faststream.mqtt.broker.config import MQTTBrokerConfig
11from faststream.mqtt.publisher.factory import create_publisher
12from faststream.mqtt.subscriber.factory import create_subscriber
14if TYPE_CHECKING:
15 import zmqtt # noqa: F401
16 from fast_depends.dependencies import Dependant
18 from faststream._internal.parser import CodecProto
19 from faststream._internal.types import (
20 BrokerMiddleware,
21 CustomCallable,
22 )
23 from faststream.mqtt.publisher.usecase import MQTTPublisher
24 from faststream.mqtt.subscriber.usecase import (
25 MQTTConcurrentSubscriber,
26 MQTTDefaultSubscriber,
27 )
class MQTTRegistrator(Registrator["zmqtt.Message", MQTTBrokerConfig]):
    """Registrator for MQTT subscribers/publishers, includable to an MQTTBroker router."""

    @override
    def subscriber(  # type: ignore[override]
        self,
        topic: str,
        *,
        qos: QoS = QoS.AT_MOST_ONCE,
        shared: str | None = None,
        # broker arguments
        ack_policy: AckPolicy = EMPTY,
        no_reply: bool = False,
        dependencies: Iterable["Dependant"] = (),
        parser: Optional["CustomCallable"] = None,
        decoder: Optional["CustomCallable"] = None,
        codec: Optional["CodecProto"] = None,
        max_workers: int = 1,
        persistent: bool = True,
        # AsyncAPI information
        title: str | None = None,
        description: str | None = None,
        include_in_schema: bool = True,
    ) -> "MQTTDefaultSubscriber | MQTTConcurrentSubscriber":
        """Register a handler for messages arriving on an MQTT topic.

        The subscriber is built by ``create_subscriber``, registered through
        the base ``Registrator.subscriber`` and finally configured with the
        parsing options via ``add_call``.

        Args:
            topic: MQTT topic filter. The ``+`` (single level) and ``#``
                (multi-level) wildcards are supported.
            qos: QoS level requested for the subscription (0, 1, or 2).
            shared: Optional shared subscription group name. When given,
                the subscription is made as ``$share/<group>/<topic>``.
            ack_policy: Acknowledgement policy used while processing messages.
            no_reply: Whether FastStream RPC / reply-to responses are disabled.
            dependencies: Dependencies to apply to the subscriber.
            parser: Custom parser mapping raw messages to FastStream ones.
                Falls back to the registrator-level parser when not set.
            decoder: Function decoding FastStream message bytes into Python
                objects. Falls back to the registrator-level decoder when
                not set.
            codec: Custom codec object.
            max_workers: Number of workers processing messages concurrently.
            persistent: Whether to retain the subscriber across broker restarts.
            title: AsyncAPI subscriber object title.
            description: AsyncAPI subscriber object description.
            include_in_schema: Whether to include the operation in the
                AsyncAPI schema.

        Returns:
            The result of ``add_call`` on the newly created subscriber.
        """
        broker_config = cast("MQTTBrokerConfig", self.config)

        new_subscriber = create_subscriber(
            topic=topic,
            qos=qos,
            shared=shared,
            ack_policy=ack_policy,
            no_reply=no_reply,
            config=broker_config,
            max_workers=max_workers,
            title_=title,
            description_=description,
            include_in_schema=include_in_schema,
        )

        super().subscriber(new_subscriber, persistent=persistent)

        # Per-subscriber parser/decoder take precedence; otherwise use the
        # ones stored on this registrator.
        effective_parser = parser if parser else self._parser
        effective_decoder = decoder if decoder else self._decoder

        return new_subscriber.add_call(
            parser_=effective_parser,
            decoder_=effective_decoder,
            codec_=codec,
            dependencies_=dependencies,
        )

    @override
    def publisher(  # type: ignore[override]
        self,
        topic: str,
        *,
        qos: QoS = QoS.AT_MOST_ONCE,
        retain: bool = False,
        headers: dict[str, str] | None = None,
        persistent: bool = True,
        # AsyncAPI information
        title: str | None = None,
        description: str | None = None,
        schema: Any | None = None,
        include_in_schema: bool = True,
    ) -> "MQTTPublisher":
        """Build and register a publisher object bound to an MQTT topic.

        The publisher is built by ``create_publisher`` and registered through
        the base ``Registrator.publisher`` before being returned.

        Args:
            topic: MQTT topic to publish to. Must not contain wildcards.
            qos: QoS level for published messages (0, 1, or 2).
            retain: Whether the broker should retain the last message.
            headers: Default headers included in every published message.
            persistent: Whether to retain the publisher across broker restarts.
            title: AsyncAPI publisher object title.
            description: AsyncAPI publisher object description.
            schema: AsyncAPI publishing message type.
            include_in_schema: Whether to include the operation in the
                AsyncAPI schema.

        Returns:
            The newly created publisher.
        """
        broker_config = cast("MQTTBrokerConfig", self.config)

        new_publisher = create_publisher(
            topic=topic,
            qos=qos,
            retain=retain,
            headers=headers,
            broker_config=broker_config,
            title_=title,
            description_=description,
            schema_=schema,
            include_in_schema=include_in_schema,
        )

        super().publisher(new_publisher, persistent=persistent)
        return new_publisher

    @override
    def include_router(
        self,
        router: "MQTTRegistrator",  # type: ignore[override]
        *,
        prefix: str = "",
        dependencies: Iterable["Dependant"] = (),
        middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
        include_in_schema: bool | None = None,
    ) -> None:
        """Include another MQTT registrator and validate its middlewares.

        Args:
            router: Registrator to include; must be an ``MQTTRegistrator``.
            prefix: Forwarded to the base ``include_router``.
            dependencies: Forwarded to the base ``include_router``.
            middlewares: Forwarded to the base ``include_router``.
            include_in_schema: Forwarded to the base ``include_router``.

        Raises:
            TypeError: If ``router`` is not an ``MQTTRegistrator`` instance.
        """
        # NOTE(review): the recorded coverage run never took this rejection
        # branch; it has no test exercising it yet.
        if not isinstance(router, MQTTRegistrator):
            router_type_name = type(router).__name__
            msg = (
                f"Router must be an instance of MQTTRegistrator, "
                f"got {router_type_name} instead."
            )
            raise TypeError(msg)

        super().include_router(
            router,
            prefix=prefix,
            dependencies=dependencies,
            middlewares=middlewares,
            include_in_schema=include_in_schema,
        )

        # Validation runs after inclusion, against whatever the router's
        # config reports as its broker middlewares at that point.
        for middleware in router.config.broker_middlewares:
            router.config._validate_middleware(middleware)