Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / nats / channel.py: 78%
14 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI NATS bindings.
3References: https://github.com/asyncapi/bindings/tree/master/nats
4"""
6from pydantic import BaseModel
7from typing_extensions import Self
9from faststream.specification.schema.bindings import nats
12class ChannelBinding(BaseModel):
13 """A class to represent channel binding.
15 Attributes:
16 subject : subject of the channel binding
17 queue : optional queue for the channel binding
18 bindingVersion : version of the channel binding, default is "custom"
19 """
21 subject: str
22 queue: str | None = None
23 bindingVersion: str = "custom"
25 @classmethod
26 def from_sub(cls, binding: nats.ChannelBinding | None) -> Self | None:
27 if binding is None: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true
28 return None
30 return cls(
31 subject=binding.subject,
32 queue=binding.queue,
33 bindingVersion="custom",
34 )
36 @classmethod
37 def from_pub(cls, binding: nats.ChannelBinding | None) -> Self | None:
38 if binding is None: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 return None
41 return cls(
42 subject=binding.subject,
43 queue=binding.queue,
44 bindingVersion="custom",
45 )