Coverage for faststream / specification / asyncapi / v3_0_0 / schema / channels.py: 86%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from pydantic import BaseModel 

2from typing_extensions import Self 

3 

4from faststream._internal._compat import PYDANTIC_V2 

5from faststream.specification.asyncapi.v3_0_0.schema.bindings import ChannelBinding 

6from faststream.specification.asyncapi.v3_0_0.schema.message import Message 

7from faststream.specification.schema import PublisherSpec, SubscriberSpec 

8 

9from .utils import Reference 

10 

11 

12class Channel(BaseModel): 

13 """A class to represent a channel. 

14 

15 Attributes: 

16 address: A string representation of this channel's address. 

17 description : optional description of the channel 

18 servers : optional list of servers associated with the channel 

19 bindings : optional channel binding 

20 parameters : optional parameters associated with the channel 

21 

22 Configurations: 

23 model_config : configuration for the model (only applicable for Pydantic version 2) 

24 Config : configuration for the class (only applicable for Pydantic version 1) 

25 """ 

26 

27 address: str 

28 description: str | None = None 

29 servers: list[dict[str, str]] | None = None 

30 messages: dict[str, Message | Reference] 

31 bindings: ChannelBinding | None = None 

32 

33 # TODO: 

34 # parameters: Optional[Parameter] = None 

35 

36 if PYDANTIC_V2: 36 ↛ 41line 36 didn't jump to line 41 because the condition on line 36 was always true

37 model_config = {"extra": "allow"} 

38 

39 else: 

40 

41 class Config: 

42 extra = "allow" 

43 

44 @classmethod 

45 def from_sub(cls, address: str, subscriber: SubscriberSpec) -> Self: 

46 message = subscriber.operation.message 

47 assert message.title 

48 

49 *left, right = message.title.split(":") 

50 message.title = ":".join((*left, f"Subscribe{right}")) 

51 

52 return cls( 

53 description=subscriber.description, 

54 address=address, 

55 messages={ 

56 "SubscribeMessage": Message.from_spec(message), 

57 }, 

58 bindings=ChannelBinding.from_sub(subscriber.bindings), 

59 servers=None, 

60 ) 

61 

62 @classmethod 

63 def from_pub(cls, address: str, publisher: PublisherSpec) -> Self: 

64 return cls( 

65 description=publisher.description, 

66 address=address, 

67 messages={ 

68 "Message": Message.from_spec(publisher.operation.message), 

69 }, 

70 bindings=ChannelBinding.from_pub(publisher.bindings), 

71 servers=None, 

72 )