Coverage for faststream / specification / asyncapi / v2_6_0 / schema / channels.py: 83%
16 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from pydantic import BaseModel
2from typing_extensions import Self
4from faststream._internal._compat import PYDANTIC_V2
5from faststream.specification.schema import PublisherSpec, SubscriberSpec
7from .bindings import ChannelBinding
8from .operations import Operation
11class Channel(BaseModel):
12 """A class to represent a channel.
14 Attributes:
15 description : optional description of the channel
16 servers : optional list of servers associated with the channel
17 bindings : optional channel binding
18 subscribe : optional operation for subscribing to the channel
19 publish : optional operation for publishing to the channel
21 Configurations:
22 model_config : configuration for the model (only applicable for Pydantic version 2)
23 Config : configuration for the class (only applicable for Pydantic version 1)
24 """
26 description: str | None = None
27 servers: list[str] | None = None
28 bindings: ChannelBinding | None = None
29 subscribe: Operation | None = None
30 publish: Operation | None = None
32 # TODO:
33 # parameters: Optional[Parameter] = None
35 if PYDANTIC_V2: 35 ↛ 40line 35 didn't jump to line 40 because the condition on line 35 was always true
36 model_config = {"extra": "allow"}
38 else:
40 class Config:
41 extra = "allow"
43 @classmethod
44 def from_sub(cls, subscriber: SubscriberSpec) -> Self:
45 return cls(
46 description=subscriber.description,
47 servers=None,
48 bindings=ChannelBinding.from_sub(subscriber.bindings),
49 subscribe=None,
50 publish=Operation.from_sub(subscriber.operation),
51 )
53 @classmethod
54 def from_pub(cls, publisher: PublisherSpec) -> Self:
55 return cls(
56 description=publisher.description,
57 servers=None,
58 bindings=ChannelBinding.from_pub(publisher.bindings),
59 subscribe=Operation.from_pub(publisher.operation),
60 publish=None,
61 )