Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / amqp / channel.py: 82%
36 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI AMQP bindings.
3References: https://github.com/asyncapi/bindings/tree/master/amqp
4"""
6from typing import Literal, overload
8from pydantic import BaseModel, Field
9from typing_extensions import Self
11from faststream.specification.schema.bindings import amqp
class Queue(BaseModel):
    """AMQP queue section of an AsyncAPI channel binding.

    Attributes:
        name : name of the queue
        durable : whether the queue is durable
        exclusive : whether the queue is exclusive
        autoDelete : whether the queue is deleted automatically
        vhost : virtual host of the queue ("/" unless specified)
    """

    name: str
    durable: bool
    exclusive: bool
    autoDelete: bool
    vhost: str = "/"

    @overload
    @classmethod
    def from_spec(cls, binding: None, vhost: str) -> None: ...

    @overload
    @classmethod
    def from_spec(cls, binding: amqp.Queue, vhost: str) -> Self: ...

    @classmethod
    def from_spec(cls, binding: amqp.Queue | None, vhost: str) -> Self | None:
        """Create the schema model from a specification-level queue.

        Args:
            binding: specification queue to convert, or None.
            vhost: virtual host stored on the resulting model.

        Returns:
            A new instance populated from ``binding`` and ``vhost``, or
            None when ``binding`` is None.
        """
        if binding is not None:
            # The specification object uses snake_case (``auto_delete``)
            # while this schema model exposes the camelCase field name.
            return cls(
                name=binding.name,
                durable=binding.durable,
                exclusive=binding.exclusive,
                autoDelete=binding.auto_delete,
                vhost=vhost,
            )

        return None
class Exchange(BaseModel):
    """AMQP exchange section of an AsyncAPI channel binding.

    Attributes:
        name : name of the exchange (optional)
        type : type of the exchange, one of "default", "direct", "topic",
            "fanout", "headers", "x-delayed-message", "x-consistent-hash",
            "x-modulus-hash"
        durable : whether the exchange is durable (optional)
        autoDelete : whether the exchange is deleted automatically (optional)
        vhost : virtual host of the exchange ("/" unless specified)
    """

    name: str | None = None
    type: Literal[
        "default",
        "direct",
        "topic",
        "fanout",
        "headers",
        "x-delayed-message",
        "x-consistent-hash",
        "x-modulus-hash",
    ]
    durable: bool | None = None
    autoDelete: bool | None = None
    vhost: str = "/"

    @overload
    @classmethod
    def from_spec(cls, binding: None, vhost: str) -> None: ...

    @overload
    @classmethod
    def from_spec(cls, binding: amqp.Exchange, vhost: str) -> Self: ...

    @classmethod
    def from_spec(cls, binding: amqp.Exchange | None, vhost: str) -> Self | None:
        """Create the schema model from a specification-level exchange.

        Args:
            binding: specification exchange to convert, or None.
            vhost: virtual host stored on the resulting model.

        Returns:
            A new instance populated from ``binding`` and ``vhost``, or
            None when ``binding`` is None.
        """
        if binding is not None:
            # The specification object uses snake_case (``auto_delete``)
            # while this schema model exposes the camelCase field name.
            return cls(
                name=binding.name,
                type=binding.type,
                durable=binding.durable,
                autoDelete=binding.auto_delete,
                vhost=vhost,
            )

        return None
class ChannelBinding(BaseModel):
    """AMQP channel binding as emitted into the AsyncAPI document.

    Attributes:
        is_ : kind of binding, "queue" or "routingKey" (populated via the
            "is" alias)
        bindingVersion : version of the binding
        queue : optional queue object
        exchange : optional exchange object
    """

    is_: Literal["queue", "routingKey"] = Field(..., alias="is")
    bindingVersion: str = "0.2.0"
    queue: Queue | None = None
    exchange: Exchange | None = None

    @classmethod
    def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
        """Create a channel binding for the subscriber side.

        The queue is included only when the exchange reports
        ``is_respect_routing_key``; the exchange is always converted.

        Args:
            binding: specification channel binding to convert, or None.

        Returns:
            A new instance with ``is`` set to "routingKey", or None when
            ``binding`` is None.
        """
        if binding is None:
            return None

        queue: Queue | None = None
        if binding.exchange.is_respect_routing_key:
            queue = Queue.from_spec(binding.queue, binding.virtual_host)

        exchange = Exchange.from_spec(binding.exchange, binding.virtual_host)

        # "is" is a Python keyword, so the aliased field has to be supplied
        # through dict unpacking rather than as a plain keyword argument.
        return cls(
            **{
                "is": "routingKey",
                "queue": queue,
                "exchange": exchange,
            },
        )

    @classmethod
    def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
        """Create a channel binding for the publisher side.

        The queue is included only when the exchange reports
        ``is_respect_routing_key`` and the queue name is truthy; the
        exchange is always converted.

        Args:
            binding: specification channel binding to convert, or None.

        Returns:
            A new instance with ``is`` set to "routingKey", or None when
            ``binding`` is None.
        """
        if binding is None:
            return None

        queue: Queue | None = None
        if binding.exchange.is_respect_routing_key and binding.queue.name:
            queue = Queue.from_spec(binding.queue, binding.virtual_host)

        exchange = Exchange.from_spec(binding.exchange, binding.virtual_host)

        # "is" is a Python keyword, so the aliased field has to be supplied
        # through dict unpacking rather than as a plain keyword argument.
        return cls(
            **{
                "is": "routingKey",
                "queue": queue,
                "exchange": exchange,
            },
        )