Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / main / operation.py: 54%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import overload
3from pydantic import BaseModel
4from typing_extensions import Self
6from faststream._internal._compat import PYDANTIC_V2
7from faststream.specification.asyncapi.v2_6_0.schema.bindings import (
8 amqp as amqp_bindings,
9 http as http_bindings,
10 kafka as kafka_bindings,
11 mqtt as mqtt_bindings,
12 nats as nats_bindings,
13 redis as redis_bindings,
14 sqs as sqs_bindings,
15)
16from faststream.specification.schema.bindings import OperationBinding as SpecBinding
class OperationBinding(BaseModel):
    """AsyncAPI 2.6.0 container for protocol-specific operation bindings.

    Each field holds the operation binding for one protocol; all of them
    default to ``None``. Extra fields are allowed by the model configuration.

    Attributes:
        amqp : AMQP operation binding (optional)
        kafka : Kafka operation binding (optional)
        mqtt : MQTT operation binding (optional)
        sqs : SQS operation binding (optional)
        nats : NATS operation binding (optional)
        redis : Redis operation binding (optional)
        http : HTTP operation binding (optional); never populated by
            ``from_sub`` / ``from_pub``
    """

    amqp: amqp_bindings.OperationBinding | None = None
    kafka: kafka_bindings.OperationBinding | None = None
    mqtt: mqtt_bindings.OperationBinding | None = None
    sqs: sqs_bindings.OperationBinding | None = None
    nats: nats_bindings.OperationBinding | None = None
    redis: redis_bindings.OperationBinding | None = None
    http: http_bindings.OperationBinding | None = None

    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    @classmethod
    def _from_spec(
        cls,
        binding: SpecBinding | None,
        converter_name: str,
    ) -> Self | None:
        """Build the model from the first protocol binding that converts.

        Args:
            binding: Specification-level binding, or ``None``.
            converter_name: Name of the classmethod to call on each
                protocol-specific model: ``"from_sub"`` or ``"from_pub"``.

        Returns:
            An instance with exactly one protocol field set, or ``None`` when
            ``binding`` is ``None``, no probed protocol field is truthy, or
            every converter returned a falsy value.
        """
        if binding is None:
            return None

        # Probing order matters: the first protocol that yields a truthy
        # converted binding wins. ``http`` is deliberately absent, matching
        # the behaviour of the original explicit if-chain.
        protocol_models = (
            ("amqp", amqp_bindings.OperationBinding),
            ("kafka", kafka_bindings.OperationBinding),
            ("mqtt", mqtt_bindings.OperationBinding),
            ("nats", nats_bindings.OperationBinding),
            ("redis", redis_bindings.OperationBinding),
            ("sqs", sqs_bindings.OperationBinding),
        )

        for protocol, model in protocol_models:
            spec = getattr(binding, protocol)
            if not spec:
                continue

            converted = getattr(model, converter_name)(spec)
            if converted:
                return cls(**{protocol: converted})

        return None

    @overload
    @classmethod
    def from_sub(cls, binding: None) -> None: ...

    @overload
    @classmethod
    def from_sub(cls, binding: SpecBinding) -> Self: ...

    @classmethod
    def from_sub(cls, binding: SpecBinding | None) -> Self | None:
        """Create the operation binding for a subscriber.

        Args:
            binding: Specification-level binding, or ``None``.

        Returns:
            An instance wrapping the first protocol binding (probed in the
            order amqp, kafka, mqtt, nats, redis, sqs) whose ``from_sub``
            conversion is truthy. ``None`` if ``binding`` is ``None`` or no
            protocol produced a binding; note the latter can happen even for
            a non-None ``binding``, despite the ``-> Self`` overload.
        """
        return cls._from_spec(binding, "from_sub")

    @overload
    @classmethod
    def from_pub(cls, binding: None) -> None: ...

    @overload
    @classmethod
    def from_pub(cls, binding: SpecBinding) -> Self: ...

    @classmethod
    def from_pub(cls, binding: SpecBinding | None) -> Self | None:
        """Create the operation binding for a publisher.

        Args:
            binding: Specification-level binding, or ``None``.

        Returns:
            An instance wrapping the first protocol binding (probed in the
            order amqp, kafka, mqtt, nats, redis, sqs) whose ``from_pub``
            conversion is truthy. ``None`` if ``binding`` is ``None`` or no
            protocol produced a binding; note the latter can happen even for
            a non-None ``binding``, despite the ``-> Self`` overload.
        """
        return cls._from_spec(binding, "from_pub")