Coverage for faststream / mqtt / subscriber / factory.py: 82%
9 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any
3from zmqtt import QoS
5from faststream._internal.endpoint.subscriber.call_item import CallsCollection
7from .config import MQTTSubscriberConfig, MQTTSubscriberSpecificationConfig
8from .specification import MQTTSubscriberSpecification
9from .usecase import MQTTConcurrentSubscriber, MQTTDefaultSubscriber
11if TYPE_CHECKING:
12 from faststream.middlewares import AckPolicy
13 from faststream.mqtt.broker.config import MQTTBrokerConfig
# Union of the concrete subscriber classes that `create_subscriber` can return.
SubscriberType = MQTTDefaultSubscriber | MQTTConcurrentSubscriber
def create_subscriber(
    *,
    topic: str,
    qos: QoS,
    shared: str | None,
    # Subscriber args
    ack_policy: "AckPolicy",
    no_reply: bool,
    config: "MQTTBrokerConfig",
    max_workers: int = 1,
    # AsyncAPI args
    title_: str | None = None,
    description_: str | None = None,
    include_in_schema: bool = True,
) -> SubscriberType:
    """Build an MQTT subscriber together with its specification.

    A single, initially empty ``CallsCollection`` is created and handed to
    both the specification and the returned subscriber, so the two objects
    share the same collection instance.

    Args:
        topic: Passed to both the subscriber config and the specification
            config.
        qos: Passed to both the subscriber config and the specification
            config.
        shared: Passed to both configs; may be ``None``.
        ack_policy: Stored on the subscriber config as ``_ack_policy``.
        no_reply: Passed to the subscriber config.
        config: Broker config; stored on the subscriber config as
            ``_outer_config`` and passed as the first argument of the
            specification.
        max_workers: Selects the subscriber class. Values greater than 1
            yield the concurrent subscriber, which also receives this value.
        title_: Passed to the specification config.
        description_: Passed to the specification config.
        include_in_schema: Passed to the specification config.

    Returns:
        ``MQTTConcurrentSubscriber`` when ``max_workers > 1``, otherwise
        ``MQTTDefaultSubscriber``.
    """
    usecase_options = MQTTSubscriberConfig(
        topic=topic,
        qos=qos,
        shared=shared,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
    )
    schema_options = MQTTSubscriberSpecificationConfig(
        topic=topic,
        qos=qos,
        shared=shared,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    # One collection instance, referenced by the specification and by the
    # subscriber returned below.
    handlers = CallsCollection[Any]()
    spec = MQTTSubscriberSpecification(config, schema_options, handlers)

    wants_worker_pool = max_workers > 1
    if not wants_worker_pool:
        return MQTTDefaultSubscriber(usecase_options, spec, handlers)

    return MQTTConcurrentSubscriber(
        usecase_options,
        spec,
        handlers,
        max_workers=max_workers,
    )