Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / amqp / operation.py: 82%
18 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI AMQP bindings.
3References: https://github.com/asyncapi/bindings/tree/master/amqp
4"""
6from pydantic import BaseModel, PositiveInt
7from typing_extensions import Self
9from faststream.specification.schema.bindings import amqp
class OperationBinding(BaseModel):
    # Pydantic model for the AMQP operation binding of an AsyncAPI v3.0.0
    # document. Only `ack` is required; every other field defaults to None,
    # and `bindingVersion` defaults to "0.3.0".
    #
    # Documented with comments rather than a docstring so the model's
    # generated JSON schema is left exactly as it was.
    cc: list[str] | None = None
    ack: bool
    replyTo: str | None = None
    deliveryMode: int | None = None
    mandatory: bool | None = None
    priority: PositiveInt | None = None

    bindingVersion: str = "0.3.0"

    @classmethod
    def from_sub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Create the binding for a subscriber from the specification object.

        Args:
            binding: Specification-level AMQP operation binding, or ``None``.

        Returns:
            A new instance of ``cls``, or ``None`` when ``binding`` is falsy.
        """
        if not binding:
            return None

        # `cc` carries the routing key only when a key is set AND the exchange
        # respects routing keys; the exchange is not consulted without a key.
        if not binding.routing_key or not binding.exchange.is_respect_routing_key:
            routing_keys = None
        else:
            routing_keys = [binding.routing_key]

        return cls(
            cc=routing_keys,
            ack=binding.ack,
            replyTo=binding.reply_to,
            # int(persist) + 1: a False flag becomes 1 and a True flag becomes 2;
            # an unset (None) flag stays None.
            deliveryMode=(
                int(binding.persist) + 1 if binding.persist is not None else None
            ),
            mandatory=binding.mandatory,
            priority=binding.priority,
        )

    @classmethod
    def from_pub(cls, binding: amqp.OperationBinding | None) -> Self | None:
        """Create the binding for a publisher from the specification object.

        Args:
            binding: Specification-level AMQP operation binding, or ``None``.

        Returns:
            A new instance of ``cls``, or ``None`` when ``binding`` is falsy.
        """
        if not binding:
            return None

        # The routing key is exposed through `cc` only if it is set and the
        # exchange respects routing keys; otherwise `cc` is omitted (None).
        if binding.routing_key and binding.exchange.is_respect_routing_key:
            routing_keys = [binding.routing_key]
        else:
            routing_keys = None

        return cls(
            cc=routing_keys,
            ack=binding.ack,
            replyTo=binding.reply_to,
            # int(persist) + 1: a False flag becomes 1 and a True flag becomes 2;
            # an unset (None) flag stays None.
            deliveryMode=(
                int(binding.persist) + 1 if binding.persist is not None else None
            ),
            mandatory=binding.mandatory,
            priority=binding.priority,
        )