Coverage for faststream / redis / publisher / usecase.py: 92%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from collections.abc import Iterable
3from typing import TYPE_CHECKING, Any, Optional, Union
5from typing_extensions import override
7from faststream._internal.endpoint.publisher import (
8 PublisherSpecification,
9 PublisherUsecase,
10)
11from faststream.message import gen_cor_id
12from faststream.redis.response import RedisPublishCommand
13from faststream.response.publish_type import PublishType
15from .producer import RedisFastProducer
17if TYPE_CHECKING:
18 from redis.asyncio.client import Pipeline
20 from faststream._internal.basic_types import SendableMessage
21 from faststream._internal.types import PublisherMiddleware
22 from faststream.redis.message import RedisChannelMessage
23 from faststream.redis.schemas import ListSub, PubSub, StreamSub
24 from faststream.response import PublishCommand
26 from .config import RedisPublisherConfig
29class LogicPublisher(PublisherUsecase):
30 """A class to represent a Redis publisher."""
32 def __init__(
33 self,
34 config: "RedisPublisherConfig",
35 specification: "PublisherSpecification[Any, Any]",
36 ) -> None:
37 super().__init__(config, specification)
39 self.config = config
41 self.reply_to = config.reply_to
42 self.headers = config.headers or {}
44 self.producer = self.config._outer_config.producer
46 async def start(self) -> None:
47 await super().start()
49 broker_producer = self.config._outer_config.producer
51 self.producer = RedisFastProducer(
52 connection=self.config._outer_config.connection,
53 parser=broker_producer._parser.custom_func,
54 decoder=broker_producer._decoder.custom_func,
55 message_format=self.config.message_format,
56 serializer=self.config._outer_config.fd_config._serializer,
57 )
59 @abstractmethod
60 def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
61 raise NotImplementedError
64class ChannelPublisher(LogicPublisher):
65 def __init__(
66 self,
67 config: "RedisPublisherConfig",
68 specification: "PublisherSpecification[Any, Any]",
69 *,
70 channel: "PubSub",
71 ) -> None:
72 super().__init__(config, specification)
74 self._channel = channel
76 @property
77 def channel(self) -> "PubSub":
78 return self._channel.add_prefix(self._outer_config.prefix)
80 @override
81 def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
82 return {
83 "channel": self.channel.name if name_only else self.channel,
84 "list": None,
85 "stream": None,
86 }
88 @override
89 async def publish(
90 self,
91 message: "SendableMessage" = None,
92 channel: str | None = None,
93 reply_to: str = "",
94 headers: dict[str, Any] | None = None,
95 correlation_id: str | None = None,
96 *,
97 pipeline: Optional["Pipeline[bytes]"] = None,
98 ) -> int:
99 cmd = RedisPublishCommand(
100 message,
101 channel=channel or self.channel.name,
102 reply_to=reply_to or self.reply_to,
103 headers=self.headers | (headers or {}),
104 correlation_id=correlation_id or gen_cor_id(),
105 pipeline=pipeline,
106 _publish_type=PublishType.PUBLISH,
107 message_format=self.config.message_format,
108 )
109 result: int = await self._basic_publish(
110 cmd,
111 producer=self.producer,
112 _extra_middlewares=(),
113 )
114 return result
116 @override
117 async def _publish(
118 self,
119 cmd: Union["PublishCommand", "RedisPublishCommand"],
120 *,
121 _extra_middlewares: Iterable["PublisherMiddleware"],
122 ) -> None:
123 """This method should be called in subscriber flow only."""
124 cmd = RedisPublishCommand.from_cmd(cmd, message_format=self.config.message_format)
126 cmd.set_destination(channel=self.channel.name)
128 cmd.add_headers(self.headers, override=False)
129 cmd.reply_to = cmd.reply_to or self.reply_to
131 await self._basic_publish(
132 cmd,
133 producer=self.producer,
134 _extra_middlewares=_extra_middlewares,
135 )
137 @override
138 async def request(
139 self,
140 message: "SendableMessage" = None,
141 channel: str | None = None,
142 *,
143 correlation_id: str | None = None,
144 headers: dict[str, Any] | None = None,
145 timeout: float | None = 30.0,
146 ) -> "RedisChannelMessage":
147 cmd = RedisPublishCommand(
148 message,
149 channel=channel or self.channel.name,
150 headers=self.headers | (headers or {}),
151 correlation_id=correlation_id or gen_cor_id(),
152 timeout=timeout,
153 _publish_type=PublishType.REQUEST,
154 message_format=self.config.message_format,
155 )
157 msg: RedisChannelMessage = await self._basic_request(
158 cmd,
159 producer=self.producer,
160 )
161 return msg
164class ListPublisher(LogicPublisher):
165 def __init__(
166 self,
167 config: "RedisPublisherConfig",
168 specification: "PublisherSpecification[Any, Any]",
169 *,
170 list: "ListSub",
171 ) -> None:
172 super().__init__(config, specification)
174 self._list = list
176 @property
177 def list(self) -> "ListSub":
178 return self._list.add_prefix(self._outer_config.prefix)
180 @override
181 def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
182 return {
183 "channel": None,
184 "list": self.list.name if name_only else self.list,
185 "stream": None,
186 }
188 @override
189 async def publish(
190 self,
191 message: "SendableMessage" = None,
192 list: str | None = None,
193 reply_to: str = "",
194 headers: dict[str, Any] | None = None,
195 correlation_id: str | None = None,
196 *,
197 pipeline: Optional["Pipeline[bytes]"] = None,
198 ) -> int:
199 cmd = RedisPublishCommand(
200 message,
201 list=list or self.list.name,
202 reply_to=reply_to or self.reply_to,
203 headers=self.headers | (headers or {}),
204 correlation_id=correlation_id or gen_cor_id(),
205 pipeline=pipeline,
206 _publish_type=PublishType.PUBLISH,
207 message_format=self.config.message_format,
208 )
210 result: int = await self._basic_publish(
211 cmd,
212 producer=self.producer,
213 _extra_middlewares=(),
214 )
215 return result
217 @override
218 async def _publish(
219 self,
220 cmd: Union["PublishCommand", "RedisPublishCommand"],
221 *,
222 _extra_middlewares: Iterable["PublisherMiddleware"],
223 ) -> None:
224 """This method should be called in subscriber flow only."""
225 cmd = RedisPublishCommand.from_cmd(cmd, message_format=self.config.message_format)
227 cmd.set_destination(list=self.list.name)
229 cmd.add_headers(self.headers, override=False)
230 cmd.reply_to = cmd.reply_to or self.reply_to
232 await self._basic_publish(
233 cmd,
234 producer=self.producer,
235 _extra_middlewares=_extra_middlewares,
236 )
238 @override
239 async def request(
240 self,
241 message: "SendableMessage" = None,
242 list: str | None = None,
243 *,
244 correlation_id: str | None = None,
245 headers: dict[str, Any] | None = None,
246 timeout: float | None = 30.0,
247 ) -> "RedisChannelMessage":
248 cmd = RedisPublishCommand(
249 message,
250 list=list or self.list.name,
251 headers=self.headers | (headers or {}),
252 correlation_id=correlation_id or gen_cor_id(),
253 timeout=timeout,
254 _publish_type=PublishType.REQUEST,
255 message_format=self.config.message_format,
256 )
258 msg: RedisChannelMessage = await self._basic_request(
259 cmd,
260 producer=self.producer,
261 )
262 return msg
265class ListBatchPublisher(ListPublisher):
266 @override
267 async def publish( # type: ignore[override]
268 self,
269 *messages: "SendableMessage",
270 list: str,
271 correlation_id: str | None = None,
272 reply_to: str = "",
273 headers: dict[str, Any] | None = None,
274 pipeline: Optional["Pipeline[bytes]"] = None,
275 ) -> int:
276 cmd = RedisPublishCommand(
277 *messages,
278 list=list or self.list.name,
279 reply_to=reply_to or self.reply_to,
280 headers=self.headers | (headers or {}),
281 correlation_id=correlation_id or gen_cor_id(),
282 pipeline=pipeline,
283 _publish_type=PublishType.PUBLISH,
284 message_format=self.config.message_format,
285 )
287 result: int = await self._basic_publish_batch(
288 cmd,
289 producer=self.producer,
290 _extra_middlewares=(),
291 )
292 return result
294 @override
295 async def _publish(
296 self,
297 cmd: Union["PublishCommand", "RedisPublishCommand"],
298 *,
299 _extra_middlewares: Iterable["PublisherMiddleware"],
300 ) -> None:
301 """This method should be called in subscriber flow only."""
302 cmd = RedisPublishCommand.from_cmd(
303 cmd, batch=True, message_format=self.config.message_format
304 )
306 cmd.set_destination(list=self.list.name)
308 cmd.add_headers(self.headers, override=False)
309 cmd.reply_to = cmd.reply_to or self.reply_to
311 await self._basic_publish_batch(
312 cmd,
313 producer=self.producer,
314 _extra_middlewares=_extra_middlewares,
315 )
318class StreamPublisher(LogicPublisher):
319 def __init__(
320 self,
321 config: "RedisPublisherConfig",
322 specification: "PublisherSpecification[Any, Any]",
323 *,
324 stream: "StreamSub",
325 ) -> None:
326 super().__init__(config, specification)
327 self._stream = stream
329 @property
330 def stream(self) -> "StreamSub":
331 return self._stream.add_prefix(self._outer_config.prefix)
333 @override
334 def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
335 return {
336 "channel": None,
337 "list": None,
338 "stream": self.stream.name if name_only else self.stream,
339 }
341 @override
342 async def publish(
343 self,
344 message: "SendableMessage" = None,
345 stream: str | None = None,
346 reply_to: str = "",
347 headers: dict[str, Any] | None = None,
348 correlation_id: str | None = None,
349 *,
350 maxlen: int | None = None,
351 pipeline: Optional["Pipeline[bytes]"] = None,
352 ) -> bytes:
353 cmd = RedisPublishCommand(
354 message,
355 stream=stream or self.stream.name,
356 reply_to=reply_to or self.reply_to,
357 headers=self.headers | (headers or {}),
358 correlation_id=correlation_id or gen_cor_id(),
359 maxlen=maxlen or self.stream.maxlen,
360 pipeline=pipeline,
361 _publish_type=PublishType.PUBLISH,
362 message_format=self.config.message_format,
363 )
365 result: bytes = await self._basic_publish(
366 cmd,
367 producer=self.producer,
368 _extra_middlewares=(),
369 )
370 return result
372 @override
373 async def _publish(
374 self,
375 cmd: Union["PublishCommand", "RedisPublishCommand"],
376 *,
377 _extra_middlewares: Iterable["PublisherMiddleware"],
378 ) -> None:
379 """This method should be called in subscriber flow only."""
380 cmd = RedisPublishCommand.from_cmd(cmd, message_format=self.config.message_format)
382 cmd.set_destination(stream=self.stream.name)
384 cmd.add_headers(self.headers, override=False)
385 cmd.reply_to = cmd.reply_to or self.reply_to
386 cmd.maxlen = self.stream.maxlen
388 await self._basic_publish(
389 cmd,
390 producer=self.producer,
391 _extra_middlewares=_extra_middlewares,
392 )
394 @override
395 async def request(
396 self,
397 message: "SendableMessage" = None,
398 stream: str | None = None,
399 *,
400 maxlen: int | None = None,
401 correlation_id: str | None = None,
402 headers: dict[str, Any] | None = None,
403 timeout: float | None = 30.0,
404 ) -> "RedisChannelMessage":
405 cmd = RedisPublishCommand(
406 message,
407 stream=stream or self.stream.name,
408 headers=self.headers | (headers or {}),
409 correlation_id=correlation_id or gen_cor_id(),
410 maxlen=maxlen or self.stream.maxlen,
411 timeout=timeout,
412 _publish_type=PublishType.REQUEST,
413 message_format=self.config.message_format,
414 )
416 msg: RedisChannelMessage = await self._basic_request(
417 cmd,
418 producer=self.producer,
419 )
420 return msg