Coverage for faststream / redis / parser / binary.py: 98%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import enum 

2from collections.abc import Sequence 

3from struct import pack, unpack 

4from typing import TYPE_CHECKING, Any, Optional, Union 

5 

6from faststream._internal._compat import json_loads 

7 

8from .message import MessageFormat 

9 

10if TYPE_CHECKING: 

11 from fast_depends.library.serializer import SerializerProto 

12 

13 from faststream._internal.basic_types import SendableMessage 

14 

15 

class FastStreamMessageVersion(int, enum.Enum):
    """Version numbers of the binary message envelope.

    Members are also plain ``int`` values, so they compare equal to the
    number stored in the envelope.
    """

    # The only envelope version defined in this module.
    v1 = 1

18 

19 

class BinaryMessageFormatV1(MessageFormat):
    """Message format to encode into binary and parse it.

    Frame layout written by :meth:`encode` (integers are big-endian):

    * 8 bytes  - ``IDENTITY_HEADER`` magic
    * 2 bytes  - format version (``FastStreamMessageVersion.v1``)
    * 4 bytes  - absolute offset of the header section
    * 4 bytes  - absolute offset of the message body
    * 2 bytes  - number of header pairs (start of the header section)
    * variable - header pairs; each key and value is prefixed by a
      2-byte length
    * variable - message body, running to the end of the frame
    """

    IDENTITY_HEADER = b"\x89BIN\x0d\x0a\x1a\x0a"  # to avoid confusion with other formats

    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: str | None,
        headers: dict[str, Any] | None,
        correlation_id: str,
        serializer: Optional["SerializerProto"] = None,
    ) -> bytes:
        """Serialize a message and its headers into a single binary frame.

        All arguments are forwarded unchanged to ``cls.build`` (defined
        outside this class body - presumably on ``MessageFormat``; verify
        there). Only the ``headers`` mapping and the ``data`` payload of the
        object it returns are used here.

        Returns:
            The complete frame as ``bytes``.
        """
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
            serializer=serializer,
        )

        # Headers are serialized first because their total size is needed to
        # compute the body offset stored near the start of the frame.
        headers_writer = BinaryWriter()
        for key, value in msg.headers.items():
            headers_writer.write_string(key)
            headers_writer.write_string(value)
        headers_len = len(headers_writer.data)

        writer = BinaryWriter()
        writer.write(cls.IDENTITY_HEADER)
        writer.write_short(FastStreamMessageVersion.v1.value)
        # Two 4-byte offsets are written next, so the header section starts
        # 8 bytes past the current end of the buffer.
        headers_start = len(writer.data) + 8
        # The extra 2 bytes account for the header-count field.
        data_start = 2 + headers_start + headers_len
        writer.write_int(headers_start)
        writer.write_int(data_start)
        writer.write_short(len(msg.headers))
        writer.write(headers_writer.get_bytes())
        writer.write(msg.data)
        return writer.get_bytes()

    @classmethod
    def parse(cls, data: bytes) -> tuple[bytes, dict[str, Any]]:
        """Extract the body and headers from an incoming payload.

        The payload is tried, in order, as:

        1. a binary v1 frame (magic and version must both match);
        2. a legacy JSON envelope with a string ``"data"`` field and an
           optional ``"headers"`` field;
        3. a raw message, returned unchanged with empty headers.

        This method never raises for malformed input; it falls through to
        the next interpretation instead.

        Returns:
            A ``(body, headers)`` tuple.
        """
        try:
            frame = cls._parse_v1_frame(data)
        except Exception:
            # Deliberately best-effort: a truncated or corrupt frame must not
            # fail the caller, so it is handled like any non-binary payload.
            frame = None

        if frame is not None:
            return frame
        return cls._parse_legacy_or_raw(data)

    @classmethod
    def _parse_v1_frame(
        cls,
        data: bytes,
    ) -> tuple[bytes, dict[str, Any]] | None:
        """Decode a binary v1 frame; return ``None`` if magic/version differ."""
        reader = BinaryReader(data)
        magic_header = reader.read_until(len(cls.IDENTITY_HEADER))
        message_version = reader.read_short()

        if (
            magic_header != cls.IDENTITY_HEADER
            or message_version != FastStreamMessageVersion.v1.value
        ):
            return None

        headers_start = reader.read_int()
        data_start = reader.read_int()

        headers: dict[str, Any] = {}
        reader.shift_offset_to(headers_start)
        header_count = reader.read_short()
        for _ in range(header_count):
            key = reader.read_string()
            headers[key] = reader.read_string()

        reader.shift_offset_to(data_start)
        return reader.read_bytes(), headers

    @staticmethod
    def _parse_legacy_or_raw(data: bytes) -> tuple[bytes, dict[str, Any]]:
        """Decode a legacy JSON envelope, else return the payload untouched."""
        # Raw Redis message format or legacy JSON envelope
        try:
            parsed_data = json_loads(data)
            final_data = parsed_data["data"].encode()
            headers = parsed_data.get("headers", {})
        except Exception:
            # Not a JSON envelope of the expected shape: treat as raw bytes.
            return data, {}
        return final_data, headers

