Coverage for faststream/specification/asyncapi/v2_6_0/schema/bindings/sqs/channel.py: 78%
9 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI SQS bindings.
3References: https://github.com/asyncapi/bindings/tree/master/sqs
4"""
6from typing import Any
8from pydantic import BaseModel
9from typing_extensions import Self
11from faststream.specification.schema.bindings import sqs
class ChannelBinding(BaseModel):
    """A class to represent channel binding.

    Attributes:
        queue : a dictionary representing the queue
        bindingVersion : a string representing the binding version (default: "custom")
    """

    queue: dict[str, Any]
    bindingVersion: str = "custom"

    @classmethod
    def from_pub(cls, binding: sqs.ChannelBinding) -> Self:
        """Create an instance by copying the fields of ``binding``.

        Args:
            binding: Specification-level SQS channel binding whose ``queue``
                and ``bindingVersion`` attributes are read (presumably the
                publisher side, judging by the method name).

        Returns:
            A new ``cls`` instance carrying the same ``queue`` and
            ``bindingVersion`` values.
        """
        queue = binding.queue
        version = binding.bindingVersion
        return cls(queue=queue, bindingVersion=version)

    @classmethod
    def from_sub(cls, binding: sqs.ChannelBinding) -> Self:
        """Create an instance by copying the fields of ``binding``.

        Performs the same field-for-field copy as ``from_pub``; it is kept as
        a separate constructor (presumably for the subscriber side, judging
        by the method name).

        Args:
            binding: Specification-level SQS channel binding whose ``queue``
                and ``bindingVersion`` attributes are read.

        Returns:
            A new ``cls`` instance carrying the same ``queue`` and
            ``bindingVersion`` values.
        """
        queue = binding.queue
        version = binding.bindingVersion
        return cls(queue=queue, bindingVersion=version)