Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / main / operation.py: 54%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import overload 

2 

3from pydantic import BaseModel 

4from typing_extensions import Self 

5 

6from faststream._internal._compat import PYDANTIC_V2 

7from faststream.specification.asyncapi.v2_6_0.schema.bindings import ( 

8 amqp as amqp_bindings, 

9 http as http_bindings, 

10 kafka as kafka_bindings, 

11 mqtt as mqtt_bindings, 

12 nats as nats_bindings, 

13 redis as redis_bindings, 

14 sqs as sqs_bindings, 

15) 

16from faststream.specification.schema.bindings import OperationBinding as SpecBinding 

17 

18 

19class OperationBinding(BaseModel): 

20 """A class to represent an operation binding. 

21 

22 Attributes: 

23 amqp : AMQP operation binding (optional) 

24 kafka : Kafka operation binding (optional) 

25 sqs : SQS operation binding (optional) 

26 nats : NATS operation binding (optional) 

27 redis : Redis operation binding (optional) 

28 http : HTTP channel binding (optional) 

29 """ 

30 

31 amqp: amqp_bindings.OperationBinding | None = None 

32 kafka: kafka_bindings.OperationBinding | None = None 

33 mqtt: mqtt_bindings.OperationBinding | None = None 

34 sqs: sqs_bindings.OperationBinding | None = None 

35 nats: nats_bindings.OperationBinding | None = None 

36 redis: redis_bindings.OperationBinding | None = None 

37 http: http_bindings.OperationBinding | None = None 

38 

39 if PYDANTIC_V2: 39 ↛ 44line 39 didn't jump to line 44 because the condition on line 39 was always true

40 model_config = {"extra": "allow"} 

41 

42 else: 

43 

44 class Config: 

45 extra = "allow" 

46 

47 @overload 

48 @classmethod 

49 def from_sub(cls, binding: None) -> None: ... 

50 

51 @overload 

52 @classmethod 

53 def from_sub(cls, binding: SpecBinding) -> Self: ... 

54 

55 @classmethod 

56 def from_sub(cls, binding: SpecBinding | None) -> Self | None: 

57 if binding is None: 

58 return None 

59 

60 if binding.amqp and ( 

61 amqp := amqp_bindings.OperationBinding.from_sub(binding.amqp) 

62 ): 

63 return cls(amqp=amqp) 

64 

65 if binding.kafka and ( 65 ↛ 68line 65 didn't jump to line 68 because the condition on line 65 was never true

66 kafka := kafka_bindings.OperationBinding.from_sub(binding.kafka) 

67 ): 

68 return cls(kafka=kafka) 

69 

70 if binding.mqtt and ( 70 ↛ 75line 70 didn't jump to line 75 because the condition on line 70 was always true

71 mqtt := mqtt_bindings.OperationBinding.from_sub(binding.mqtt) 

72 ): 

73 return cls(mqtt=mqtt) 

74 

75 if binding.nats and ( 

76 nats := nats_bindings.OperationBinding.from_sub(binding.nats) 

77 ): 

78 return cls(nats=nats) 

79 

80 if binding.redis and ( 

81 redis := redis_bindings.OperationBinding.from_sub(binding.redis) 

82 ): 

83 return cls(redis=redis) 

84 

85 if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_sub(binding.sqs)): 

86 return cls(sqs=sqs) 

87 

88 return None 

89 

90 @overload 

91 @classmethod 

92 def from_pub(cls, binding: None) -> None: ... 

93 

94 @overload 

95 @classmethod 

96 def from_pub(cls, binding: SpecBinding) -> Self: ... 

97 

98 @classmethod 

99 def from_pub(cls, binding: SpecBinding | None) -> Self | None: 

100 if binding is None: 

101 return None 

102 

103 if binding.amqp and ( 

104 amqp := amqp_bindings.OperationBinding.from_pub(binding.amqp) 

105 ): 

106 return cls(amqp=amqp) 

107 

108 if binding.kafka and ( 108 ↛ 111line 108 didn't jump to line 111 because the condition on line 108 was never true

109 kafka := kafka_bindings.OperationBinding.from_pub(binding.kafka) 

110 ): 

111 return cls(kafka=kafka) 

112 

113 if binding.mqtt and ( 113 ↛ 118line 113 didn't jump to line 118 because the condition on line 113 was always true

114 mqtt := mqtt_bindings.OperationBinding.from_pub(binding.mqtt) 

115 ): 

116 return cls(mqtt=mqtt) 

117 

118 if binding.nats and ( 

119 nats := nats_bindings.OperationBinding.from_pub(binding.nats) 

120 ): 

121 return cls(nats=nats) 

122 

123 if binding.redis and ( 

124 redis := redis_bindings.OperationBinding.from_pub(binding.redis) 

125 ): 

126 return cls(redis=redis) 

127 

128 if binding.sqs and (sqs := sqs_bindings.OperationBinding.from_pub(binding.sqs)): 

129 return cls(sqs=sqs) 

130 

131 return None