Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / mqtt / operation.py: 79%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI MQTT bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/mqtt 

4""" 

5 

6from pydantic import BaseModel 

7from typing_extensions import Self 

8 

9from faststream.specification.schema.bindings import mqtt 

10 

11 

class OperationBinding(BaseModel):
    """AsyncAPI 2.6.0 schema model for an MQTT operation binding.

    Attributes:
        qos: QoS level for the operation (0, 1, or 2).
        retain: Whether the broker should retain the message.
        messageExpiryInterval: MQTT 5.0 message expiry interval in seconds.
        bindingVersion: Version of the binding spec.
    """

    qos: int = 0
    retain: bool = False
    messageExpiryInterval: int | None = None
    bindingVersion: str = "0.2.0"

    @classmethod
    def from_sub(cls, binding: mqtt.OperationBinding | None) -> Self | None:
        """Create the schema model from a specification-level binding.

        Args:
            binding: Specification binding to convert, or ``None``.

        Returns:
            ``None`` when ``binding`` is ``None``; otherwise a new instance
            whose four fields are copied from ``binding``.
        """
        return (
            None
            if binding is None
            else cls(
                qos=binding.qos,
                retain=binding.retain,
                messageExpiryInterval=binding.messageExpiryInterval,
                bindingVersion=binding.bindingVersion,
            )
        )

    @classmethod
    def from_pub(cls, binding: mqtt.OperationBinding | None) -> Self | None:
        """Create the schema model from a specification-level binding.

        Args:
            binding: Specification binding to convert, or ``None``.

        Returns:
            ``None`` when ``binding`` is ``None``; otherwise a new instance
            whose four fields are copied from ``binding``.
        """
        return (
            None
            if binding is None
            else cls(
                qos=binding.qos,
                retain=binding.retain,
                messageExpiryInterval=binding.messageExpiryInterval,
                bindingVersion=binding.bindingVersion,
            )
        )