Coverage for faststream / confluent / helpers / client.py: 82%
106 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import logging
3from collections.abc import Callable, Sequence
4from concurrent.futures import ThreadPoolExecutor
5from contextlib import suppress
6from time import time
7from typing import TYPE_CHECKING, Any, cast
9import anyio
10from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
12from faststream._internal.utils.functions import call_or_await, run_in_executor
13from faststream.confluent.schemas import TopicPartition
14from faststream.exceptions import SetupError
16from . import config as config_module
if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only;
    # none of it is imported or defined at runtime.
    from typing_extensions import NotRequired, TypedDict

    from faststream._internal.logger import LoggerState

    from .admin import AdminService

    # Typed shape of the keyword arguments describing one message to produce.
    # `value`, `key` and `headers` are always present (they may be None);
    # the `NotRequired` entries may be absent from the dict altogether.
    class _SendKwargs(TypedDict):
        value: bytes | str | None
        key: bytes | str | None
        headers: list[tuple[str, str | bytes]] | None
        partition: NotRequired[int]
        timestamp: NotRequired[int]
        on_delivery: NotRequired[Callable[..., None]]
class AsyncConfluentProducer:
    """An asynchronous Python Kafka client using the "confluent-kafka" package.

    Wraps a ``confluent_kafka.Producer`` and exposes awaitable ``send``,
    ``send_batch``, ``flush``, ``ping`` and ``stop`` operations. A background
    asyncio task keeps polling the producer while the instance is running.
    """

    def __init__(
        self,
        *,
        logger: "LoggerState",
        config: config_module.ConfluentFastConfig,
    ) -> None:
        """Create the underlying producer and start the background poll task.

        Must be called while an event loop is running, because the poll task
        is created with ``asyncio.create_task``.

        Args:
            logger: Logger state; its inner logger is handed to the producer.
            config: Config object whose ``producer_config`` is passed to
                ``confluent_kafka.Producer``.
        """
        self.logger_state = logger

        self.config = config.producer_config
        self.producer = Producer(
            self.config,
            logger=self.logger_state.logger.logger,
        )

        self.__running = True
        self._poll_task = asyncio.create_task(self._poll_loop())

    async def _poll_loop(self) -> None:
        """Poll the producer repeatedly until the instance is stopped."""
        while self.__running:
            # Polling is best-effort: a failing poll must not kill the loop.
            with suppress(Exception):
                await call_or_await(self.producer.poll, 0.1)

    async def stop(self) -> None:
        """Stop the Kafka producer and flush remaining messages."""
        if self.__running:
            self.__running = False
            if not self._poll_task.done():
                self._poll_task.cancel()
            await call_or_await(self.producer.flush)

    async def flush(self) -> None:
        """Flush the underlying producer."""
        await call_or_await(self.producer.flush)

    @staticmethod
    def _to_bytes(data: bytes | str | None) -> bytes | None:
        """Return *data* unchanged if it is bytes or None, otherwise ``data.encode()``."""
        if data is None or isinstance(data, bytes):
            return data
        return data.encode()

    async def send(
        self,
        topic: str,
        value: bytes | str | None = None,
        key: bytes | str | None = None,
        partition: int | None = None,
        timestamp_ms: int | None = None,
        headers: list[tuple[str, str | bytes]] | None = None,
        no_confirm: bool = False,
    ) -> "asyncio.Future[Message | None] | Message | None":
        """Sends a single message to a Kafka topic.

        Args:
            topic: Destination topic.
            value: Message payload; ``str`` is encoded to bytes before producing.
            key: Message key; ``str`` is encoded to bytes before producing.
            partition: Target partition; omitted from the produce call when None.
            timestamp_ms: Message timestamp; omitted from the produce call when None.
            headers: Message headers, passed through unchanged.
            no_confirm: When True, return the pending delivery future instead
                of awaiting it.

        Returns:
            The delivery result (message or None) once confirmed, or the
            pending future when ``no_confirm`` is True.

        Raises:
            KafkaException: When awaited and the delivery report carries an error.
        """
        loop = asyncio.get_running_loop()
        result_future: asyncio.Future[Message | None] = loop.create_future()

        def resolve(err: Any, msg: Message | None) -> None:
            # Runs on the event loop thread. The future may already be done
            # (e.g. cancelled together with the awaiting caller); resolving it
            # again would raise InvalidStateError inside the loop.
            if result_future.done():
                return
            if err:
                result_future.set_exception(KafkaException(err))
            else:
                result_future.set_result(msg)

        def ack_callback(err: Any, msg: Message | None) -> None:
            # May be invoked from a non-loop thread, so only hand over to the loop.
            if not err and msg is not None:
                err = msg.error()
            loop.call_soon_threadsafe(resolve, err, msg)

        # confluent stub expects bytes|None for value/key; we accept str and encode
        produce_kwargs: dict[str, Any] = {
            "value": self._to_bytes(value),
            "key": self._to_bytes(key),
            "headers": headers,
            "on_delivery": ack_callback,
        }
        if partition is not None:
            produce_kwargs["partition"] = partition
        if timestamp_ms is not None:
            produce_kwargs["timestamp"] = timestamp_ms

        # should be sync to prevent segfault
        self.producer.produce(topic, **produce_kwargs)

        if no_confirm:
            return result_future
        return await result_future

    def create_batch(self) -> "BatchBuilder":
        """Creates a batch for sending multiple messages."""
        return BatchBuilder()

    async def send_batch(
        self,
        batch: "BatchBuilder",
        topic: str,
        *,
        partition: int | None,
        no_confirm: bool = False,
    ) -> None:
        """Sends a batch of messages to a Kafka topic.

        Every message collected in *batch* is sent through ``send`` inside one
        task group, so this method returns once all of those calls finished.

        Args:
            batch: Builder holding the messages to send.
            topic: Destination topic for every message.
            partition: Partition applied to every message (None to leave unset).
            no_confirm: Forwarded to ``send`` for every message.
        """
        async with anyio.create_task_group() as tg:
            for msg in batch._builder:
                tg.start_soon(
                    self.send,
                    topic,
                    msg["value"],
                    msg["key"],
                    partition,
                    msg["timestamp_ms"],
                    msg["headers"],
                    no_confirm,
                )

    async def ping(
        self,
        timeout: float | None = 5.0,
    ) -> bool:
        """Implement ping using `list_topics` information request.

        Args:
            timeout: Timeout passed to ``list_topics``; None is translated to -1.

        Returns:
            True when ``list_topics`` returned truthy metadata, False when it
            returned nothing or raised.
        """
        if timeout is None:
            timeout = -1

        try:
            cluster_metadata = await call_or_await(
                self.producer.list_topics,
                timeout=timeout,
            )

            return bool(cluster_metadata)

        except Exception:
            return False
