Coverage for faststream / specification / asyncapi / v2_6_0 / schema / bindings / redis / channel.py: 80%
16 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1"""AsyncAPI Redis bindings.
3References: https://github.com/asyncapi/bindings/tree/master/redis
4"""
6from pydantic import BaseModel
7from typing_extensions import Self
9from faststream.specification.schema.bindings import redis
12class ChannelBinding(BaseModel):
13 """A class to represent channel binding.
15 Attributes:
16 channel : the channel name
17 method : the method used for binding (ssubscribe, psubscribe, subscribe)
18 bindingVersion : the version of the binding
19 """
21 channel: str
22 method: str | None = None
23 groupName: str | None = None
24 consumerName: str | None = None
25 bindingVersion: str = "custom"
27 @classmethod
28 def from_sub(cls, binding: redis.ChannelBinding | None) -> Self | None:
29 if binding is None: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true
30 return None
32 return cls(
33 channel=binding.channel,
34 method=binding.method,
35 groupName=binding.group_name,
36 consumerName=binding.consumer_name,
37 )
39 @classmethod
40 def from_pub(cls, binding: redis.ChannelBinding | None) -> Self | None:
41 if binding is None: 41 ↛ 42line 41 didn't jump to line 42 because the condition on line 41 was never true
42 return None
44 return cls(
45 channel=binding.channel,
46 method=binding.method,
47 groupName=binding.group_name,
48 consumerName=binding.consumer_name,
49 )