Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / nats / operation.py: 41%
13 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI NATS bindings.
3References: https://github.com/asyncapi/bindings/tree/master/nats
4"""
6from typing import Any
8from pydantic import BaseModel
9from typing_extensions import Self
11from faststream.specification.schema.bindings import nats
14class OperationBinding(BaseModel):
15 """A class to represent an operation binding.
17 Attributes:
18 replyTo : optional dictionary containing reply information
19 bindingVersion : version of the binding (default is "custom")
20 """
22 replyTo: dict[str, Any] | None = None
23 bindingVersion: str = "custom"
25 @classmethod
26 def from_sub(cls, binding: nats.OperationBinding | None) -> Self | None:
27 if not binding:
28 return None
30 return cls(
31 replyTo=binding.reply_to,
32 )
34 @classmethod
35 def from_pub(cls, binding: nats.OperationBinding | None) -> Self | None:
36 if not binding:
37 return None
39 return cls(
40 replyTo=binding.reply_to,
41 )