Coverage for faststream / redis / parser / binary.py: 98%
98 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import enum
2from collections.abc import Sequence
3from struct import pack, unpack
4from typing import TYPE_CHECKING, Any, Optional, Union
6from faststream._internal._compat import json_loads
8from .message import MessageFormat
10if TYPE_CHECKING:
11 from fast_depends.library.serializer import SerializerProto
13 from faststream._internal.basic_types import SendableMessage
class FastStreamMessageVersion(int, enum.Enum):
    """Version numbers of the binary message envelope."""

    # Written as an unsigned 16-bit value right after the identity header.
    v1 = 1
class BinaryMessageFormatV1(MessageFormat):
    """Message format to encode into binary and parse it.

    Layout produced by :meth:`encode` (all integers big-endian, unsigned):

    ======================  =====================================
    bytes                   content
    ======================  =====================================
    ``IDENTITY_HEADER``     fixed magic prefix
    2                       format version
    4                       absolute offset of the headers section
    4                       absolute offset of the message body
    2                       number of headers
    variable                headers: length-prefixed key, then value
    rest                    message body
    ======================  =====================================
    """

    IDENTITY_HEADER = b"\x89BIN\x0d\x0a\x1a\x0a"  # to avoid confusion with other formats

    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: str | None,
        headers: dict[str, Any] | None,
        correlation_id: str,
        serializer: Optional["SerializerProto"] = None,
    ) -> bytes:
        """Serialize a message and its headers into the binary envelope.

        The arguments are forwarded unchanged to ``cls.build``; the object it
        returns is expected to expose ``headers`` (a mapping) and ``data``
        (bytes), which are what gets written out.

        Returns:
            The complete envelope as ``bytes``.
        """
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
            serializer=serializer,
        )

        # Headers are serialized first so their byte length is known before
        # the offsets that precede them are written.
        headers_writer = BinaryWriter()
        for key, value in msg.headers.items():
            headers_writer.write_string(key)
            headers_writer.write_string(value)
        headers_len = len(headers_writer.data)

        writer = BinaryWriter()
        writer.write(cls.IDENTITY_HEADER)
        writer.write_short(FastStreamMessageVersion.v1.value)

        # The headers section starts right after the two 4-byte offsets that
        # are about to be written, hence ``+ 8``.
        headers_start = len(writer.data) + 8
        # The body follows the 2-byte header count and the serialized headers.
        data_start = 2 + headers_start + headers_len

        writer.write_int(headers_start)
        writer.write_int(data_start)
        writer.write_short(len(msg.headers))
        writer.write(headers_writer.get_bytes())
        writer.write(msg.data)
        return writer.get_bytes()

    @classmethod
    def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
        """Extract the body and headers from an incoming payload.

        Three interpretations are tried in order, each one only if the
        previous did not apply or failed:

        1. The binary envelope written by :meth:`encode` (matching identity
           header and version).
        2. A JSON object with a string ``"data"`` entry and an optional
           ``"headers"`` entry.
        3. The raw payload itself, with empty headers.

        Returns:
            A ``(body, headers)`` tuple.
        """
        try:
            reader = BinaryReader(data)
            magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
            message_version = reader.read_short()

            if (
                magic_header == cls.IDENTITY_HEADER
                and message_version == FastStreamMessageVersion.v1.value
            ):
                headers_start = reader.read_int()
                data_start = reader.read_int()

                reader.shift_offset_to(headers_start)
                header_count = reader.read_short()
                headers: dict[str, Any] = {}
                for _ in range(header_count):
                    key = reader.read_string()
                    headers[key] = reader.read_string()

                reader.shift_offset_to(data_start)
                return reader.read_bytes(), headers
        except Exception:
            # Deliberate best effort: a payload that is too short or corrupt
            # for the binary layout is retried with the formats below.
            pass

        try:
            # Legacy JSON envelope
            parsed_data = json_loads(data)
            return parsed_data["data"].encode(), parsed_data.get("headers", {})
        except Exception:
            # Raw Redis message format
            return data, {}
105class BinaryWriter:
106 def __init__(self) -> None:
107 self.data = bytearray()
109 def write(self, data: bytes) -> None:
110 self.data.extend(data)
112 def write_short(self, number: int) -> None:
113 int_bytes = pack(">H", number)
114 self.write(int_bytes)
116 def write_int(self, number: int) -> None:
117 int_bytes = pack(">I", number)
118 self.write(int_bytes)
120 def write_string(self, data: str | bytes) -> None:
121 str_len = len(data)
122 self.write_short(str_len)
123 if isinstance(data, bytes): 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true
124 self.write(data)
125 else:
126 self.write(data.encode())
128 def get_bytes(self) -> bytes:
129 return bytes(self.data)
class BinaryReader:
    """Cursor over a bytes payload for reading big-endian binary fields."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        # Index of the next unread byte.
        self.offset = 0

    def read_until(self, offset: int) -> bytes:
        """Return the next ``offset`` bytes and advance the cursor past them."""
        start = self.offset
        self.offset = start + offset
        return self.data[start : start + offset]

    def shift_offset_to(self, offset: int) -> None:
        """Move the cursor to the absolute position ``offset``."""
        self.offset = offset

    def read_short(self) -> int:
        """Read a big-endian unsigned 16-bit integer at the cursor.

        Raises:
            struct.error: If fewer than two bytes remain; the cursor is
                left where it was.
        """
        end = self.offset + 2
        (value,) = unpack(">H", self.data[self.offset : end])
        self.offset = end
        return int(value)

    def read_int(self) -> int:
        """Read a big-endian unsigned 32-bit integer at the cursor.

        Raises:
            struct.error: If fewer than four bytes remain; the cursor is
                left where it was.
        """
        end = self.offset + 4
        (value,) = unpack(">I", self.data[self.offset : end])
        self.offset = end
        return int(value)

    def read_string(self) -> str:
        """Read a 16-bit length prefix, then decode that many bytes as text."""
        length = self.read_short()
        start = self.offset
        # The cursor moves before decoding, so it is already past the value
        # even if decoding raises.
        self.offset = start + length
        return self.data[start : start + length].decode()

    def read_bytes(self) -> bytes:
        """Return everything from the cursor to the end; the cursor stays put."""
        return self.data[self.offset :]