Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / amqp / operation.py: 82%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI AMQP bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/amqp 

4""" 

5 

6from pydantic import BaseModel, PositiveInt 

7from typing_extensions import Self 

8 

9from faststream.specification.schema.bindings import amqp 

10 

11 

class OperationBinding(BaseModel):
    # AMQP operation binding model; the module docstring links the AsyncAPI
    # bindings reference. Documented with comments instead of a class
    # docstring because pydantic copies a model docstring into the generated
    # JSON schema description.

    # Set to ``[routing_key]`` by from_sub / from_pub only when the routing
    # key is truthy and the exchange respects routing keys; otherwise None.
    cc: list[str] | None = None
    # Required: the only field without a default.
    ack: bool
    replyTo: str | None = None
    # from_sub / from_pub store ``int(persist) + 1`` here (1 for a false
    # ``persist``, 2 for a true one) and None when ``persist`` is None.
    deliveryMode: int | None = None
    mandatory: bool | None = None
    priority: PositiveInt | None = None

    bindingVersion: str = "0.3.0"

    @classmethod
    def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Create the model from a specification-level AMQP operation binding.

        Judging by the name, this is the subscriber-side constructor.

        Args:
            binding: The specification binding to convert, or ``None``.

        Returns:
            A populated instance, or ``None`` when ``binding`` is falsy.
        """
        if not binding:
            return None

        # The exchange is only consulted when a routing key is present.
        routing_keys = None
        if binding.routing_key and binding.exchange.is_respect_routing_key:
            routing_keys = [binding.routing_key]

        persist = binding.persist
        delivery_mode = None
        if persist is not None:
            delivery_mode = int(persist) + 1

        return cls(
            cc=routing_keys,
            ack=binding.ack,
            replyTo=binding.reply_to,
            deliveryMode=delivery_mode,
            mandatory=binding.mandatory,
            priority=binding.priority,
        )

    @classmethod
    def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Create the model from a specification-level AMQP operation binding.

        Judging by the name, this is the publisher-side constructor.

        Args:
            binding: The specification binding to convert, or ``None``.

        Returns:
            A populated instance, or ``None`` when ``binding`` is falsy.
        """
        if not binding:
            return None

        key = binding.routing_key
        # Short-circuits: the exchange is not touched when the key is falsy.
        key_is_respected = key and binding.exchange.is_respect_routing_key
        persist = binding.persist

        return cls(
            cc=[key] if key_is_respected else None,
            ack=binding.ack,
            replyTo=binding.reply_to,
            deliveryMode=int(persist) + 1 if persist is not None else None,
            mandatory=binding.mandatory,
            priority=binding.priority,
        )