Coverage for faststream / redis / subscriber / specification.py: 93%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any
3from faststream._internal.endpoint.subscriber import SubscriberSpecification
4from faststream.redis.configs import RedisBrokerConfig
5from faststream.redis.schemas import ListSub, PubSub, StreamSub
6from faststream.specification.asyncapi.utils import resolve_payloads
7from faststream.specification.schema import Message, Operation, SubscriberSpec
8from faststream.specification.schema.bindings import ChannelBinding, redis
10from .config import RedisSubscriberSpecificationConfig
12if TYPE_CHECKING:
13 from faststream._internal.endpoint.subscriber.call_item import (
14 CallsCollection,
15 )
18class RedisSubscriberSpecification(
19 SubscriberSpecification[RedisBrokerConfig, RedisSubscriberSpecificationConfig],
20):
21 def get_schema(self) -> dict[str, SubscriberSpec]:
22 payloads = self.get_payloads()
24 return {
25 self.name: SubscriberSpec(
26 description=self.description,
27 operation=Operation(
28 message=Message(
29 title=f"{self.name}:Message",
30 payload=resolve_payloads(payloads),
31 ),
32 bindings=None,
33 ),
34 bindings=ChannelBinding(
35 redis=self.channel_binding,
36 ),
37 ),
38 }
40 @property
41 def channel_binding(self) -> redis.ChannelBinding:
42 raise NotImplementedError
45class ChannelSubscriberSpecification(RedisSubscriberSpecification):
46 def __init__(
47 self,
48 _outer_config: "RedisBrokerConfig",
49 specification_config: "RedisSubscriberSpecificationConfig",
50 calls: "CallsCollection[Any]",
51 channel: PubSub,
52 ) -> None:
53 super().__init__(_outer_config, specification_config, calls)
54 self.channel = channel
56 @property
57 def name(self) -> str:
58 if self.config.title_:
59 return self.config.title_
61 return f"{self.channel_name}:{self.call_name}"
63 @property
64 def channel_name(self) -> str:
65 return f"{self._outer_config.prefix}{self.channel.name}"
67 @property
68 def channel_binding(self) -> "redis.ChannelBinding":
69 return redis.ChannelBinding(
70 channel=self.channel_name,
71 method="psubscribe" if self.channel.pattern else "subscribe",
72 )
75class ListSubscriberSpecification(RedisSubscriberSpecification):
76 def __init__(
77 self,
78 _outer_config: "RedisBrokerConfig",
79 specification_config: "RedisSubscriberSpecificationConfig",
80 calls: "CallsCollection[Any]",
81 list_sub: ListSub,
82 ) -> None:
83 super().__init__(_outer_config, specification_config, calls)
84 self.list_sub = list_sub
86 @property
87 def name(self) -> str:
88 if self.config.title_: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 return self.config.title_
91 return f"{self.list_name}:{self.call_name}"
93 @property
94 def list_name(self) -> str:
95 return f"{self._outer_config.prefix}{self.list_sub.name}"
97 @property
98 def channel_binding(self) -> "redis.ChannelBinding":
99 return redis.ChannelBinding(
100 channel=self.list_name,
101 method="lpop",
102 )
105class StreamSubscriberSpecification(RedisSubscriberSpecification):
106 def __init__(
107 self,
108 _outer_config: "RedisBrokerConfig",
109 specification_config: "RedisSubscriberSpecificationConfig",
110 calls: "CallsCollection[Any]",
111 stream_sub: StreamSub,
112 ) -> None:
113 super().__init__(_outer_config, specification_config, calls)
114 self.stream_sub = stream_sub
116 @property
117 def name(self) -> str:
118 if self.config.title_: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true
119 return self.config.title_
121 return f"{self.stream_name}:{self.call_name}"
123 @property
124 def stream_name(self) -> str:
125 return f"{self._outer_config.prefix}{self.stream_sub.name}"
127 @property
128 def channel_binding(self) -> "redis.ChannelBinding":
129 return redis.ChannelBinding(
130 channel=self.stream_name,
131 group_name=self.stream_sub.group,
132 consumer_name=self.stream_sub.consumer,
133 method="xreadgroup" if self.stream_sub.group else "xread",
134 )