class AsyncConfluentConsumer:
    """An asynchronous Python Kafka client for consuming messages using the "confluent-kafka" package.

    All calls to the underlying ``confluent_kafka.Consumer`` are dispatched
    through a single-worker thread pool so they execute sequentially.
    """

    def __init__(
        self,
        *topics: str,
        config: config_module.ConfluentFastConfig,
        logger: "LoggerState",
        admin_service: "AdminService",
        # kwargs options
        partitions: Sequence["TopicPartition"],
        bootstrap_servers: str | list[str] = "localhost",
        # consumer options
        client_id: str | None = "confluent-kafka-consumer",
        group_id: str | None = None,
        group_instance_id: str | None = None,
        fetch_max_wait_ms: int = 500,
        fetch_max_bytes: int = 52428800,
        fetch_min_bytes: int = 1,
        max_partition_fetch_bytes: int = 1 * 1024 * 1024,
        retry_backoff_ms: int = 100,
        auto_offset_reset: str = "latest",
        enable_auto_commit: bool = True,
        auto_commit_interval_ms: int = 5000,
        check_crcs: bool = True,
        metadata_max_age_ms: int = 5 * 60 * 1000,
        partition_assignment_strategy: str | list[Any] = "roundrobin",
        max_poll_interval_ms: int = 300000,
        session_timeout_ms: int = 10000,
        heartbeat_interval_ms: int = 3000,
        security_protocol: str = "PLAINTEXT",
        connections_max_idle_ms: int = 540000,
        isolation_level: str = "read_uncommitted",
        allow_auto_create_topics: bool = True,
        # rebalance callbacks
        on_assign: Callable[..., None] | None = None,
        on_revoke: Callable[..., None] | None = None,
        on_lost: Callable[..., None] | None = None,
    ) -> None:
        """Build the consumer configuration and create the underlying consumer.

        The keyword options are translated into the dotted configuration keys
        shown below; entries from ``config.consumer_config`` are merged last
        and therefore override the values derived from the parameters.

        Args:
            *topics: Topic names to subscribe to on ``start``.
            config: Config object providing ``consumer_config`` overrides.
            logger: Logger state; its inner logger is handed to the consumer.
            admin_service: Service used by ``start`` to create topics.
            partitions: Explicit partitions to assign when no topics are given.
            on_assign: Optional rebalance callback forwarded to ``subscribe``.
            on_revoke: Optional rebalance callback forwarded to ``subscribe``.
            on_lost: Optional rebalance callback forwarded to ``subscribe``.
        """
        self.admin_client = admin_service
        self.logger_state = logger

        self._on_assign = on_assign
        self._on_revoke = on_revoke
        self._on_lost = on_lost

        self.topics = list(topics)
        self.partitions = partitions

        # A list of strategies is flattened into one comma-separated string;
        # non-string entries are instantiated and contribute their `.name`.
        if not isinstance(partition_assignment_strategy, str):
            partition_assignment_strategy = ",".join(
                [
                    x if isinstance(x, str) else x().name
                    for x in partition_assignment_strategy
                ],
            )

        config_from_params = {
            "allow.auto.create.topics": allow_auto_create_topics,
            "topic.metadata.refresh.interval.ms": 1000,
            "bootstrap.servers": bootstrap_servers,
            "client.id": client_id,
            # An empty/None group id falls back to a shared default group.
            "group.id": group_id or "faststream-consumer-group",
            "group.instance.id": group_instance_id,
            "fetch.wait.max.ms": fetch_max_wait_ms,
            "fetch.max.bytes": fetch_max_bytes,
            "fetch.min.bytes": fetch_min_bytes,
            "max.partition.fetch.bytes": max_partition_fetch_bytes,
            "fetch.error.backoff.ms": retry_backoff_ms,
            "auto.offset.reset": auto_offset_reset,
            "enable.auto.commit": enable_auto_commit,
            "auto.commit.interval.ms": auto_commit_interval_ms,
            "check.crcs": check_crcs,
            "metadata.max.age.ms": metadata_max_age_ms,
            "partition.assignment.strategy": partition_assignment_strategy,
            "max.poll.interval.ms": max_poll_interval_ms,
            "session.timeout.ms": session_timeout_ms,
            "heartbeat.interval.ms": heartbeat_interval_ms,
            "security.protocol": security_protocol.lower(),
            "connections.max.idle.ms": connections_max_idle_ms,
            "isolation.level": isolation_level,
        } | config.consumer_config

        self.config = config_from_params
        self.consumer = Consumer(self.config, logger=self.logger_state.logger.logger)

        # A pool with single thread is used in order to execute the commands of the consumer sequentially:
        # https://github.com/ag2ai/faststream/issues/1904#issuecomment-2506990895
        self._thread_pool = ThreadPoolExecutor(max_workers=1)

    @property
    def topics_to_create(self) -> list[str]:
        """Unique topic names drawn from both ``topics`` and ``partitions`` (order unspecified)."""
        return list({*self.topics, *(p.topic for p in self.partitions)})

    async def start(self) -> None:
        """Starts the Kafka consumer and subscribes to the specified topics.

        When topic auto-creation is enabled in the config, the topics are
        created through the admin service first and per-topic failures are
        logged as warnings; otherwise a warning reminds that topics must exist.
        Then the consumer subscribes to ``topics`` or, if there are none, is
        assigned ``partitions``.

        Raises:
            SetupError: When neither topics nor partitions were provided.
        """
        if self.config.get("allow.auto.create.topics", True):
            topics_creation_result = await run_in_executor(
                self._thread_pool,
                self.admin_client.create_topics,
                self.topics_to_create,
            )

            for create_result in topics_creation_result:
                if create_result.error:
                    self.logger_state.log(
                        log_level=logging.WARNING,
                        message=f"Failed to create topic {create_result.topic}: {create_result.error}",
                    )

        else:
            self.logger_state.log(
                log_level=logging.WARNING,
                message="Auto create topics is disabled. Make sure the topics exist.",
            )

        if self.topics:
            # Only pass the rebalance callbacks that were actually supplied.
            subscribe_kwargs: dict[str, Any] = {"topics": self.topics}
            if self._on_assign is not None:
                subscribe_kwargs["on_assign"] = self._on_assign
            if self._on_revoke is not None:
                subscribe_kwargs["on_revoke"] = self._on_revoke
            if self._on_lost is not None:
                subscribe_kwargs["on_lost"] = self._on_lost
            await run_in_executor(
                self._thread_pool,
                self.consumer.subscribe,
                **subscribe_kwargs,
            )

        elif self.partitions:
            await run_in_executor(
                self._thread_pool,
                self.consumer.assign,
                [p.to_confluent() for p in self.partitions],
            )

        else:
            msg = "You must provide either `topics` or `partitions` option."
            raise SetupError(msg)

    async def commit(self, asynchronous: bool = True) -> None:
        """Commits the offsets of all messages returned by the last poll operation."""
        await run_in_executor(
            self._thread_pool,
            lambda: self.consumer.commit(asynchronous=asynchronous),  # type: ignore[call-overload]
        )

    async def stop(self) -> None:
        """Stops the Kafka consumer and releases all resources.

        The thread pool is shut down even when closing the consumer fails or
        this coroutine is cancelled while waiting for the close to finish.
        """
        # NOTE: If we don't explicitly call commit and then close the consumer, the confluent consumer gets stuck.
        # We are doing this to avoid the issue.
        enable_auto_commit = self.config.get("enable.auto.commit", True)

        try:
            if enable_auto_commit:
                await self.commit(asynchronous=False)

        except Exception as e:
            # No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183
            if "No offset stored" not in str(e):
                self.logger_state.log(
                    log_level=logging.ERROR,
                    message="Consumer closing error occurred.",
                    exc_info=e,
                )

        # Wrap calls to async to make method cancelable by timeout
        # We shouldn't read messages and close consumer concurrently
        # https://github.com/ag2ai/faststream/issues/1904#issuecomment-2506990895
        # Now it works without lock due `ThreadPoolExecutor(max_workers=1)`
        # that makes all calls to consumer sequential
        try:
            await run_in_executor(self._thread_pool, self.consumer.close)
        finally:
            # Always release the executor, otherwise a failed or cancelled
            # close would leave the pool alive.
            self._thread_pool.shutdown(wait=False)

    async def getone(self, timeout: float = 0.1) -> Message | None:
        """Consumes a single message from Kafka.

        Returns:
            The polled message, or None when nothing was received or the
            message carried an error.
        """
        msg = await run_in_executor(self._thread_pool, self.consumer.poll, timeout)
        return check_msg_error(msg)

    async def getmany(
        self,
        timeout: float = 0.1,
        max_records: int | None = 10,
    ) -> tuple[Message, ...]:
        """Consumes a batch of messages from Kafka.

        Args:
            timeout: Timeout passed to the underlying ``consume`` call.
            max_records: Maximum number of messages to request; None or 0
                falls back to 10.

        Returns:
            A flat tuple of the received messages, with missing and errored
            entries filtered out.
        """
        raw_messages: list[Message | None] = await run_in_executor(
            self._thread_pool,
            cast(
                "Callable[..., list[Message | None]]",
                lambda: self.consumer.consume(
                    num_messages=max_records or 10,
                    timeout=timeout,
                ),
            ),
        )
        return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

    async def seek(self, topic: str, partition: int, offset: int) -> None:
        """Seeks to the specified offset in the specified topic and partition."""
        topic_partition = TopicPartition(
            topic=topic,
            partition=partition,
            offset=offset,
        )
        await run_in_executor(
            self._thread_pool,
            self.consumer.seek,
            topic_partition.to_confluent(),
        )
def check_msg_error(msg: Message | None) -> Message | None:
    """Return *msg* only when it exists and reports no error; otherwise ``None``."""
    if msg is not None and not msg.error():
        return msg

    return None
class BatchBuilder:
    """Collects messages that will later be sent to Kafka as one batch."""

    def __init__(self) -> None:
        """Start with an empty list of pending messages."""
        self._builder: list[dict[str, Any]] = []

    def append(
        self,
        *,
        value: bytes | str | None = None,
        key: bytes | str | None = None,
        timestamp: int | None = None,
        headers: list[tuple[str, bytes]] | None = None,
    ) -> None:
        """Queue one message, filling in the timestamp and headers when omitted.

        A falsy ``timestamp`` is replaced by the current time in milliseconds
        and falsy ``headers`` by an empty list.

        Raises:
            KafkaException: When both ``key`` and ``value`` are None.
        """
        nothing_to_send = value is None and key is None
        if nothing_to_send:
            error = KafkaError(40, "Both key and value can't be None")
            raise KafkaException(error)

        record: dict[str, Any] = {
            "timestamp_ms": timestamp if timestamp else round(time() * 1000),
            "key": key,
            "value": value,
            "headers": headers if headers else [],
        }
        self._builder.append(record)