Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / main / channel.py: 75%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import overload 

2 

3from pydantic import BaseModel 

4from typing_extensions import Self 

5 

6from faststream._internal._compat import PYDANTIC_V2 

7from faststream.specification.asyncapi.v2_6_0.schema.bindings import ( 

8 amqp as amqp_bindings, 

9 kafka as kafka_bindings, 

10 mqtt as mqtt_bindings, 

11 nats as nats_bindings, 

12 redis as redis_bindings, 

13 sqs as sqs_bindings, 

14) 

15from faststream.specification.schema.bindings import ChannelBinding as SpecBinding 

16 

17 

18class ChannelBinding(BaseModel): 

19 """A class to represent channel bindings. 

20 

21 Attributes: 

22 amqp : AMQP channel binding (optional) 

23 kafka : Kafka channel binding (optional) 

24 sqs : SQS channel binding (optional) 

25 nats : NATS channel binding (optional) 

26 redis : Redis channel binding (optional) 

27 """ 

28 

29 amqp: amqp_bindings.ChannelBinding | None = None 

30 kafka: kafka_bindings.ChannelBinding | None = None 

31 mqtt: mqtt_bindings.ChannelBinding | None = None 

32 sqs: sqs_bindings.ChannelBinding | None = None 

33 nats: nats_bindings.ChannelBinding | None = None 

34 redis: redis_bindings.ChannelBinding | None = None 

35 

36 if PYDANTIC_V2: 36 ↛ 41line 36 didn't jump to line 41 because the condition on line 36 was always true

37 model_config = {"extra": "allow"} 

38 

39 else: 

40 

41 class Config: 

42 extra = "allow" 

43 

44 @overload 

45 @classmethod 

46 def from_sub(cls, binding: None) -> None: ... 

47 

48 @overload 

49 @classmethod 

50 def from_sub(cls, binding: SpecBinding) -> Self: ... 

51 

52 @classmethod 

53 def from_sub(cls, binding: SpecBinding | None) -> Self | None: 

54 if binding is None: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true

55 return None 

56 

57 if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_sub(binding.amqp)): 

58 return cls(amqp=amqp) 

59 

60 if binding.kafka and ( 

61 kafka := kafka_bindings.ChannelBinding.from_sub(binding.kafka) 

62 ): 

63 return cls(kafka=kafka) 

64 

65 if binding.mqtt and (mqtt := mqtt_bindings.ChannelBinding.from_sub(binding.mqtt)): 

66 return cls(mqtt=mqtt) 

67 

68 if binding.nats and (nats := nats_bindings.ChannelBinding.from_sub(binding.nats)): 

69 return cls(nats=nats) 

70 

71 if binding.redis and ( 71 ↛ 76line 71 didn't jump to line 76 because the condition on line 71 was always true

72 redis := redis_bindings.ChannelBinding.from_sub(binding.redis) 

73 ): 

74 return cls(redis=redis) 

75 

76 if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_sub(binding.sqs)): 

77 return cls(sqs=sqs) 

78 

79 return None 

80 

81 @overload 

82 @classmethod 

83 def from_pub(cls, binding: None) -> None: ... 

84 

85 @overload 

86 @classmethod 

87 def from_pub(cls, binding: SpecBinding) -> Self: ... 

88 

89 @classmethod 

90 def from_pub(cls, binding: SpecBinding | None) -> Self | None: 

91 if binding is None: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 return None 

93 

94 if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_pub(binding.amqp)): 

95 return cls(amqp=amqp) 

96 

97 if binding.kafka and ( 

98 kafka := kafka_bindings.ChannelBinding.from_pub(binding.kafka) 

99 ): 

100 return cls(kafka=kafka) 

101 

102 if binding.mqtt and (mqtt := mqtt_bindings.ChannelBinding.from_pub(binding.mqtt)): 

103 return cls(mqtt=mqtt) 

104 

105 if binding.nats and (nats := nats_bindings.ChannelBinding.from_pub(binding.nats)): 

106 return cls(nats=nats) 

107 

108 if binding.redis and ( 108 ↛ 113line 108 didn't jump to line 113 because the condition on line 108 was always true

109 redis := redis_bindings.ChannelBinding.from_pub(binding.redis) 

110 ): 

111 return cls(redis=redis) 

112 

113 if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_pub(binding.sqs)): 

114 return cls(sqs=sqs) 

115 

116 return None