Coverage for faststream / specification / asyncapi / v3_0_0 / schema / bindings / amqp / channel.py: 75%

12 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing_extensions import Self 

2 

3from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import ( 

4 ChannelBinding as V2Binding, 

5) 

6from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp.channel import ( 

7 Exchange, 

8 Queue, 

9) 

10from faststream.specification.schema.bindings import amqp 

11 

12 

13class ChannelBinding(V2Binding): 

14 bindingVersion: str = "0.3.0" 

15 

16 @classmethod 

17 def from_sub(cls, binding: amqp.ChannelBinding | None) -> Self | None: 

18 if binding is None: 18 ↛ 19line 18 didn't jump to line 19 because the condition on line 18 was never true

19 return None 

20 

21 return cls( 

22 **{ 

23 "is": "queue", 

24 "queue": Queue.from_spec(binding.queue, binding.virtual_host), 

25 }, 

26 ) 

27 

28 @classmethod 

29 def from_pub(cls, binding: amqp.ChannelBinding | None) -> Self | None: 

30 if binding is None: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true

31 return None 

32 

33 return cls( 

34 **{ 

35 "is": "routingKey", 

36 "exchange": Exchange.from_spec(binding.exchange, binding.virtual_host), 

37 }, 

38 )