Coverage for faststream / confluent / response.py: 98%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any, Union
3from typing_extensions import override
5from faststream.response.publish_type import PublishType
6from faststream.response.response import (
7 BatchPublishCommand,
8 PublishCommand,
9 Response,
10 extract_per_message_keys_and_bodies,
11 key_for_index,
12)
14if TYPE_CHECKING:
15 from faststream._internal.basic_types import SendableMessage
18class KafkaResponse(Response):
19 """Kafka-specific response object for outgoing messages.
21 Can be used in two ways:
22 1. As a return value from handler to send a response message
23 2. Directly in publish_batch() to set per-message attributes (key, headers, etc.)
25 For publish operations, consider using the more semantic alias `KafkaPublishMessage`.
26 """
28 def __init__(
29 self,
30 body: "SendableMessage",
31 *,
32 headers: dict[str, Any] | None = None,
33 correlation_id: str | None = None,
34 timestamp_ms: int | None = None,
35 key: bytes | Any | None = None,
36 ) -> None:
37 super().__init__(
38 body=body,
39 headers=headers,
40 correlation_id=correlation_id,
41 )
43 self.timestamp_ms = timestamp_ms
44 self.key = key
46 @override
47 def get_publish_key(self) -> bytes | Any | None:
48 """Return the Kafka message key for publishing."""
49 return self.key
51 @override
52 def as_publish_command(self) -> "KafkaPublishCommand":
53 return KafkaPublishCommand(
54 self.body,
55 headers=self.headers,
56 correlation_id=self.correlation_id,
57 _publish_type=PublishType.PUBLISH,
58 # Kafka specific
59 topic="",
60 key=self.key,
61 timestamp_ms=self.timestamp_ms,
62 )
65class KafkaPublishCommand(BatchPublishCommand):
66 def __init__(
67 self,
68 message: "SendableMessage",
69 /,
70 *messages: "SendableMessage",
71 topic: str,
72 _publish_type: PublishType,
73 key: bytes | Any | None = None,
74 partition: int | None = None,
75 timestamp_ms: int | None = None,
76 headers: dict[str, str] | None = None,
77 correlation_id: str | None = None,
78 reply_to: str = "",
79 no_confirm: bool = False,
80 timeout: float = 0.5,
81 ) -> None:
82 super().__init__(
83 message,
84 *messages,
85 destination=topic,
86 reply_to=reply_to,
87 correlation_id=correlation_id,
88 headers=headers,
89 _publish_type=_publish_type,
90 )
92 self.key = key
93 self.partition = partition
94 self.timestamp_ms = timestamp_ms
95 self.no_confirm = no_confirm
97 # request option
98 self.timeout = timeout
100 # per-message keys support
101 keys, normalized = extract_per_message_keys_and_bodies(self.batch_bodies)
102 if normalized is not None:
103 self.batch_bodies = normalized
104 self._per_message_keys = keys
106 @classmethod
107 def from_cmd(
108 cls,
109 cmd: Union["PublishCommand", "KafkaPublishCommand"],
110 *,
111 batch: bool = False,
112 ) -> "KafkaPublishCommand":
113 if isinstance(cmd, KafkaPublishCommand):
114 # NOTE: Should return a copy probably.
115 return cmd
117 body, extra_bodies = cls._parse_bodies(cmd.body, batch=batch)
119 return cls(
120 body,
121 *extra_bodies,
122 topic=cmd.destination,
123 correlation_id=cmd.correlation_id,
124 headers=cmd.headers,
125 reply_to=cmd.reply_to,
126 _publish_type=cmd.publish_type,
127 )
129 def key_for(self, index: int) -> Any | None:
130 return key_for_index(self._per_message_keys, self.key, index)
132 def headers_to_publish(self) -> dict[str, str]:
133 headers = {}
135 if self.correlation_id: 135 ↛ 138line 135 didn't jump to line 138 because the condition on line 135 was always true
136 headers["correlation_id"] = self.correlation_id
138 if self.reply_to:
139 headers["reply_to"] = self.reply_to
141 return headers | self.headers
144# Semantic alias for publish operations
145# More intuitive name when using in publish_batch() rather than as handler return value
146KafkaPublishMessage = KafkaResponse