Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / main / channel.py: 75%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from pydantic import BaseModel
2from typing_extensions import Self
4from faststream._internal._compat import PYDANTIC_V2
5from faststream.specification.asyncapi.v3_0_0.schema.bindings import (
6 amqp as amqp_bindings,
7 kafka as kafka_bindings,
8 mqtt as mqtt_bindings,
9 nats as nats_bindings,
10 redis as redis_bindings,
11 sqs as sqs_bindings,
12)
13from faststream.specification.schema.bindings import ChannelBinding as SpecBinding
class ChannelBinding(BaseModel):
    """Holder for the protocol-specific bindings of a channel.

    Every field is optional and defaults to ``None``.

    Attributes:
        amqp : AMQP channel binding (optional)
        kafka : Kafka channel binding (optional)
        mqtt : MQTT channel binding (optional)
        sqs : SQS channel binding (optional)
        nats : NATS channel binding (optional)
        redis : Redis channel binding (optional)
    """

    amqp: amqp_bindings.ChannelBinding | None = None
    kafka: kafka_bindings.ChannelBinding | None = None
    mqtt: mqtt_bindings.ChannelBinding | None = None
    sqs: sqs_bindings.ChannelBinding | None = None
    nats: nats_bindings.ChannelBinding | None = None
    redis: redis_bindings.ChannelBinding | None = None

    # The same "extra = allow" setting, spelled for whichever pydantic major
    # version is installed.
    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    @classmethod
    def _first_converted(
        cls,
        binding: SpecBinding | None,
        converter_name: str,
    ) -> Self | None:
        """Build an instance from the first protocol binding that converts.

        Args:
            binding: Specification-level channel binding, or ``None``.
            converter_name: Name of the classmethod to call on each
                protocol-specific ``ChannelBinding`` model
                (``"from_sub"`` or ``"from_pub"``).

        Returns:
            An instance with exactly one protocol field populated, or ``None``
            when ``binding`` is ``None`` or no protocol yields a truthy result.
        """
        if binding is None:
            return None

        # Probe order matters: the first truthy conversion wins.
        candidates = (
            ("amqp", amqp_bindings.ChannelBinding),
            ("kafka", kafka_bindings.ChannelBinding),
            ("mqtt", mqtt_bindings.ChannelBinding),
            ("nats", nats_bindings.ChannelBinding),
            ("redis", redis_bindings.ChannelBinding),
            ("sqs", sqs_bindings.ChannelBinding),
        )

        for field_name, binding_model in candidates:
            spec_value = getattr(binding, field_name)
            if not spec_value:
                continue

            converted = getattr(binding_model, converter_name)(spec_value)
            if converted:
                return cls(**{field_name: converted})

        return None

    @classmethod
    def from_sub(cls, binding: SpecBinding | None) -> Self | None:
        """Convert a specification binding using each protocol's ``from_sub``.

        Returns ``None`` when ``binding`` is ``None`` or nothing converts.
        """
        return cls._first_converted(binding, "from_sub")

    @classmethod
    def from_pub(cls, binding: SpecBinding | None) -> Self | None:
        """Convert a specification binding using each protocol's ``from_pub``.

        Returns ``None`` when ``binding`` is ``None`` or nothing converts.
        """
        return cls._first_converted(binding, "from_pub")