Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / sqs / channel.py: 78%

9 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1"""AsyncAPI SQS bindings. 

2 

3References: https://github.com/asyncapi/bindings/tree/master/sqs 

4""" 

5 

6from typing import Any 

7 

8from pydantic import BaseModel 

9from typing_extensions import Self 

10 

11from faststream.specification.schema.bindings import sqs 

12 

13 

class ChannelBinding(BaseModel):
    """AsyncAPI v2.6.0 channel binding model for SQS.

    Attributes:
        queue : a dictionary (string keys, arbitrary values) describing the queue
        bindingVersion : the binding version string (default: "custom")
    """

    queue: dict[str, Any]
    bindingVersion: str = "custom"

    @classmethod
    def from_pub(cls, binding: sqs.ChannelBinding) -> Self:
        """Build this model from a specification-level SQS channel binding.

        Args:
            binding: the source binding; its ``queue`` and ``bindingVersion``
                attributes are copied over unchanged.

        Returns:
            A new instance of ``cls`` carrying the same two values.
        """
        queue, version = binding.queue, binding.bindingVersion
        return cls(queue=queue, bindingVersion=version)

    @classmethod
    def from_sub(cls, binding: sqs.ChannelBinding) -> Self:
        """Build this model from a specification-level SQS channel binding.

        Performs the same field-for-field copy as ``from_pub``; it is kept as
        a separate constructor so each can be overridden independently.

        Args:
            binding: the source binding; its ``queue`` and ``bindingVersion``
                attributes are copied over unchanged.

        Returns:
            A new instance of ``cls`` carrying the same two values.
        """
        queue, version = binding.queue, binding.bindingVersion
        return cls(queue=queue, bindingVersion=version)