Coverage for faststream / mqtt / subscriber / specification.py: 90%
17 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any
3from faststream._internal.endpoint.subscriber import SubscriberSpecification
4from faststream.mqtt.broker.config import MQTTBrokerConfig
5from faststream.specification.asyncapi.utils import resolve_payloads
6from faststream.specification.schema import Message, Operation, SubscriberSpec
7from faststream.specification.schema.bindings import (
8 ChannelBinding,
9 OperationBinding,
10 mqtt as mqtt_bindings,
11)
13from .config import MQTTSubscriberSpecificationConfig
15if TYPE_CHECKING:
16 from faststream._internal.endpoint.subscriber.call_item import CallsCollection
19class MQTTSubscriberSpecification(
20 SubscriberSpecification[MQTTBrokerConfig, MQTTSubscriberSpecificationConfig],
21):
22 def __init__(
23 self,
24 _outer_config: "MQTTBrokerConfig",
25 specification_config: "MQTTSubscriberSpecificationConfig",
26 calls: "CallsCollection[Any]",
27 ) -> None:
28 super().__init__(_outer_config, specification_config, calls)
30 @property
31 def topic(self) -> str:
32 base = f"{self._outer_config.prefix}{self.config.topic}"
33 if self.config.shared: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true
34 return f"$share/{self.config.shared}/{base}"
35 return base
37 @property
38 def name(self) -> str:
39 if self.config.title_:
40 return self.config.title_
41 return f"{self.topic}:{self.call_name}"
43 def get_schema(self) -> dict[str, SubscriberSpec]:
44 payloads = self.get_payloads()
46 return {
47 self.name: SubscriberSpec(
48 description=self.description,
49 operation=Operation(
50 message=Message(
51 title=f"{self.name}:Message",
52 payload=resolve_payloads(payloads),
53 ),
54 bindings=OperationBinding(
55 mqtt=mqtt_bindings.OperationBinding(
56 qos=self.config.qos,
57 ),
58 ),
59 ),
60 bindings=ChannelBinding(
61 mqtt=mqtt_bindings.ChannelBinding(
62 topic=self.topic,
63 qos=self.config.qos,
64 ),
65 ),
66 ),
67 }