Coverage for faststream / response / response.py: 94%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from functools import singledispatch 

3from typing import Any 

4 

5from typing_extensions import Self 

6 

7from .publish_type import PublishType 

8 

9 

10class Response: 

11 def __init__( 

12 self, 

13 body: Any, 

14 *, 

15 headers: dict[str, Any] | None = None, 

16 correlation_id: str | None = None, 

17 ) -> None: 

18 """Initialize a handler.""" 

19 self.body = body 

20 self.headers = headers or {} 

21 self.correlation_id = correlation_id 

22 

23 def as_publish_command(self) -> "PublishCommand": 

24 """Method to transform handlers' Response result to DTO for publishers.""" 

25 return PublishCommand( 

26 body=self.body, 

27 headers=self.headers, 

28 correlation_id=self.correlation_id, 

29 _publish_type=PublishType.PUBLISH, 

30 ) 

31 

32 def get_publish_key(self) -> Any | None: 

33 """Get the key for publishing this message. 

34 

35 Override this method in subclasses to provide broker-specific keys. 

36 Default implementation returns None (no key). 

37 

38 Returns: 

39 The key for publishing, or None if this Response type doesn't use keys. 

40 """ 

41 return None 

42 

43 

44class PublishCommand(Response): 

45 def __init__( 

46 self, 

47 body: Any, 

48 *, 

49 _publish_type: PublishType, 

50 reply_to: str = "", 

51 destination: str = "", 

52 correlation_id: str | None = None, 

53 headers: dict[str, Any] | None = None, 

54 ) -> None: 

55 super().__init__( 

56 body, 

57 headers=headers, 

58 correlation_id=correlation_id, 

59 ) 

60 

61 self.destination = destination 

62 self.reply_to = reply_to 

63 

64 self.publish_type = _publish_type 

65 

66 @property 

67 def batch_bodies(self) -> tuple["Any", ...]: 

68 if self.body is not None: 

69 return (self.body,) 

70 return () 

71 

72 def add_headers( 

73 self, 

74 headers: dict[str, Any], 

75 *, 

76 override: bool = True, 

77 ) -> None: 

78 if override: 

79 self.headers |= headers 

80 else: 

81 self.headers = headers | self.headers 

82 

83 @classmethod 

84 def from_cmd(cls, cmd: Self) -> Self: 

85 raise NotImplementedError 

86 

87 

88class BatchPublishCommand(PublishCommand): 

89 def __init__( 

90 self, 

91 body: Any, 

92 /, 

93 *bodies: Any, 

94 _publish_type: PublishType, 

95 reply_to: str = "", 

96 destination: str = "", 

97 correlation_id: str | None = None, 

98 headers: dict[str, Any] | None = None, 

99 ) -> None: 

100 super().__init__( 

101 body, 

102 headers=headers, 

103 correlation_id=correlation_id, 

104 destination=destination, 

105 reply_to=reply_to, 

106 _publish_type=_publish_type, 

107 ) 

108 self.extra_bodies = bodies 

109 

110 @property 

111 def batch_bodies(self) -> tuple["Any", ...]: 

112 return (*super().batch_bodies, *self.extra_bodies) 

113 

114 @batch_bodies.setter 

115 def batch_bodies(self, value: Sequence["Any"]) -> None: 

116 if len(value) == 0: 

117 self.body = None 

118 self.extra_bodies = () 

119 else: 

120 self.body = value[0] 

121 self.extra_bodies = tuple(value[1:]) 

122 

123 @classmethod 

124 def from_cmd( 

125 cls, 

126 cmd: "PublishCommand", 

127 *, 

128 batch: bool = False, 

129 ) -> "BatchPublishCommand": 

130 raise NotImplementedError 

131 

132 @staticmethod 

133 def _parse_bodies(body: Any, *, batch: bool = False) -> tuple[Any, tuple[Any, ...]]: 

134 extra_bodies = [] 

135 if batch and isinstance(body, Sequence) and not isinstance(body, (str, bytes)): 

136 if body: 

137 body, extra_bodies = body[0], body[1:] 

138 else: 

139 body = None 

140 return body, tuple(extra_bodies) 

141 

142 

143@singledispatch 

144def _extract_body_and_key(item: Any) -> tuple[Any, Any | None]: 

145 """Extract body and key from a plain message. 

146 

147 Default implementation for non-Response objects. 

148 Returns the item as-is for body and None for key. 

149 """ 

150 return item, None 

151 

152 

153@_extract_body_and_key.register 

154def _(item: Response) -> tuple[Any, Any | None]: 

155 """Extract body and key from a Response object. 

156 

157 Uses polymorphic get_publish_key() method to retrieve the key. 

158 """ 

159 return item.body, item.get_publish_key() 

160 

161 

162def extract_per_message_keys_and_bodies( 

163 batch_bodies: Sequence[Any], 

164) -> tuple[tuple[Any | None, ...], tuple[Any, ...] | None]: 

165 """Extract per-message keys and optionally normalized bodies from a batch. 

166 

167 Returns a pair (keys, normalized_bodies_or_None): 

168 - If no Response objects are present, returns ((), None) 

169 so callers can reuse the original bodies without extra allocations. 

170 - Otherwise returns (keys_tuple, normalized_bodies_tuple), where normalized bodies 

171 contain the extracted 'body' values from Response objects (or the original item). 

172 

173 Supports passing Response objects (e.g., KafkaResponse) to set per-message keys: 

174 await broker.publish_batch( 

175 KafkaResponse("body1", key=b"key1"), 

176 KafkaResponse("body2", key=b"key2"), 

177 "plain message" # uses default key 

178 ) 

179 

180 Uses singledispatch for type-based polymorphism without isinstance checks. 

181 """ 

182 if not batch_bodies: 

183 return (), None 

184 

185 bodies: list[Any] = [] 

186 keys: list[Any | None] = [] 

187 has_key: bool = False 

188 

189 for item in batch_bodies: 

190 body, key = _extract_body_and_key(item) 

191 bodies.append(body) 

192 keys.append(key) 

193 if key is not None: 

194 has_key = True 

195 

196 if not has_key: 

197 return (), None 

198 

199 return tuple(keys), tuple(bodies) 

200 

201 

202def key_for_index( 

203 keys: Sequence[Any | None], default_key: Any | None, index: int 

204) -> Any | None: 

205 """Return the effective key for a given message index. 

206 

207 Prefers a per-message key at the given index when it is not None; 

208 otherwise falls back to ``default_key``. If the index is out of bounds 

209 or negative, ``default_key`` is returned. 

210 """ 

211 if index < 0: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 return default_key 

213 

214 try: 

215 k = keys[index] 

216 except IndexError: 

217 return default_key 

218 

219 return k if k is not None else default_key