Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / main / operation.py: 54%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from pydantic import BaseModel
2from typing_extensions import Self
4from faststream._internal._compat import PYDANTIC_V2
5from faststream.specification.asyncapi.v3_0_0.schema.bindings import (
6 amqp as amqp_bindings,
7 http as http_bindings,
8 kafka as kafka_bindings,
9 mqtt as mqtt_bindings,
10 nats as nats_bindings,
11 redis as redis_bindings,
12 sqs as sqs_bindings,
13)
14from faststream.specification.schema.bindings import OperationBinding as SpecBinding
17class OperationBinding(BaseModel):
18 """A class to represent an operation binding.
20 Attributes:
21 amqp : AMQP operation binding (optional)
22 kafka : Kafka operation binding (optional)
23 sqs : SQS operation binding (optional)
24 nats : NATS operation binding (optional)
25 redis : Redis operation binding (optional)
26 http : HTTP operation binding (optional)
27 """
29 amqp: amqp_bindings.OperationBinding | None = None
30 kafka: kafka_bindings.OperationBinding | None = None
31 mqtt: mqtt_bindings.OperationBinding | None = None
32 sqs: sqs_bindings.OperationBinding | None = None
33 nats: nats_bindings.OperationBinding | None = None
34 redis: redis_bindings.OperationBinding | None = None
35 http: http_bindings.OperationBinding | None = None
37 if PYDANTIC_V2: 37 ↛ 42line 37 didn't jump to line 42 because the condition on line 37 was always true
38 model_config = {"extra": "allow"}
40 else:
42 class Config:
43 extra = "allow"
45 @classmethod
46 def from_sub(cls, binding: SpecBinding | None) -> Self | None:
47 if binding is None:
48 return None
50 if binding.amqp and (
51 amqp := amqp_bindings.OperationBinding.from_sub(binding.amqp)
52 ):
53 return cls(amqp=amqp)
55 if binding.kafka and ( 55 ↛ 58line 55 didn't jump to line 58 because the condition on line 55 was never true
56 kafka := kafka_bindings.OperationBinding.from_sub(binding.kafka)
57 ):
58 return cls(kafka=kafka)
60 if binding.mqtt and ( 60 ↛ 65line 60 didn't jump to line 65 because the condition on line 60 was always true
61 mqtt := mqtt_bindings.OperationBinding.from_sub(binding.mqtt)
62 ):
63 return cls(mqtt=mqtt)
65 if binding.nats and (
66 nats := nats_bindings.OperationBinding.from_sub(binding.nats)
67 ):
68 return cls(nats=nats)
70 if binding.redis and (
71 redis := redis_bindings.OperationBinding.from_sub(binding.redis)
72 ):
73 return cls(redis=redis)
75 if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_sub(binding.sqs)):
76 return cls(sqs=sqs)
78 return None
80 @classmethod
81 def from_pub(cls, binding: SpecBinding | None) -> Self | None:
82 if binding is None:
83 return None
85 if binding.amqp and (
86 amqp := amqp_bindings.OperationBinding.from_pub(binding.amqp)
87 ):
88 return cls(amqp=amqp)
90 if binding.kafka and ( 90 ↛ 93line 90 didn't jump to line 93 because the condition on line 90 was never true
91 kafka := kafka_bindings.OperationBinding.from_pub(binding.kafka)
92 ):
93 return cls(kafka=kafka)
95 if binding.mqtt and ( 95 ↛ 100line 95 didn't jump to line 100 because the condition on line 95 was always true
96 mqtt := mqtt_bindings.OperationBinding.from_pub(binding.mqtt)
97 ):
98 return cls(mqtt=mqtt)
100 if binding.nats and (
101 nats := nats_bindings.OperationBinding.from_pub(binding.nats)
102 ):
103 return cls(nats=nats)
105 if binding.redis and (
106 redis := redis_bindings.OperationBinding.from_pub(binding.redis)
107 ):
108 return cls(redis=redis)
110 if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_pub(binding.sqs)):
111 return cls(sqs=sqs)
113 return None