Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / mqtt / operation.py: 79%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
"""AsyncAPI MQTT bindings.

References: https://github.com/asyncapi/bindings/tree/master/mqtt
"""
6from pydantic import BaseModel
7from typing_extensions import Self
9from faststream.specification.schema.bindings import mqtt
class OperationBinding(BaseModel):
    """MQTT operation binding (AsyncAPI 2.6.0).

    Attributes:
        qos: QoS level for the operation (0, 1, or 2).
        retain: Whether the broker should retain the message.
        messageExpiryInterval: MQTT 5.0 message expiry interval in seconds.
        bindingVersion: Version of the binding spec.
    """

    qos: int = 0
    retain: bool = False
    messageExpiryInterval: int | None = None
    bindingVersion: str = "0.2.0"

    @classmethod
    def from_sub(cls, binding: mqtt.OperationBinding | None) -> Self | None:
        """Create the binding for a subscribe operation.

        Args:
            binding: Specification-level MQTT operation binding, or ``None``.

        Returns:
            A new instance carrying the four fields copied from ``binding``,
            or ``None`` when ``binding`` is ``None``.
        """
        if binding is not None:
            # Copy the fields one-to-one; attribute names match on both sides.
            field_values = {
                "qos": binding.qos,
                "retain": binding.retain,
                "messageExpiryInterval": binding.messageExpiryInterval,
                "bindingVersion": binding.bindingVersion,
            }
            return cls(**field_values)

        return None

    @classmethod
    def from_pub(cls, binding: mqtt.OperationBinding | None) -> Self | None:
        """Create the binding for a publish operation.

        Args:
            binding: Specification-level MQTT operation binding, or ``None``.

        Returns:
            A new instance carrying the four fields copied from ``binding``,
            or ``None`` when ``binding`` is ``None``.
        """
        if binding is not None:
            # Copy the fields one-to-one; attribute names match on both sides.
            field_values = {
                "qos": binding.qos,
                "retain": binding.retain,
                "messageExpiryInterval": binding.messageExpiryInterval,
                "bindingVersion": binding.bindingVersion,
            }
            return cls(**field_values)

        return None