Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / main / operation.py: 54%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from pydantic import BaseModel 

2from typing_extensions import Self 

3 

4from faststream._internal._compat import PYDANTIC_V2 

5from faststream.specification.asyncapi.v3_0_0.schema.bindings import ( 

6 amqp as amqp_bindings, 

7 http as http_bindings, 

8 kafka as kafka_bindings, 

9 mqtt as mqtt_bindings, 

10 nats as nats_bindings, 

11 redis as redis_bindings, 

12 sqs as sqs_bindings, 

13) 

14from faststream.specification.schema.bindings import OperationBinding as SpecBinding 

15 

16 

class OperationBinding(BaseModel):
    """A class to represent an operation binding.

    Attributes:
        amqp : AMQP operation binding (optional)
        kafka : Kafka operation binding (optional)
        mqtt : MQTT operation binding (optional)
        sqs : SQS operation binding (optional)
        nats : NATS operation binding (optional)
        redis : Redis operation binding (optional)
        http : HTTP operation binding (optional)
    """

    amqp: amqp_bindings.OperationBinding | None = None
    kafka: kafka_bindings.OperationBinding | None = None
    mqtt: mqtt_bindings.OperationBinding | None = None
    sqs: sqs_bindings.OperationBinding | None = None
    nats: nats_bindings.OperationBinding | None = None
    redis: redis_bindings.OperationBinding | None = None
    http: http_bindings.OperationBinding | None = None

    # Unknown keys are accepted under both pydantic major versions.
    if PYDANTIC_V2:
        model_config = {"extra": "allow"}

    else:

        class Config:
            extra = "allow"

    @classmethod
    def from_sub(cls, binding: SpecBinding | None) -> Self | None:
        """Build an operation binding from a subscriber-side specification.

        Args:
            binding: The specification binding, or ``None``.

        Returns:
            An instance holding the first protocol binding that converts to a
            truthy value via that protocol's ``from_sub``, or ``None`` when
            ``binding`` is ``None`` or no protocol yields one.
        """
        return cls._convert(binding, "from_sub")

    @classmethod
    def from_pub(cls, binding: SpecBinding | None) -> Self | None:
        """Build an operation binding from a publisher-side specification.

        Args:
            binding: The specification binding, or ``None``.

        Returns:
            An instance holding the first protocol binding that converts to a
            truthy value via that protocol's ``from_pub``, or ``None`` when
            ``binding`` is ``None`` or no protocol yields one.
        """
        return cls._convert(binding, "from_pub")

    @classmethod
    def _convert(
        cls,
        binding: SpecBinding | None,
        converter_name: str,
    ) -> Self | None:
        """Return an instance wrapping the first protocol binding that converts.

        Args:
            binding: The specification binding, or ``None``.
            converter_name: Name of the classmethod to call on each protocol's
                ``OperationBinding`` (``"from_sub"`` or ``"from_pub"``).

        Returns:
            ``cls`` populated with exactly one protocol field, or ``None``.
        """
        if binding is None:
            return None

        # Kept local (not a class attribute) so pydantic does not try to treat
        # it as a model field. The order is the lookup priority: the first
        # protocol that is set on the spec AND converts to a truthy value wins.
        # ``http`` is deliberately absent: it is a declared field but is never
        # derived from the specification binding here.
        protocols = (
            ("amqp", amqp_bindings),
            ("kafka", kafka_bindings),
            ("mqtt", mqtt_bindings),
            ("nats", nats_bindings),
            ("redis", redis_bindings),
            ("sqs", sqs_bindings),
        )

        for field_name, module in protocols:
            spec = getattr(binding, field_name)
            if not spec:
                continue

            # Resolve the protocol class lazily, only once its spec is present.
            convert = getattr(module.OperationBinding, converter_name)
            if converted := convert(spec):
                return cls(**{field_name: converted})

        return None