Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / main / channel.py: 75%
45 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import overload
3from pydantic import BaseModel
4from typing_extensions import Self
6from faststream._internal._compat import PYDANTIC_V2
7from faststream.specification.asyncapi.v2_6_0.schema.bindings import (
8 amqp as amqp_bindings,
9 kafka as kafka_bindings,
10 mqtt as mqtt_bindings,
11 nats as nats_bindings,
12 redis as redis_bindings,
13 sqs as sqs_bindings,
14)
15from faststream.specification.schema.bindings import ChannelBinding as SpecBinding
class ChannelBinding(BaseModel):
    """Protocol-specific bindings attached to an AsyncAPI 2.6.0 channel.

    ``from_sub`` and ``from_pub`` populate at most one protocol field: the
    first one (in the order amqp, kafka, mqtt, nats, redis, sqs) whose
    specification binding is set and converts to a truthy value.

    Attributes:
        amqp : AMQP channel binding (optional)
        kafka : Kafka channel binding (optional)
        mqtt : MQTT channel binding (optional)
        sqs : SQS channel binding (optional)
        nats : NATS channel binding (optional)
        redis : Redis channel binding (optional)
    """

    amqp: amqp_bindings.ChannelBinding | None = None
    kafka: kafka_bindings.ChannelBinding | None = None
    mqtt: mqtt_bindings.ChannelBinding | None = None
    sqs: sqs_bindings.ChannelBinding | None = None
    nats: nats_bindings.ChannelBinding | None = None
    redis: redis_bindings.ChannelBinding | None = None

    # Keep unknown keys on the model under both pydantic major versions.
    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    # NOTE(review): the ``SpecBinding -> Self`` overloads below promise a
    # non-None result, yet the implementations return None when no protocol
    # yields a binding. Left unchanged to keep the typed interface stable.

    @overload
    @classmethod
    def from_sub(cls, binding: None) -> None: ...

    @overload
    @classmethod
    def from_sub(cls, binding: SpecBinding) -> Self: ...

    @classmethod
    def from_sub(cls, binding: SpecBinding | None) -> Self | None:
        """Build the channel binding for the subscriber side.

        Args:
            binding: Specification-level channel binding, or None.

        Returns:
            A model holding the first protocol binding that converts via the
            protocol's ``from_sub``, or None if ``binding`` is None or no
            protocol produced one.
        """
        return cls._from_spec(binding, "from_sub")

    @overload
    @classmethod
    def from_pub(cls, binding: None) -> None: ...

    @overload
    @classmethod
    def from_pub(cls, binding: SpecBinding) -> Self: ...

    @classmethod
    def from_pub(cls, binding: SpecBinding | None) -> Self | None:
        """Build the channel binding for the publisher side.

        Args:
            binding: Specification-level channel binding, or None.

        Returns:
            A model holding the first protocol binding that converts via the
            protocol's ``from_pub``, or None if ``binding`` is None or no
            protocol produced one.
        """
        return cls._from_spec(binding, "from_pub")

    @classmethod
    def _from_spec(cls, binding: SpecBinding | None, converter: str) -> Self | None:
        """Shared body of ``from_sub`` / ``from_pub``.

        Args:
            binding: Specification-level channel binding, or None.
            converter: Name of the classmethod to call on each protocol's
                ``ChannelBinding`` (``"from_sub"`` or ``"from_pub"``).

        Returns:
            The model built from the first matching protocol, else None.
        """
        if binding is None:
            return None

        # Kept local on purpose: an underscore-prefixed class attribute on a
        # pydantic model would be turned into a private attribute.
        # The order is the precedence order and must not be changed.
        protocols = (
            ("amqp", amqp_bindings),
            ("kafka", kafka_bindings),
            ("mqtt", mqtt_bindings),
            ("nats", nats_bindings),
            ("redis", redis_bindings),
            ("sqs", sqs_bindings),
        )

        for field_name, module in protocols:
            spec = getattr(binding, field_name)
            if not spec:
                continue

            converted = getattr(module.ChannelBinding, converter)(spec)
            if converted:
                return cls(**{field_name: converted})

        return None