Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / amqp / operation.py: 82%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI AMQP bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/amqp 

4""" 

5 

6from pydantic import BaseModel, PositiveInt 

7from typing_extensions import Self 

8 

9from faststream.specification.schema.bindings import amqp 

10 

11 

class OperationBinding(BaseModel):
    """AMQP operation binding as emitted into an AsyncAPI 2.6.0 schema.

    Attributes:
        cc : optional string holding the routing key; only set when the
            exchange respects routing keys
        ack : boolean indicating if the operation is acknowledged
        replyTo : optional string representing the replyTo
        deliveryMode : optional integer computed as ``int(persist) + 1``
            from the specification binding's ``persist`` flag
        mandatory : optional boolean copied from the specification binding
        priority : optional positive integer copied from the specification binding
        bindingVersion : string representing the binding version
    """

    cc: str | None = None
    ack: bool
    replyTo: str | None = None
    deliveryMode: int | None = None
    mandatory: bool | None = None
    priority: PositiveInt | None = None

    bindingVersion: str = "0.2.0"

    @classmethod
    def _from_spec(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Convert a specification-level AMQP operation binding into this model.

        Single implementation behind ``from_sub`` and ``from_pub``, which
        perform exactly the same conversion.

        Args:
            binding: The specification binding to convert, or a falsy value.

        Returns:
            A populated instance, or ``None`` when ``binding`` is falsy.
        """
        if not binding:
            return None

        # The routing key is only exposed when the exchange honours it.
        routing_key = (
            binding.routing_key if binding.exchange.is_respect_routing_key else None
        )

        # ``persist`` maps onto the numeric delivery mode as ``int(persist) + 1``;
        # an unset flag (``None``) is propagated unchanged.
        delivery_mode = None if binding.persist is None else int(binding.persist) + 1

        return cls(
            cc=routing_key,
            ack=binding.ack,
            replyTo=binding.reply_to,
            deliveryMode=delivery_mode,
            mandatory=binding.mandatory,
            priority=binding.priority,
        )

    @classmethod
    def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Build the binding for a subscriber operation.

        Returns ``None`` when ``binding`` is falsy.
        """
        return cls._from_spec(binding)

    @classmethod
    def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Build the binding for a publisher operation.

        Returns ``None`` when ``binding`` is falsy.
        """
        return cls._from_spec(binding)