Coverage for faststream / mqtt / subscriber / factory.py: 82%

9 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from zmqtt import QoS 

4 

5from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

6 

7from .config import MQTTSubscriberConfig, MQTTSubscriberSpecificationConfig 

8from .specification import MQTTSubscriberSpecification 

9from .usecase import MQTTConcurrentSubscriber, MQTTDefaultSubscriber 

10 

11if TYPE_CHECKING: 

12 from faststream.middlewares import AckPolicy 

13 from faststream.mqtt.broker.config import MQTTBrokerConfig 

14 

# Union of the concrete subscriber classes; used as the return annotation of create_subscriber.
15SubscriberType = MQTTDefaultSubscriber | MQTTConcurrentSubscriber 

16 

17 

def create_subscriber(
    *,
    topic: str,
    qos: QoS,
    shared: str | None,
    # Subscriber args
    ack_policy: "AckPolicy",
    no_reply: bool,
    config: "MQTTBrokerConfig",
    max_workers: int = 1,
    # AsyncAPI args
    title_: str | None = None,
    description_: str | None = None,
    include_in_schema: bool = True,
) -> SubscriberType:
    """Assemble an MQTT subscriber together with its specification.

    ``topic``, ``qos`` and ``shared`` are forwarded to both the subscriber
    config and the specification config. A single ``CallsCollection`` is
    created and handed to the specification and to the returned subscriber,
    so both refer to the same collection object.

    Args:
        topic: Forwarded to both config objects as ``topic``.
        qos: Forwarded to both config objects as ``qos``.
        shared: Forwarded to both config objects as ``shared``.
        ack_policy: Stored on the subscriber config as ``_ack_policy``.
        no_reply: Stored on the subscriber config as ``no_reply``.
        config: Broker config; stored on the subscriber config as
            ``_outer_config`` and passed to the specification.
        max_workers: Values greater than 1 select the concurrent subscriber
            and are passed on to it.
        title_: Forwarded to the specification config.
        description_: Forwarded to the specification config.
        include_in_schema: Forwarded to the specification config.

    Returns:
        ``MQTTConcurrentSubscriber`` when ``max_workers > 1``, otherwise
        ``MQTTDefaultSubscriber``.
    """
    usecase_options = MQTTSubscriberConfig(
        topic=topic,
        qos=qos,
        shared=shared,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
    )
    schema_options = MQTTSubscriberSpecificationConfig(
        topic=topic,
        qos=qos,
        shared=shared,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    # One collection instance, shared by the specification and the subscriber.
    handlers = CallsCollection[Any]()
    spec = MQTTSubscriberSpecification(config, schema_options, handlers)

    if max_workers > 1:
        return MQTTConcurrentSubscriber(
            usecase_options,
            spec,
            handlers,
            max_workers=max_workers,
        )
    return MQTTDefaultSubscriber(usecase_options, spec, handlers)