Coverage for faststream / mqtt / subscriber / specification.py: 90%

17 statements  

Report generated by coverage.py v7.13.5, created at 2026-05-08 01:48 +0000.

1from typing import TYPE_CHECKING, Any 

2 

3from faststream._internal.endpoint.subscriber import SubscriberSpecification 

4from faststream.mqtt.broker.config import MQTTBrokerConfig 

5from faststream.specification.asyncapi.utils import resolve_payloads 

6from faststream.specification.schema import Message, Operation, SubscriberSpec 

7from faststream.specification.schema.bindings import ( 

8 ChannelBinding, 

9 OperationBinding, 

10 mqtt as mqtt_bindings, 

11) 

12 

13from .config import MQTTSubscriberSpecificationConfig 

14 

15if TYPE_CHECKING: 

16 from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

17 

18 

class MQTTSubscriberSpecification(
    SubscriberSpecification[MQTTBrokerConfig, MQTTSubscriberSpecificationConfig],
):
    """Specification-side description of a single MQTT subscriber.

    Derives the subscriber's effective topic and display name from the broker
    and subscriber configs, and renders them as a ``SubscriberSpec`` entry.
    """

    def __init__(
        self,
        _outer_config: "MQTTBrokerConfig",
        specification_config: "MQTTSubscriberSpecificationConfig",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Forward all three arguments unchanged to the base specification."""
        super().__init__(_outer_config, specification_config, calls)

    @property
    def topic(self) -> str:
        """Return the broker prefix joined with the configured topic.

        When ``config.shared`` is truthy the result is wrapped as
        ``$share/<shared>/<prefixed topic>``.
        """
        prefixed = f"{self._outer_config.prefix}{self.config.topic}"
        if not self.config.shared:
            return prefixed
        # NOTE(review): the coverage report this source was taken from shows
        # this branch was never exercised by the measured test run.
        return f"$share/{self.config.shared}/{prefixed}"

    @property
    def name(self) -> str:
        """Return the explicit title if set, else ``<topic>:<call_name>``."""
        return self.config.title_ or f"{self.topic}:{self.call_name}"

    def get_schema(self) -> dict[str, SubscriberSpec]:
        """Build the specification entry for this subscriber.

        Returns:
            A one-item mapping from the subscriber name to its
            ``SubscriberSpec``, carrying the message payload plus MQTT
            operation and channel bindings.
        """
        payloads = self.get_payloads()
        channel_name = self.name
        description = self.description

        message = Message(
            title=f"{channel_name}:Message",
            payload=resolve_payloads(payloads),
        )
        operation = Operation(
            message=message,
            bindings=OperationBinding(
                mqtt=mqtt_bindings.OperationBinding(qos=self.config.qos),
            ),
        )
        channel_bindings = ChannelBinding(
            mqtt=mqtt_bindings.ChannelBinding(
                topic=self.topic,
                qos=self.config.qos,
            ),
        )

        return {
            channel_name: SubscriberSpec(
                description=description,
                operation=operation,
                bindings=channel_bindings,
            ),
        }

66 ), 

67 }