Coverage for faststream / redis / subscriber / usecases / basic.py: 80%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from abc import abstractmethod
3from collections.abc import Sequence
4from contextlib import suppress
5from typing import TYPE_CHECKING, Any, Optional, TypeAlias
7import anyio
8from typing_extensions import override
10from faststream._internal.endpoint.subscriber import (
11 SubscriberSpecification,
12 SubscriberUsecase,
13)
14from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin, TasksMixin
15from faststream.redis.message import (
16 UnifyRedisDict,
17)
18from faststream.redis.publisher.fake import RedisFakePublisher
20if TYPE_CHECKING:
21 from redis.asyncio.client import Redis
23 from faststream._internal.endpoint.publisher import PublisherProto
24 from faststream._internal.endpoint.subscriber.call_item import (
25 CallsCollection,
26 )
27 from faststream.message import StreamMessage as BrokerStreamMessage
28 from faststream.redis.configs import RedisBrokerConfig
29 from faststream.redis.subscriber.config import RedisSubscriberConfig
32TopicName: TypeAlias = bytes
33Offset: TypeAlias = bytes
36class LogicSubscriber(TasksMixin, SubscriberUsecase[UnifyRedisDict]):
37 """A class to represent a Redis handler."""
39 _outer_config: "RedisBrokerConfig"
41 def __init__(
42 self,
43 config: "RedisSubscriberConfig",
44 specification: "SubscriberSpecification[Any, Any]",
45 calls: "CallsCollection[Any]",
46 ) -> None:
47 super().__init__(config, specification, calls)
48 self.config = config
50 @property
51 def _client(self) -> "Redis[bytes]":
52 return self._outer_config.connection.client
54 def _make_response_publisher(
55 self,
56 message: "BrokerStreamMessage[UnifyRedisDict]",
57 ) -> Sequence["PublisherProto"]:
58 return (
59 RedisFakePublisher(
60 self._outer_config.producer,
61 channel=message.reply_to,
62 message_format=self.config.message_format,
63 ),
64 )
66 @override
67 async def start(
68 self,
69 *args: Any,
70 ) -> None:
71 await super().start()
73 self._post_start()
75 start_signal = anyio.Event()
77 if self.calls:
78 self.add_task(self._consume, args, {"start_signal": start_signal})
80 with anyio.fail_after(3.0):
81 await start_signal.wait()
83 else:
84 start_signal.set()
86 async def _consume(self, *args: Any, start_signal: anyio.Event) -> None:
87 connected = True
89 while self.running:
90 try:
91 await self._get_msgs(*args)
93 except Exception as e: # noqa: PERF203
94 self._log(
95 log_level=logging.ERROR,
96 message="Message fetch error",
97 exc_info=e,
98 )
100 if connected:
101 connected = False
103 await anyio.sleep(5)
105 else:
106 if not connected: 106 ↛ 107line 106 didn't jump to line 107 because the condition on line 106 was never true
107 connected = True
109 finally:
110 if not start_signal.is_set():
111 with suppress(Exception):
112 start_signal.set()
114 @abstractmethod
115 async def _get_msgs(self, *args: Any) -> None:
116 raise NotImplementedError
118 @staticmethod
119 def build_log_context(
120 message: Optional["BrokerStreamMessage[Any]"],
121 channel: str = "",
122 ) -> dict[str, str]:
123 return {
124 "channel": channel,
125 "message_id": getattr(message, "message_id", ""),
126 }
128 async def consume_one(self, msg: Any) -> None:
129 await self.consume(msg)
132class ConcurrentSubscriber(
133 ConcurrentMixin["BrokerStreamMessage[Any]"],
134 LogicSubscriber,
135):
136 def __init__(
137 self,
138 config: "RedisSubscriberConfig",
139 specification: "SubscriberSpecification[Any, Any]",
140 calls: "CallsCollection[Any]",
141 max_workers: int,
142 ) -> None:
143 super().__init__(config, specification, calls, max_workers=max_workers)
145 async def start(self) -> None:
146 await super().start()
147 self.start_consume_task()
149 async def consume_one(self, msg: "BrokerStreamMessage[Any]") -> None:
150 await self._put_msg(msg)