Coverage for faststream / response / response.py: 94%
48 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Sequence
2from functools import singledispatch
3from typing import Any
5from typing_extensions import Self
7from .publish_type import PublishType
10class Response:
11 def __init__(
12 self,
13 body: Any,
14 *,
15 headers: dict[str, Any] | None = None,
16 correlation_id: str | None = None,
17 ) -> None:
18 """Initialize a handler."""
19 self.body = body
20 self.headers = headers or {}
21 self.correlation_id = correlation_id
23 def as_publish_command(self) -> "PublishCommand":
24 """Method to transform handlers' Response result to DTO for publishers."""
25 return PublishCommand(
26 body=self.body,
27 headers=self.headers,
28 correlation_id=self.correlation_id,
29 _publish_type=PublishType.PUBLISH,
30 )
32 def get_publish_key(self) -> Any | None:
33 """Get the key for publishing this message.
35 Override this method in subclasses to provide broker-specific keys.
36 Default implementation returns None (no key).
38 Returns:
39 The key for publishing, or None if this Response type doesn't use keys.
40 """
41 return None
class PublishCommand(Response):
    """Response extended with the data a publisher needs to send it."""

    def __init__(
        self,
        body: Any,
        *,
        _publish_type: PublishType,
        reply_to: str = "",
        destination: str = "",
        correlation_id: str | None = None,
        headers: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a publish command.

        Args:
            body: Payload to publish.
            _publish_type: Kind of publish operation; exposed as
                ``publish_type``.
            reply_to: Stored as ``reply_to``; empty string by default.
            destination: Stored as ``destination``; empty string by default.
            correlation_id: Forwarded to the base initializer.
            headers: Forwarded to the base initializer.
        """
        super().__init__(
            body,
            headers=headers,
            correlation_id=correlation_id,
        )

        self.destination = destination
        self.reply_to = reply_to

        self.publish_type = _publish_type

    @property
    def batch_bodies(self) -> tuple[Any, ...]:
        """Return the body as a one-element tuple, or ``()`` when it is None."""
        if self.body is not None:
            return (self.body,)
        return ()

    def add_headers(
        self,
        headers: dict[str, Any],
        *,
        override: bool = True,
    ) -> None:
        """Merge ``headers`` into the command's headers.

        Args:
            headers: Headers to merge in. This dict is never modified.
            override: When True (default) the incoming values win on key
                conflicts; when False the already present values win.
        """
        # Both branches build a fresh dict and rebind the attribute. Updating
        # in place would also change any other holder of the same dict object
        # (for example the mapping that was handed to the constructor).
        if override:
            self.headers = self.headers | headers
        else:
            self.headers = headers | self.headers

    @classmethod
    def from_cmd(cls, cmd: Self) -> Self:
        """Build an instance of this class from another command.

        Raises:
            NotImplementedError: Always; this base implementation provides no
                conversion (presumably subclasses override it).
        """
        raise NotImplementedError
class BatchPublishCommand(PublishCommand):
    """Publish command holding a leading body plus any number of extra bodies."""

    def __init__(
        self,
        body: Any,
        /,
        *bodies: Any,
        _publish_type: PublishType,
        reply_to: str = "",
        destination: str = "",
        correlation_id: str | None = None,
        headers: dict[str, Any] | None = None,
    ) -> None:
        """Initialize a batch publish command.

        Args:
            body: First body of the batch (positional-only).
            *bodies: Remaining bodies, kept in ``extra_bodies``.
            _publish_type: Forwarded to the parent initializer.
            reply_to: Forwarded to the parent initializer.
            destination: Forwarded to the parent initializer.
            correlation_id: Forwarded to the parent initializer.
            headers: Forwarded to the parent initializer.
        """
        super().__init__(
            body,
            _publish_type=_publish_type,
            reply_to=reply_to,
            destination=destination,
            correlation_id=correlation_id,
            headers=headers,
        )
        self.extra_bodies = bodies

    @property
    def batch_bodies(self) -> tuple[Any, ...]:
        """Return every body of the batch: the parent's bodies, then the extras."""
        leading = super().batch_bodies
        return (*leading, *self.extra_bodies)

    @batch_bodies.setter
    def batch_bodies(self, value: Sequence[Any]) -> None:
        """Replace all bodies: first item becomes ``body``, the rest the extras.

        An empty ``value`` resets ``body`` to None and clears the extras.
        """
        if len(value) != 0:
            self.body = value[0]
            self.extra_bodies = tuple(value[1:])
        else:
            self.body = None
            self.extra_bodies = ()

    @classmethod
    def from_cmd(
        cls,
        cmd: "PublishCommand",
        *,
        batch: bool = False,
    ) -> "BatchPublishCommand":
        """Build a batch command from another command.

        Raises:
            NotImplementedError: Always; this implementation provides no
                conversion.
        """
        raise NotImplementedError

    @staticmethod
    def _parse_bodies(body: Any, *, batch: bool = False) -> tuple[Any, tuple[Any, ...]]:
        """Split ``body`` into a leading body and a tuple of extra bodies.

        Splitting only happens when ``batch`` is truthy and ``body`` is a
        Sequence other than ``str``/``bytes``; otherwise ``body`` is returned
        unchanged with no extras. An empty sequence yields ``(None, ())``.
        """
        splittable = (
            batch
            and isinstance(body, Sequence)
            and not isinstance(body, (str, bytes))
        )
        if not splittable:
            return body, ()
        if body:
            return body[0], tuple(body[1:])
        return None, ()
143@singledispatch
144def _extract_body_and_key(item: Any) -> tuple[Any, Any | None]:
145 """Extract body and key from a plain message.
147 Default implementation for non-Response objects.
148 Returns the item as-is for body and None for key.
149 """
150 return item, None
@_extract_body_and_key.register
def _(item: Response) -> tuple[Any, Any | None]:
    """Overload for Response objects.

    Returns:
        The response's ``body`` paired with whatever its
        ``get_publish_key()`` method reports.
    """
    body = item.body
    key = item.get_publish_key()
    return body, key
def extract_per_message_keys_and_bodies(
    batch_bodies: Sequence[Any],
) -> tuple[tuple[Any | None, ...], tuple[Any, ...] | None]:
    """Extract per-message keys and optionally normalized bodies from a batch.

    Returns a pair (keys, normalized_bodies_or_None):
    - If the batch is empty, or no item was unwrapped and no item supplied a
      key, returns ((), None) so callers can reuse the original bodies without
      extra allocations.
    - Otherwise returns (keys_tuple, normalized_bodies_tuple), where normalized
      bodies contain the extracted 'body' values from Response objects (or the
      original item) and keys_tuple holds one entry per item (None where the
      item carries no key).

    Supports passing Response objects (e.g., KafkaResponse) to set per-message keys:
        await broker.publish_batch(
            KafkaResponse("body1", key=b"key1"),
            KafkaResponse("body2", key=b"key2"),
            "plain message"  # uses default key
        )

    Uses singledispatch for type-based polymorphism without isinstance checks.
    """
    if not batch_bodies:
        return (), None

    bodies: list[Any] = []
    keys: list[Any | None] = []
    needs_normalization = False

    for item in batch_bodies:
        body, key = _extract_body_and_key(item)
        bodies.append(body)
        keys.append(key)
        # The fallback extractor hands the item back untouched, so a body that
        # is a different object means the item was a wrapper that got
        # unwrapped. Such batches must be normalized even when no key is set;
        # otherwise the caller would keep using the wrapper objects as bodies.
        if key is not None or body is not item:
            needs_normalization = True

    if not needs_normalization:
        return (), None

    return tuple(keys), tuple(bodies)
202def key_for_index(
203 keys: Sequence[Any | None], default_key: Any | None, index: int
204) -> Any | None:
205 """Return the effective key for a given message index.
207 Prefers a per-message key at the given index when it is not None;
208 otherwise falls back to ``default_key``. If the index is out of bounds
209 or negative, ``default_key`` is returned.
210 """
211 if index < 0: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 return default_key
214 try:
215 k = keys[index]
216 except IndexError:
217 return default_key
219 return k if k is not None else default_key