Coverage for faststream/specification/asyncapi/v2_6_0/schema/channels.py: 83%

16 statements  

Generated by coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from pydantic import BaseModel 

2from typing_extensions import Self 

3 

4from faststream._internal._compat import PYDANTIC_V2 

5from faststream.specification.schema import PublisherSpec, SubscriberSpec 

6 

7from .bindings import ChannelBinding 

8from .operations import Operation 

9 

10 

class Channel(BaseModel):
    """A class to represent a channel.

    Attributes:
        description : optional description of the channel
        servers : optional list of servers associated with the channel
        bindings : optional channel binding
        subscribe : optional operation for subscribing to the channel
        publish : optional operation for publishing to the channel

    Configurations:
        model_config : configuration for the model (only applicable for Pydantic version 2)
        Config : configuration for the class (only applicable for Pydantic version 1)
    """

    # Every field is optional and defaults to None.
    description: str | None = None
    servers: list[str] | None = None
    bindings: ChannelBinding | None = None
    subscribe: Operation | None = None
    publish: Operation | None = None

    # TODO:
    # parameters: Optional[Parameter] = None

    # Both branches configure the same thing (extra fields are allowed);
    # only the declaration syntax differs between Pydantic major versions.
    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    @classmethod
    def from_sub(cls, subscriber: SubscriberSpec) -> Self:
        """Build a channel from a subscriber specification.

        The subscriber's operation is stored in the ``publish`` slot and
        ``subscribe`` is explicitly set to ``None``.
        NOTE(review): presumably this follows the AsyncAPI 2.x convention of
        naming operations from the client's point of view -- confirm.

        Args:
            subscriber: specification providing ``description``, ``bindings``
                and ``operation``.

        Returns:
            A new instance of ``cls``.
        """
        channel_description = subscriber.description
        channel_bindings = ChannelBinding.from_sub(subscriber.bindings)
        subscriber_operation = Operation.from_sub(subscriber.operation)
        # ``servers`` and ``subscribe`` are passed explicitly as None rather
        # than omitted, so they still count as explicitly provided fields.
        return cls(
            description=channel_description,
            servers=None,
            bindings=channel_bindings,
            subscribe=None,
            publish=subscriber_operation,
        )

    @classmethod
    def from_pub(cls, publisher: PublisherSpec) -> Self:
        """Build a channel from a publisher specification.

        The publisher's operation is stored in the ``subscribe`` slot and
        ``publish`` is explicitly set to ``None``.
        NOTE(review): presumably this follows the AsyncAPI 2.x convention of
        naming operations from the client's point of view -- confirm.

        Args:
            publisher: specification providing ``description``, ``bindings``
                and ``operation``.

        Returns:
            A new instance of ``cls``.
        """
        channel_description = publisher.description
        channel_bindings = ChannelBinding.from_pub(publisher.bindings)
        publisher_operation = Operation.from_pub(publisher.operation)
        # ``servers`` and ``publish`` are passed explicitly as None rather
        # than omitted, so they still count as explicitly provided fields.
        return cls(
            description=channel_description,
            servers=None,
            bindings=channel_bindings,
            subscribe=publisher_operation,
            publish=None,
        )

59 subscribe=Operation.from_pub(publisher.operation), 

60 publish=None, 

61 )