Coverage for faststream / kafka / publisher / usecase.py: 96%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable
2from typing import TYPE_CHECKING, Any, Literal, Union, cast, overload
4from typing_extensions import override
6from faststream._internal.endpoint.publisher import PublisherUsecase
7from faststream.kafka.response import KafkaPublishCommand
8from faststream.message import gen_cor_id
9from faststream.response.publish_type import PublishType
11if TYPE_CHECKING:
12 import asyncio
14 from aiokafka.structs import RecordMetadata
16 from faststream._internal.basic_types import SendableMessage
17 from faststream._internal.endpoint.publisher import PublisherSpecification
18 from faststream._internal.types import PublisherMiddleware
19 from faststream.kafka.message import KafkaMessage
20 from faststream.response.response import PublishCommand
22 from .config import KafkaPublisherConfig
23 from .producer import AioKafkaFastProducer
26class LogicPublisher(PublisherUsecase):
27 """A class to publish messages to a Kafka topic."""
29 def __init__(
30 self,
31 config: "KafkaPublisherConfig",
32 specification: "PublisherSpecification[Any, Any]",
33 ) -> None:
34 super().__init__(config, specification)
36 self._topic = config.topic
37 self.partition = config.partition
38 self.reply_to = config.reply_to
39 self.headers = config.headers or {}
41 @property
42 def topic(self) -> str:
43 return f"{self._outer_config.prefix}{self._topic}"
45 @override
46 async def request(
47 self,
48 message: "SendableMessage",
49 topic: str = "",
50 *,
51 key: bytes | Any | None = None,
52 partition: int | None = None,
53 timestamp_ms: int | None = None,
54 headers: dict[str, str] | None = None,
55 correlation_id: str | None = None,
56 timeout: float = 0.5,
57 ) -> "KafkaMessage":
58 """Send a request message to Kafka topic.
60 Args:
61 message: Message body to send.
62 topic: Topic where the message will be published.
63 key: A key to associate with the message. Can be used to
64 determine which partition to send the message to. If partition
65 is `None` (and producer's partitioner config is left as default),
66 then messages with the same key will be delivered to the same
67 partition (but if key is `None`, partition is chosen randomly).
68 Must be type `bytes`, or be serializable to bytes via configured
69 `key_serializer`.
70 partition: Specify a partition. If not set, the partition will be
71 selected using the configured `partitioner`.
72 timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as
73 the message timestamp. Defaults to current time.
74 headers: Message headers to store metainformation.
75 correlation_id: Manual message **correlation_id** setter.
76 **correlation_id** is a useful option to trace messages.
77 timeout: Timeout to send RPC request.
79 Returns:
80 KafkaMessage: The response message.
81 """
82 cmd = KafkaPublishCommand(
83 message,
84 topic=topic or self.topic,
85 key=key,
86 partition=partition if partition is not None else self.partition,
87 headers=self.headers | (headers or {}),
88 correlation_id=correlation_id or gen_cor_id(),
89 timestamp_ms=timestamp_ms,
90 timeout=timeout,
91 _publish_type=PublishType.REQUEST,
92 )
94 msg: KafkaMessage = await self._basic_request(
95 cmd,
96 producer=self._outer_config.producer,
97 )
98 return msg
100 async def flush(self) -> None:
101 producer = cast("AioKafkaFastProducer", self._outer_config.producer)
102 await producer.flush()
105class DefaultPublisher(LogicPublisher):
106 def __init__(
107 self,
108 config: "KafkaPublisherConfig",
109 specification: "PublisherSpecification[Any, Any]",
110 ) -> None:
111 super().__init__(config, specification)
113 self.key = config.key
115 @overload
116 async def publish(
117 self,
118 message: "SendableMessage",
119 topic: str = "",
120 *,
121 key: bytes | Any | None = None,
122 partition: int | None = None,
123 timestamp_ms: int | None = None,
124 headers: dict[str, str] | None = None,
125 correlation_id: str | None = None,
126 reply_to: str = "",
127 no_confirm: Literal[False] = False,
128 ) -> "RecordMetadata": ...
130 @overload
131 async def publish(
132 self,
133 message: "SendableMessage",
134 topic: str = "",
135 *,
136 key: bytes | Any | None = None,
137 partition: int | None = None,
138 timestamp_ms: int | None = None,
139 headers: dict[str, str] | None = None,
140 correlation_id: str | None = None,
141 reply_to: str = "",
142 no_confirm: Literal[True] = ...,
143 ) -> "asyncio.Future[RecordMetadata]": ...
145 @overload
146 async def publish(
147 self,
148 message: "SendableMessage",
149 topic: str = "",
150 *,
151 key: bytes | Any | None = None,
152 partition: int | None = None,
153 timestamp_ms: int | None = None,
154 headers: dict[str, str] | None = None,
155 correlation_id: str | None = None,
156 reply_to: str = "",
157 no_confirm: bool = False,
158 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: ...
160 @override
161 async def publish(
162 self,
163 message: "SendableMessage",
164 topic: str = "",
165 *,
166 key: bytes | Any | None = None,
167 partition: int | None = None,
168 timestamp_ms: int | None = None,
169 headers: dict[str, str] | None = None,
170 correlation_id: str | None = None,
171 reply_to: str = "",
172 no_confirm: bool = False,
173 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
174 """Publishes a message to Kafka.
176 Args:
177 message:
178 Message body to send.
179 topic:
180 Topic where the message will be published.
181 key:
182 A key to associate with the message. Can be used to
183 determine which partition to send the message to. If partition
184 is `None` (and producer's partitioner config is left as default),
185 then messages with the same key will be delivered to the same
186 partition (but if key is `None`, partition is chosen randomly).
187 Must be type `bytes`, or be serializable to bytes via configured
188 `key_serializer`
189 partition:
190 Specify a partition. If not set, the partition will be
191 selected using the configured `partitioner`
192 timestamp_ms:
193 Epoch milliseconds (from Jan 1 1970 UTC) to use as
194 the message timestamp. Defaults to current time.
195 headers:
196 Message headers to store metainformation.
197 correlation_id:
198 Manual message **correlation_id** setter.
199 **correlation_id** is a useful option to trace messages.
200 reply_to:
201 Reply message topic name to send response.
202 no_confirm:
203 Do not wait for Kafka publish confirmation.
205 Returns:
206 `asyncio.Future[RecordMetadata]` if no_confirm = True.
207 `RecordMetadata` if no_confirm = False.
208 """
209 cmd = KafkaPublishCommand(
210 message,
211 topic=topic or self.topic,
212 key=key or self.key,
213 partition=partition if partition is not None else self.partition,
214 reply_to=reply_to or self.reply_to,
215 headers=self.headers | (headers or {}),
216 correlation_id=correlation_id or gen_cor_id(),
217 timestamp_ms=timestamp_ms,
218 no_confirm=no_confirm,
219 _publish_type=PublishType.PUBLISH,
220 )
221 return await self._basic_publish(
222 cmd,
223 producer=self._outer_config.producer,
224 _extra_middlewares=(),
225 )
227 @override
228 async def _publish(
229 self,
230 cmd: Union["PublishCommand", "KafkaPublishCommand"],
231 *,
232 _extra_middlewares: Iterable["PublisherMiddleware"],
233 ) -> None:
234 """This method should be called in subscriber flow only."""
235 cmd = KafkaPublishCommand.from_cmd(cmd)
237 cmd.destination = self.topic
238 cmd.add_headers(self.headers, override=False)
239 cmd.reply_to = cmd.reply_to or self.reply_to
241 cmd.partition = cmd.partition if cmd.partition is not None else self.partition
242 cmd.key = cmd.key or self.key
244 await self._basic_publish(
245 cmd,
246 producer=self._outer_config.producer,
247 _extra_middlewares=_extra_middlewares,
248 )
250 @override
251 async def request(
252 self,
253 message: "SendableMessage",
254 topic: str = "",
255 *,
256 key: bytes | Any | None = None,
257 partition: int | None = None,
258 timestamp_ms: int | None = None,
259 headers: dict[str, str] | None = None,
260 correlation_id: str | None = None,
261 timeout: float = 0.5,
262 ) -> "KafkaMessage":
263 """Send a request message and wait for a response.
265 Args:
266 message: Message body to send.
267 topic: Topic where the message will be published.
268 key: A key to associate with the message. Can be used to
269 determine which partition to send the message to. If partition
270 is `None` (and producer's partitioner config is left as default),
271 then messages with the same key will be delivered to the same
272 partition (but if key is `None`, partition is chosen randomly).
273 Must be type `bytes`, or be serializable to bytes via configured
274 `key_serializer`.
275 partition: Specify a partition. If not set, the partition will be
276 selected using the configured `partitioner`.
277 timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as
278 the message timestamp. Defaults to current time.
279 headers: Message headers to store metainformation.
280 correlation_id: Manual message **correlation_id** setter.
281 **correlation_id** is a useful option to trace messages.
282 timeout: Timeout to send RPC request.
284 Returns:
285 The response message.
286 """
287 return await super().request(
288 message,
289 topic=topic,
290 key=key or self.key,
291 partition=partition,
292 timestamp_ms=timestamp_ms,
293 headers=headers,
294 correlation_id=correlation_id,
295 timeout=timeout,
296 )
299class BatchPublisher(LogicPublisher):
300 def __init__(
301 self,
302 config: "KafkaPublisherConfig",
303 specification: "PublisherSpecification[Any, Any]",
304 ) -> None:
305 super().__init__(config, specification)
306 self.key = config.key
308 @overload
309 async def publish(
310 self,
311 *messages: "SendableMessage",
312 topic: str = "",
313 key: bytes | Any | None = None,
314 partition: int | None = None,
315 timestamp_ms: int | None = None,
316 headers: dict[str, str] | None = None,
317 reply_to: str = "",
318 correlation_id: str | None = None,
319 no_confirm: Literal[False] = False,
320 ) -> "RecordMetadata": ...
322 @overload
323 async def publish(
324 self,
325 *messages: "SendableMessage",
326 topic: str = "",
327 key: bytes | Any | None = None,
328 partition: int | None = None,
329 timestamp_ms: int | None = None,
330 headers: dict[str, str] | None = None,
331 reply_to: str = "",
332 correlation_id: str | None = None,
333 no_confirm: Literal[True] = ...,
334 ) -> "asyncio.Future[RecordMetadata]": ...
336 @overload
337 async def publish(
338 self,
339 *messages: "SendableMessage",
340 topic: str = "",
341 key: bytes | Any | None = None,
342 partition: int | None = None,
343 timestamp_ms: int | None = None,
344 headers: dict[str, str] | None = None,
345 reply_to: str = "",
346 correlation_id: str | None = None,
347 no_confirm: bool = False,
348 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: ...
350 @override
351 async def publish(
352 self,
353 *messages: "SendableMessage",
354 topic: str = "",
355 key: bytes | Any | None = None,
356 partition: int | None = None,
357 timestamp_ms: int | None = None,
358 headers: dict[str, str] | None = None,
359 reply_to: str = "",
360 correlation_id: str | None = None,
361 no_confirm: bool = False,
362 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
363 """Publish a message batch as a single request to broker.
365 Args:
366 *messages:
367 Messages bodies to send.
368 topic:
369 Topic where the message will be published.
370 key:
371 A single key to associate with every message in this batch. If a
372 partition is not specified and the producer uses the default
373 partitioner, messages with the same key will be routed to the
374 same partition. Must be bytes or serializable to bytes via the
375 configured key serializer. If omitted, falls back to the
376 publisher's default key (if configured).
377 partition:
378 Specify a partition. If not set, the partition will be
379 selected using the configured `partitioner`
380 timestamp_ms:
381 Epoch milliseconds (from Jan 1 1970 UTC) to use as
382 the message timestamp. Defaults to current time.
383 headers:
384 Message headers to store metainformation.
385 reply_to:
386 Reply message topic name to send response.
387 correlation_id:
388 Manual message **correlation_id** setter.
389 **correlation_id** is a useful option to trace messages.
390 no_confirm:
391 Do not wait for Kafka publish confirmation.
393 Returns:
394 `asyncio.Future[RecordMetadata]` if no_confirm = True.
395 `RecordMetadata` if no_confirm = False.
396 """
397 cmd = KafkaPublishCommand(
398 *messages,
399 key=key or self.key,
400 topic=topic or self.topic,
401 partition=partition if partition is not None else self.partition,
402 reply_to=reply_to or self.reply_to,
403 headers=self.headers | (headers or {}),
404 correlation_id=correlation_id or gen_cor_id(),
405 timestamp_ms=timestamp_ms,
406 no_confirm=no_confirm,
407 _publish_type=PublishType.PUBLISH,
408 )
410 return await self._basic_publish_batch(
411 cmd,
412 producer=self._outer_config.producer,
413 _extra_middlewares=(),
414 )
416 @override
417 async def _publish(
418 self,
419 cmd: Union["PublishCommand", "KafkaPublishCommand"],
420 *,
421 _extra_middlewares: Iterable["PublisherMiddleware"],
422 ) -> None:
423 """This method should be called in subscriber flow only."""
424 cmd = KafkaPublishCommand.from_cmd(cmd, batch=True)
426 cmd.destination = self.topic
427 cmd.add_headers(self.headers, override=False)
428 cmd.reply_to = cmd.reply_to or self.reply_to
430 cmd.partition = cmd.partition if cmd.partition is not None else self.partition
431 cmd.key = cmd.key or self.key
433 await self._basic_publish_batch(
434 cmd,
435 producer=self._outer_config.producer,
436 _extra_middlewares=_extra_middlewares,
437 )