Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / nats / channel.py: 78%

14 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI NATS bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/nats 

4""" 

5 

6from pydantic import BaseModel 

7from typing_extensions import Self 

8 

9from faststream.specification.schema.bindings import nats 

10 

11 

class ChannelBinding(BaseModel):
    """AsyncAPI channel binding model for NATS.

    Attributes:
        subject : subject of the channel binding
        queue : queue of the channel binding, or None when not provided
        bindingVersion : version of the channel binding, "custom" by default
    """

    subject: str
    queue: str | None = None
    bindingVersion: str = "custom"

    @classmethod
    def from_sub(cls, binding: nats.ChannelBinding | None) -> Self | None:
        """Create the binding from a subscriber-side specification binding.

        Args:
            binding: specification-level NATS channel binding, or None.

        Returns:
            A new instance copying ``subject`` and ``queue`` from ``binding``,
            or None when ``binding`` is None.
        """
        if binding is not None:
            # bindingVersion is passed explicitly instead of relying on the
            # field default.
            return cls(
                subject=binding.subject,
                queue=binding.queue,
                bindingVersion="custom",
            )

        return None

    @classmethod
    def from_pub(cls, binding: nats.ChannelBinding | None) -> Self | None:
        """Create the binding from a publisher-side specification binding.

        Args:
            binding: specification-level NATS channel binding, or None.

        Returns:
            A new instance copying ``subject`` and ``queue`` from ``binding``,
            or None when ``binding`` is None.
        """
        if binding is not None:
            # bindingVersion is passed explicitly instead of relying on the
            # field default.
            return cls(
                subject=binding.subject,
                queue=binding.queue,
                bindingVersion="custom",
            )

        return None