Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / main / channel.py: 75%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from pydantic import BaseModel 

2from typing_extensions import Self 

3 

4from faststream._internal._compat import PYDANTIC_V2 

5from faststream.specification.asyncapi.v3_0_0.schema.bindings import ( 

6 amqp as amqp_bindings, 

7 kafka as kafka_bindings, 

8 mqtt as mqtt_bindings, 

9 nats as nats_bindings, 

10 redis as redis_bindings, 

11 sqs as sqs_bindings, 

12) 

13from faststream.specification.schema.bindings import ChannelBinding as SpecBinding 

14 

15 

16class ChannelBinding(BaseModel): 

17 """A class to represent channel bindings. 

18 

19 Attributes: 

20 amqp : AMQP channel binding (optional) 

21 kafka : Kafka channel binding (optional) 

22 sqs : SQS channel binding (optional) 

23 nats : NATS channel binding (optional) 

24 redis : Redis channel binding (optional) 

25 """ 

26 

27 amqp: amqp_bindings.ChannelBinding | None = None 

28 kafka: kafka_bindings.ChannelBinding | None = None 

29 mqtt: mqtt_bindings.ChannelBinding | None = None 

30 sqs: sqs_bindings.ChannelBinding | None = None 

31 nats: nats_bindings.ChannelBinding | None = None 

32 redis: redis_bindings.ChannelBinding | None = None 

33 

34 if PYDANTIC_V2: 34 ↛ 39line 34 didn't jump to line 39 because the condition on line 34 was always true

35 model_config = {"extra": "allow"} 

36 

37 else: 

38 

39 class Config: 

40 extra = "allow" 

41 

42 @classmethod 

43 def from_sub(cls, binding: SpecBinding | None) -> Self | None: 

44 if binding is None: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 return None 

46 

47 if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_sub(binding.amqp)): 

48 return cls(amqp=amqp) 

49 

50 if binding.kafka and ( 

51 kafka := kafka_bindings.ChannelBinding.from_sub(binding.kafka) 

52 ): 

53 return cls(kafka=kafka) 

54 

55 if binding.mqtt and (mqtt := mqtt_bindings.ChannelBinding.from_sub(binding.mqtt)): 

56 return cls(mqtt=mqtt) 

57 

58 if binding.nats and (nats := nats_bindings.ChannelBinding.from_sub(binding.nats)): 

59 return cls(nats=nats) 

60 

61 if binding.redis and ( 61 ↛ 66line 61 didn't jump to line 66 because the condition on line 61 was always true

62 redis := redis_bindings.ChannelBinding.from_sub(binding.redis) 

63 ): 

64 return cls(redis=redis) 

65 

66 if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_sub(binding.sqs)): 

67 return cls(sqs=sqs) 

68 

69 return None 

70 

71 @classmethod 

72 def from_pub(cls, binding: SpecBinding | None) -> Self | None: 

73 if binding is None: 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true

74 return None 

75 

76 if binding.amqp and (amqp := amqp_bindings.ChannelBinding.from_pub(binding.amqp)): 

77 return cls(amqp=amqp) 

78 

79 if binding.kafka and ( 

80 kafka := kafka_bindings.ChannelBinding.from_pub(binding.kafka) 

81 ): 

82 return cls(kafka=kafka) 

83 

84 if binding.mqtt and (mqtt := mqtt_bindings.ChannelBinding.from_pub(binding.mqtt)): 

85 return cls(mqtt=mqtt) 

86 

87 if binding.nats and (nats := nats_bindings.ChannelBinding.from_pub(binding.nats)): 

88 return cls(nats=nats) 

89 

90 if binding.redis and ( 90 ↛ 95line 90 didn't jump to line 95 because the condition on line 90 was always true

91 redis := redis_bindings.ChannelBinding.from_pub(binding.redis) 

92 ): 

93 return cls(redis=redis) 

94 

95 if binding.sqs and (sqs := sqs_bindings.ChannelBinding.from_pub(binding.sqs)): 

96 return cls(sqs=sqs) 

97 

98 return None