Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / nats / operation.py: 41%

13 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI NATS bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/nats 

4""" 

5 

6from typing import Any 

7 

8from pydantic import BaseModel 

9from typing_extensions import Self 

10 

11from faststream.specification.schema.bindings import nats 

12 

13 

14class OperationBinding(BaseModel): 

15 """A class to represent an operation binding. 

16 

17 Attributes: 

18 replyTo : optional dictionary containing reply information 

19 bindingVersion : version of the binding (default is "custom") 

20 """ 

21 

22 replyTo: dict[str, Any] | None = None 

23 bindingVersion: str = "custom" 

24 

25 @classmethod 

26 def from_sub(cls, binding: nats.OperationBinding | None) -> Self | None: 

27 if not binding: 

28 return None 

29 

30 return cls( 

31 replyTo=binding.reply_to, 

32 ) 

33 

34 @classmethod 

35 def from_pub(cls, binding: nats.OperationBinding | None) -> Self | None: 

36 if not binding: 

37 return None 

38 

39 return cls( 

40 replyTo=binding.reply_to, 

41 )