Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / amqp / channel.py: 82%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI AMQP bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/amqp 

4""" 

5 

6from typing import Literal, overload 

7 

8from pydantic import BaseModel, Field 

9from typing_extensions import Self 

10 

11from faststream.specification.schema.bindings import amqp 

12 

13 

14class Queue(BaseModel): 

15 """A class to represent a queue. 

16 

17 Attributes: 

18 name : name of the queue 

19 durable : indicates if the queue is durable 

20 exclusive : indicates if the queue is exclusive 

21 autoDelete : indicates if the queue should be automatically deleted 

22 vhost : virtual host of the queue (default is "/") 

23 """ 

24 

25 name: str 

26 durable: bool 

27 exclusive: bool 

28 autoDelete: bool 

29 vhost: str = "/" 

30 

31 @overload 

32 @classmethod 

33 def from_spec(cls, binding: None, vhost: str) -> None: ... 

34 

35 @overload 

36 @classmethod 

37 def from_spec(cls, binding: amqp.Queue, vhost: str) -> Self: ... 

38 

39 @classmethod 

40 def from_spec(cls, binding: amqp.Queue | None, vhost: str) -> Self | None: 

41 if binding is None: 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true

42 return None 

43 

44 return cls( 

45 name=binding.name, 

46 durable=binding.durable, 

47 exclusive=binding.exclusive, 

48 autoDelete=binding.auto_delete, 

49 vhost=vhost, 

50 ) 

51 

52 

53class Exchange(BaseModel): 

54 """A class to represent an exchange. 

55 

56 Attributes: 

57 name : name of the exchange (optional) 

58 type : type of the exchange, can be one of "default", "direct", "topic", "fanout", "headers" 

59 durable : whether the exchange is durable (optional) 

60 autoDelete : whether the exchange is automatically deleted (optional) 

61 vhost : virtual host of the exchange, default is "/" 

62 """ 

63 

64 name: str | None = None 

65 type: Literal[ 

66 "default", 

67 "direct", 

68 "topic", 

69 "fanout", 

70 "headers", 

71 "x-delayed-message", 

72 "x-consistent-hash", 

73 "x-modulus-hash", 

74 ] 

75 durable: bool | None = None 

76 autoDelete: bool | None = None 

77 vhost: str = "/" 

78 

79 @overload 

80 @classmethod 

81 def from_spec(cls, binding: None, vhost: str) -> None: ... 

82 

83 @overload 

84 @classmethod 

85 def from_spec(cls, binding: amqp.Exchange, vhost: str) -> Self: ... 

86 

87 @classmethod 

88 def from_spec(cls, binding: amqp.Exchange | None, vhost: str) -> Self | None: 

89 if binding is None: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 return None 

91 

92 return cls( 

93 name=binding.name, 

94 type=binding.type, 

95 durable=binding.durable, 

96 autoDelete=binding.auto_delete, 

97 vhost=vhost, 

98 ) 

99 

100 

101class ChannelBinding(BaseModel): 

102 """A class to represent channel binding. 

103 

104 Attributes: 

105 is_ : Type of binding, can be "queue" or "routingKey" 

106 bindingVersion : Version of the binding 

107 queue : Optional queue object 

108 exchange : Optional exchange object 

109 """ 

110 

111 is_: Literal["queue", "routingKey"] = Field(..., alias="is") 

112 bindingVersion: str = "0.2.0" 

113 queue: Queue | None = None 

114 exchange: Exchange | None = None 

115 

116 @classmethod 

117 def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None: 

118 if binding is None: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true

119 return None 

120 

121 return cls( 

122 **{ 

123 "is": "routingKey", 

124 "queue": Queue.from_spec(binding.queue, binding.virtual_host) 

125 if binding.exchange.is_respect_routing_key 

126 else None, 

127 "exchange": Exchange.from_spec(binding.exchange, binding.virtual_host), 

128 }, 

129 ) 

130 

131 @classmethod 

132 def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None: 

133 if binding is None: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true

134 return None 

135 

136 return cls( 

137 **{ 

138 "is": "routingKey", 

139 "queue": Queue.from_spec(binding.queue, binding.virtual_host) 

140 if binding.exchange.is_respect_routing_key and binding.queue.name 

141 else None, 

142 "exchange": Exchange.from_spec(binding.exchange, binding.virtual_host), 

143 }, 

144 )