Coverage for faststream / redis / publisher / specification.py: 93%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from faststream._internal.endpoint.publisher import PublisherSpecification
2from faststream.redis.configs import RedisBrokerConfig
3from faststream.redis.schemas import ListSub, PubSub, StreamSub
4from faststream.specification.asyncapi.utils import resolve_payloads
5from faststream.specification.schema import Message, Operation, PublisherSpec
6from faststream.specification.schema.bindings import ChannelBinding, redis
8from .config import RedisPublisherSpecificationConfig
11class RedisPublisherSpecification(
12 PublisherSpecification[RedisBrokerConfig, RedisPublisherSpecificationConfig],
13):
14 def get_schema(self) -> dict[str, PublisherSpec]:
15 payloads = self.get_payloads()
17 return {
18 self.name: PublisherSpec(
19 description=self.config.description_,
20 operation=Operation(
21 message=Message(
22 title=f"{self.name}:Message",
23 payload=resolve_payloads(payloads, "Publisher"),
24 ),
25 bindings=None,
26 ),
27 bindings=ChannelBinding(
28 redis=self.channel_binding,
29 ),
30 ),
31 }
33 @property
34 def channel_binding(self) -> redis.ChannelBinding:
35 raise NotImplementedError
38class ChannelPublisherSpecification(RedisPublisherSpecification):
39 def __init__(
40 self,
41 _outer_config: RedisBrokerConfig,
42 specification_config: RedisPublisherSpecificationConfig,
43 channel: PubSub,
44 ) -> None:
45 super().__init__(_outer_config, specification_config)
46 self.channel = channel
48 @property
49 def name(self) -> str:
50 if self.config.title_:
51 return self.config.title_
53 return f"{self.channel_name}:Publisher"
55 @property
56 def channel_name(self) -> str:
57 return f"{self._outer_config.prefix}{self.channel.name}"
59 @property
60 def channel_binding(self) -> redis.ChannelBinding:
61 return redis.ChannelBinding(
62 channel=self.channel_name,
63 method="publish",
64 )
67class ListPublisherSpecification(RedisPublisherSpecification):
68 def __init__(
69 self,
70 _outer_config: RedisBrokerConfig,
71 specification_config: RedisPublisherSpecificationConfig,
72 list_sub: ListSub,
73 ) -> None:
74 super().__init__(_outer_config, specification_config)
75 self.list_sub = list_sub
77 @property
78 def name(self) -> str:
79 if self.config.title_: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 return self.config.title_
82 return f"{self.list_name}:Publisher"
84 @property
85 def list_name(self) -> str:
86 return f"{self._outer_config.prefix}{self.list_sub.name}"
88 @property
89 def channel_binding(self) -> redis.ChannelBinding:
90 return redis.ChannelBinding(
91 channel=self.list_name,
92 method="rpush",
93 )
96class StreamPublisherSpecification(RedisPublisherSpecification):
97 def __init__(
98 self,
99 _outer_config: RedisBrokerConfig,
100 specification_config: RedisPublisherSpecificationConfig,
101 stream_sub: StreamSub,
102 ) -> None:
103 super().__init__(_outer_config, specification_config)
104 self.stream_sub = stream_sub
106 @property
107 def name(self) -> str:
108 if self.config.title_: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true
109 return self.config.title_
111 return f"{self.stream_name}:Publisher"
113 @property
114 def stream_name(self) -> str:
115 return f"{self._outer_config.prefix}{self.stream_sub.name}"
117 @property
118 def channel_binding(self) -> "redis.ChannelBinding":
119 return redis.ChannelBinding(
120 channel=self.stream_name,
121 method="xadd",
122 )