103 

104 

105class BinaryWriter: 

106 def __init__(self) -> None: 

107 self.data = bytearray() 

108 

109 def write(self, data: bytes) -> None: 

110 self.data.extend(data) 

111 

112 def write_short(self, number: int) -> None: 

113 int_bytes = pack(">H", number) 

114 self.write(int_bytes) 

115 

116 def write_int(self, number: int) -> None: 

117 int_bytes = pack(">I", number) 

118 self.write(int_bytes) 

119 

120 def write_string(self, data: str | bytes) -> None: 

121 str_len = len(data) 

122 self.write_short(str_len) 

123 if isinstance(data, bytes): 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true

124 self.write(data) 

125 else: 

126 self.write(data.encode()) 

127 

128 def get_bytes(self) -> bytes: 

129 return bytes(self.data) 

130 

131 

class BinaryReader:
    """Sequential reader over a byte buffer with an explicit cursor.

    ``offset`` is the cursor position; every ``read_*`` call except
    :meth:`read_bytes` advances it.
    """

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.offset = 0

    def read_until(self, offset: int) -> bytes:
        """Return the next *offset* bytes and move the cursor past them.

        *offset* is a byte count relative to the cursor, not an absolute
        position. Fewer bytes are returned if the buffer ends early, but the
        cursor still advances by the full *offset*.
        """
        begin = self.offset
        end = begin + offset
        chunk = self.data[begin:end]
        self.offset = end
        return chunk

    def shift_offset_to(self, offset: int) -> None:
        """Move the cursor to the absolute position *offset*."""
        self.offset = offset

    def read_short(self) -> int:
        """Read an unsigned 16-bit big-endian integer at the cursor.

        Raises:
            struct.error: if fewer than 2 bytes remain; the cursor is left
                unchanged in that case.
        """
        begin = self.offset
        (value,) = unpack(">H", self.data[begin : begin + 2])
        self.offset = begin + 2
        return int(value)

    def read_int(self) -> int:
        """Read an unsigned 32-bit big-endian integer at the cursor.

        Raises:
            struct.error: if fewer than 4 bytes remain; the cursor is left
                unchanged in that case.
        """
        begin = self.offset
        (value,) = unpack(">I", self.data[begin : begin + 4])
        self.offset = begin + 4
        return int(value)

    def read_string(self) -> str:
        """Read a 16-bit length, then that many bytes, decoded as text.

        The cursor is advanced past the payload before decoding, so it has
        already moved if decoding fails.
        """
        size = self.read_short()
        begin = self.offset
        raw = self.data[begin : begin + size]
        self.offset = begin + size
        return raw.decode()

    def read_bytes(self) -> bytes:
        """Return everything from the cursor to the end; cursor is not moved."""
        return self.data[self.offset :]