Coverage for faststream / redis / message.py: 97%
58 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import (
2 TYPE_CHECKING,
3 Literal,
4 Optional,
5 TypeAlias,
6 TypeVar,
7 Union,
8)
10from typing_extensions import NotRequired, TypedDict, override
12from faststream.message import StreamMessage as BrokerStreamMessage
14if TYPE_CHECKING:
15 from redis.asyncio import Redis
17 from faststream._internal.basic_types import DecodedMessage
# Union of every raw Redis payload shape declared in this module.
# The members are string forward references because the TypedDicts are
# defined further down in the file.
BaseMessage: TypeAlias = Union[
    "PubSubMessage",
    "DefaultListMessage",
    "BatchListMessage",
    "DefaultStreamMessage",
    "BatchStreamMessage",
]
class UnifyRedisDict(TypedDict):
    """A single dict shape wide enough to describe any Redis message kind.

    ``type`` accepts every kind literal used by the per-kind TypedDicts in
    this module, and ``data`` is the union of their payload types.
    """

    type: Literal[
        "pmessage",
        "message",
        "list",
        "blist",
        "stream",
        "bstream",
    ]
    channel: str
    # Single payload, batch of payloads, stream entry, or batch of stream entries.
    data: bytes | list[bytes] | dict[bytes, bytes] | list[dict[bytes, bytes]]
    # The key may be absent; when present the value may still be None.
    pattern: NotRequired[bytes | None]
class RedisMessage(BrokerStreamMessage[UnifyRedisDict]):
    """Broker stream message parametrized with ``UnifyRedisDict``."""
class PubSubMessage(TypedDict):
    """Raw shape of a PubSub message."""

    type: Literal["pmessage", "message"]
    channel: str
    data: bytes
    # Required key, but the value may be None.
    # NOTE(review): presumably only set for "pmessage" (pattern
    # subscriptions) — confirm against the subscriber code.
    pattern: bytes | None
class RedisChannelMessage(BrokerStreamMessage[PubSubMessage]):
    """Broker stream message parametrized with ``PubSubMessage``."""
class _ListMessage(TypedDict):
    """Fields shared by every List message shape (single and batch)."""

    channel: str
class DefaultListMessage(_ListMessage):
    """Raw shape of a single List message: one ``bytes`` payload."""

    type: Literal["list"]
    data: bytes
class BatchListMessage(_ListMessage):
    """Raw shape of a batch of List messages: a list of ``bytes`` payloads."""

    type: Literal["blist"]
    data: list[bytes]
class RedisListMessage(BrokerStreamMessage[DefaultListMessage]):
    """StreamMessage for a single List message."""
class RedisBatchListMessage(BrokerStreamMessage[BatchListMessage]):
    """StreamMessage for a batch of List messages."""

    # The decoded body is declared as a list of decoded messages.
    decoded_body: list["DecodedMessage"]
# String key and its bytes form (``str.encode()`` defaults to UTF-8).
# NOTE(review): presumably the field name under which a payload is stored in
# a stream entry — confirm against the publisher/parser code.
DATA_KEY = "__data__"
bDATA_KEY = DATA_KEY.encode()  # noqa: N816
class _StreamMessage(TypedDict):
    """Fields shared by every Stream message shape (single and batch)."""

    channel: str
    # Passed to XACK / XDEL by the stream message mixin in this module.
    message_ids: list[bytes]
class DefaultStreamMessage(_StreamMessage):
    """Raw shape of a single Stream message: one bytes-to-bytes mapping."""

    type: Literal["stream"]
    data: dict[bytes, bytes]
class BatchStreamMessage(_StreamMessage):
    """Raw shape of a batch of Stream messages: a list of bytes-to-bytes mappings."""

    type: Literal["bstream"]
    data: list[dict[bytes, bytes]]
# Type variable bounded by ``_StreamMessage``, i.e. any payload TypedDict that
# carries ``channel`` and ``message_ids``.
_StreamMsgType = TypeVar("_StreamMsgType", bound=_StreamMessage)
class _RedisStreamMessageMixin(BrokerStreamMessage[_StreamMsgType]):
    """Acknowledgement and deletion helpers shared by Stream messages.

    The methods read ``channel`` and ``message_ids`` from ``raw_message``.
    """

    @override
    async def ack(
        self,
        redis: Optional["Redis[bytes]"] = None,
        group: str | None = None,
    ) -> None:
        """Acknowledge the message ids via XACK, then delegate to the base ``ack``.

        XACK is issued only when the message is not yet committed and both
        ``group`` and ``redis`` are provided.
        """
        if not (self.committed or group is None or redis is None):
            raw = self.raw_message
            entry_ids = raw["message_ids"]
            stream = raw["channel"]
            await redis.xack(stream, group, *entry_ids)  # type: ignore[no-untyped-call]
        # NOTE(review): nesting reconstructed from a flattened listing —
        # confirm the base-class call is meant to run unconditionally.
        await super().ack()

    @override
    async def nack(
        self,
        redis: Optional["Redis[bytes]"] = None,
        group: str | None = None,
    ) -> None:
        """Delegate to the base ``nack``; ``redis`` and ``group`` are not used here."""
        await super().nack()

    @override
    async def reject(
        self,
        redis: Optional["Redis[bytes]"] = None,
        group: str | None = None,
    ) -> None:
        """Delegate to the base ``reject``; ``redis`` and ``group`` are not used here."""
        await super().reject()

    async def delete(self, redis: Optional["Redis[bytes]"]) -> None:
        """Remove this message's ids from its channel via XDEL.

        Does nothing when ``redis`` is ``None``.
        """
        if redis is None:
            return
        raw = self.raw_message
        entry_ids = raw["message_ids"]
        stream = raw["channel"]
        await redis.xdel(stream, *entry_ids)
class RedisStreamMessage(_RedisStreamMessageMixin[DefaultStreamMessage]):
    """Stream message mixin specialised for a single ``DefaultStreamMessage``."""
class RedisBatchStreamMessage(_RedisStreamMessageMixin[BatchStreamMessage]):
    """Stream message mixin specialised for a ``BatchStreamMessage``."""

    # The decoded body is declared as a list of decoded messages.
    decoded_body: list["DecodedMessage"]