Coverage for faststream / redis / message.py: 97%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import ( 

2 TYPE_CHECKING, 

3 Literal, 

4 Optional, 

5 TypeAlias, 

6 TypeVar, 

7 Union, 

8) 

9 

10from typing_extensions import NotRequired, TypedDict, override 

11 

12from faststream.message import StreamMessage as BrokerStreamMessage 

13 

14if TYPE_CHECKING: 

15 from redis.asyncio import Redis 

16 

17 from faststream._internal.basic_types import DecodedMessage 

18 

19 

20BaseMessage: TypeAlias = Union[ 

21 "PubSubMessage", 

22 "DefaultListMessage", 

23 "BatchListMessage", 

24 "DefaultStreamMessage", 

25 "BatchStreamMessage", 

26] 

27 

28 

29class UnifyRedisDict(TypedDict): 

30 type: Literal[ 

31 "pmessage", 

32 "message", 

33 "list", 

34 "blist", 

35 "stream", 

36 "bstream", 

37 ] 

38 channel: str 

39 data: bytes | list[bytes] | dict[bytes, bytes] | list[dict[bytes, bytes]] 

40 pattern: NotRequired[bytes | None] 

41 

42 

43class RedisMessage(BrokerStreamMessage[UnifyRedisDict]): 

44 pass 

45 

46 

47class PubSubMessage(TypedDict): 

48 """A class to represent a PubSub message.""" 

49 

50 type: Literal["pmessage", "message"] 

51 channel: str 

52 data: bytes 

53 pattern: bytes | None 

54 

55 

56class RedisChannelMessage(BrokerStreamMessage[PubSubMessage]): 

57 pass 

58 

59 

60class _ListMessage(TypedDict): 

61 """A class to represent an Abstract List message.""" 

62 

63 channel: str 

64 

65 

66class DefaultListMessage(_ListMessage): 

67 """A class to represent a single List message.""" 

68 

69 type: Literal["list"] 

70 data: bytes 

71 

72 

73class BatchListMessage(_ListMessage): 

74 """A class to represent a List messages batch.""" 

75 

76 type: Literal["blist"] 

77 data: list[bytes] 

78 

79 

80class RedisListMessage(BrokerStreamMessage[DefaultListMessage]): 

81 """StreamMessage for single List message.""" 

82 

83 

84class RedisBatchListMessage(BrokerStreamMessage[BatchListMessage]): 

85 """StreamMessage for single List message.""" 

86 

87 decoded_body: list["DecodedMessage"] 

88 

89 

90DATA_KEY = "__data__" 

91bDATA_KEY = DATA_KEY.encode() # noqa: N816 

92 

93 

94class _StreamMessage(TypedDict): 

95 channel: str 

96 message_ids: list[bytes] 

97 

98 

99class DefaultStreamMessage(_StreamMessage): 

100 type: Literal["stream"] 

101 data: dict[bytes, bytes] 

102 

103 

104class BatchStreamMessage(_StreamMessage): 

105 type: Literal["bstream"] 

106 data: list[dict[bytes, bytes]] 

107 

108 

109_StreamMsgType = TypeVar("_StreamMsgType", bound=_StreamMessage) 

110 

111 

112class _RedisStreamMessageMixin(BrokerStreamMessage[_StreamMsgType]): 

113 @override 

114 async def ack( 

115 self, 

116 redis: Optional["Redis[bytes]"] = None, 

117 group: str | None = None, 

118 ) -> None: 

119 if not self.committed and group is not None and redis is not None: 

120 ids = self.raw_message["message_ids"] 

121 channel = self.raw_message["channel"] 

122 await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call] 

123 await super().ack() 

124 

125 @override 

126 async def nack( 

127 self, 

128 redis: Optional["Redis[bytes]"] = None, 

129 group: str | None = None, 

130 ) -> None: 

131 await super().nack() 

132 

133 @override 

134 async def reject( 

135 self, 

136 redis: Optional["Redis[bytes]"] = None, 

137 group: str | None = None, 

138 ) -> None: 

139 await super().reject() 

140 

141 async def delete(self, redis: Optional["Redis[bytes]"]) -> None: 

142 if redis is not None: 142 ↛ exitline 142 didn't return from function 'delete' because the condition on line 142 was always true

143 ids = self.raw_message["message_ids"] 

144 channel = self.raw_message["channel"] 

145 await redis.xdel(channel, *ids) 

146 

147 

148class RedisStreamMessage(_RedisStreamMessageMixin[DefaultStreamMessage]): 

149 pass 

150 

151 

152class RedisBatchStreamMessage(_RedisStreamMessageMixin[BatchStreamMessage]): 

153 decoded_body: list["DecodedMessage"]