Coverage for faststream / redis / subscriber / usecases / basic.py: 80%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from abc import abstractmethod 

3from collections.abc import Sequence 

4from contextlib import suppress 

5from typing import TYPE_CHECKING, Any, Optional, TypeAlias 

6 

7import anyio 

8from typing_extensions import override 

9 

10from faststream._internal.endpoint.subscriber import ( 

11 SubscriberSpecification, 

12 SubscriberUsecase, 

13) 

14from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin, TasksMixin 

15from faststream.redis.message import ( 

16 UnifyRedisDict, 

17) 

18from faststream.redis.publisher.fake import RedisFakePublisher 

19 

20if TYPE_CHECKING: 

21 from redis.asyncio.client import Redis 

22 

23 from faststream._internal.endpoint.publisher import PublisherProto 

24 from faststream._internal.endpoint.subscriber.call_item import ( 

25 CallsCollection, 

26 ) 

27 from faststream.message import StreamMessage as BrokerStreamMessage 

28 from faststream.redis.configs import RedisBrokerConfig 

29 from faststream.redis.subscriber.config import RedisSubscriberConfig 

30 

31 

# Both aliases resolve to plain ``bytes``; neither is referenced in the part of
# the module visible here.
# NOTE(review): presumably they label raw Redis topic names and stream offsets
# for sibling modules - confirm against their actual usage.
TopicName: TypeAlias = bytes

Offset: TypeAlias = bytes

34 

35 

class LogicSubscriber(TasksMixin, SubscriberUsecase[UnifyRedisDict]):
    """Common base for Redis subscribers driven by a background fetch loop.

    Subclasses implement ``_get_msgs``; this class schedules the loop that
    calls it repeatedly and signals once the first iteration has finished.
    """

    _outer_config: "RedisBrokerConfig"

    def __init__(
        self,
        config: "RedisSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Initialise the parent use case and keep ``config`` on the instance."""
        super().__init__(config, specification, calls)
        self.config = config

    @property
    def _client(self) -> "Redis[bytes]":
        """Redis client taken from the broker-level connection."""
        connection = self._outer_config.connection
        return connection.client

    def _make_response_publisher(
        self,
        message: "BrokerStreamMessage[UnifyRedisDict]",
    ) -> Sequence["PublisherProto"]:
        """Return a one-element tuple holding a publisher for ``message.reply_to``.

        The publisher reuses the broker producer and this subscriber's
        configured message format.
        """
        reply_publisher = RedisFakePublisher(
            self._outer_config.producer,
            channel=message.reply_to,
            message_format=self.config.message_format,
        )
        return (reply_publisher,)

    @override
    async def start(
        self,
        *args: Any,
    ) -> None:
        """Start the subscriber and, when handlers exist, its fetch loop.

        ``args`` are handed to ``add_task`` together with ``_consume``
        (presumably as its positional arguments - ``add_task`` is defined
        outside this block). With handlers registered, the call waits up to
        3 seconds for the loop to set the start signal; ``anyio.fail_after``
        raises ``TimeoutError`` if that deadline passes.
        """
        await super().start()
        self._post_start()

        first_pass_done = anyio.Event()

        if not self.calls:
            # No handlers: no task is scheduled, so mark the event directly.
            first_pass_done.set()
            return

        self.add_task(self._consume, args, {"start_signal": first_pass_done})

        with anyio.fail_after(3.0):
            await first_pass_done.wait()

    async def _consume(self, *args: Any, start_signal: anyio.Event) -> None:
        """Call ``_get_msgs(*args)`` repeatedly while ``self.running`` is truthy.

        Any ``Exception`` from a fetch is logged at ERROR level and followed by
        a 5 second pause before the next attempt. After every iteration,
        whatever its outcome, ``start_signal`` is set if it is not set yet.

        NOTE(review): when the very first fetch fails, the signal is set only
        after the 5 second pause, which is longer than the 3 second wait in
        ``start`` - confirm that timing out startup in that case is intended.
        """
        # Tracks the outcome of the latest fetch; nothing in this block reads it.
        connected = True

        while self.running:
            try:
                await self._get_msgs(*args)

            except Exception as exc:  # noqa: PERF203
                self._log(
                    log_level=logging.ERROR,
                    message="Message fetch error",
                    exc_info=exc,
                )
                connected = False

                # Back off before retrying.
                await anyio.sleep(5)

            else:
                connected = True

            finally:
                already_signalled = start_signal.is_set()
                if not already_signalled:
                    with suppress(Exception):
                        start_signal.set()

    @abstractmethod
    async def _get_msgs(self, *args: Any) -> None:
        """Perform one fetch step; concrete subscribers must override this."""
        raise NotImplementedError

    @staticmethod
    def build_log_context(
        message: Optional["BrokerStreamMessage[Any]"],
        channel: str = "",
    ) -> dict[str, str]:
        """Build the logging context with ``channel`` and ``message_id`` keys.

        ``message_id`` falls back to an empty string when ``message`` has no
        such attribute, which includes ``message`` being ``None``.
        """
        message_id = getattr(message, "message_id", "")
        return {"channel": channel, "message_id": message_id}

    async def consume_one(self, msg: Any) -> None:
        """Handle a single message by delegating to ``self.consume``."""
        await self.consume(msg)

130 

131 

class ConcurrentSubscriber(
    ConcurrentMixin["BrokerStreamMessage[Any]"],
    LogicSubscriber,
):
    """Subscriber that routes each message through ``_put_msg``.

    ``_put_msg`` and ``start_consume_task`` are defined outside this block
    (presumably by ``ConcurrentMixin`` - confirm).
    """

    def __init__(
        self,
        config: "RedisSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Any]",
        max_workers: int,
    ) -> None:
        """Forward all arguments to the parent chain, ``max_workers`` by keyword."""
        super().__init__(
            config,
            specification,
            calls,
            max_workers=max_workers,
        )

    async def start(self) -> None:
        """Run the inherited start-up, then call ``start_consume_task``."""
        await super().start()
        self.start_consume_task()

    async def consume_one(self, msg: "BrokerStreamMessage[Any]") -> None:
        """Pass ``msg`` to ``_put_msg`` instead of consuming it directly."""
        await self._put_msg(msg)