Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / mqtt / channel.py: 79%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI MQTT bindings.
3References: https://github.com/asyncapi/bindings/tree/master/mqtt
4"""
6from pydantic import BaseModel
7from typing_extensions import Self
9from faststream.specification.schema.bindings import mqtt
class ChannelBinding(BaseModel):
    """AsyncAPI 2.6.0 schema model for an MQTT channel binding.

    Instances are built from the specification-level
    ``mqtt.ChannelBinding`` through :meth:`from_sub` (subscriber side)
    or :meth:`from_pub` (publisher side).

    Attributes:
        topic: The MQTT topic name.
        qos: QoS level for the channel (0, 1, or 2).
        retain: Whether messages are retained by the broker.
        bindingVersion: Version of the binding spec.
    """

    topic: str
    qos: int = 0
    retain: bool = False
    bindingVersion: str = "0.2.0"

    @classmethod
    def from_sub(cls, binding: mqtt.ChannelBinding | None) -> Self | None:
        """Build the schema binding for a subscriber channel.

        Args:
            binding: Specification-level MQTT channel binding, or ``None``
                when the channel has no MQTT binding.

        Returns:
            A new instance copying ``topic``, ``qos``, ``retain`` and
            ``bindingVersion`` from ``binding``; ``None`` if ``binding``
            is ``None``.
        """
        if binding is not None:
            copied = {
                "topic": binding.topic,
                "qos": binding.qos,
                "retain": binding.retain,
                "bindingVersion": binding.bindingVersion,
            }
            return cls(**copied)

        return None

    @classmethod
    def from_pub(cls, binding: mqtt.ChannelBinding | None) -> Self | None:
        """Build the schema binding for a publisher channel.

        Args:
            binding: Specification-level MQTT channel binding, or ``None``
                when the channel has no MQTT binding.

        Returns:
            A new instance copying ``topic``, ``qos``, ``retain`` and
            ``bindingVersion`` from ``binding``; ``None`` if ``binding``
            is ``None``.
        """
        if binding is not None:
            copied = {
                "topic": binding.topic,
                "qos": binding.qos,
                "retain": binding.retain,
                "bindingVersion": binding.bindingVersion,
            }
            return cls(**copied)

        return None