Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / amqp / channel.py: 75%
12 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing_extensions import Self
3from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import (
4 ChannelBinding as V2Binding,
5)
6from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.channel import (
7 Exchange,
8 Queue,
9)
10from faststream.specification.schema.bindings import amqp
# AMQP channel binding that extends the v2.6.0 ChannelBinding and overrides
# the default ``bindingVersion`` with "0.3.0".
#
# Both alternate constructors hand their fields over via dict unpacking:
# ``is`` is a reserved word in Python, so it cannot be spelled as a regular
# keyword argument.
class ChannelBinding(V2Binding):
    bindingVersion: str = "0.3.0"

    @classmethod
    def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
        """Create the binding for a subscriber from a specification binding.

        Args:
            binding: Specification-level channel binding, or ``None``.

        Returns:
            ``None`` when ``binding`` is ``None``; otherwise an instance whose
            ``is`` field is ``"queue"`` and whose ``queue`` field is produced
            by ``Queue.from_spec`` from the binding's queue and virtual host.
        """
        if binding is not None:
            queue = Queue.from_spec(binding.queue, binding.virtual_host)
            payload = {"is": "queue", "queue": queue}
            return cls(**payload)

        return None

    @classmethod
    def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None:
        """Create the binding for a publisher from a specification binding.

        Args:
            binding: Specification-level channel binding, or ``None``.

        Returns:
            ``None`` when ``binding`` is ``None``; otherwise an instance whose
            ``is`` field is ``"routingKey"`` and whose ``exchange`` field is
            produced by ``Exchange.from_spec`` from the binding's exchange and
            virtual host.
        """
        if binding is not None:
            exchange = Exchange.from_spec(binding.exchange, binding.virtual_host)
            payload = {"is": "routingKey", "exchange": exchange}
            return cls(**payload)

        return None