Coverage for faststream / confluent / helpers / client.py: 82%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2import logging 

3from collections.abc import Callable, Sequence 

4from concurrent.futures import ThreadPoolExecutor 

5from contextlib import suppress 

6from time import time 

7from typing import TYPE_CHECKING, Any, cast 

8 

9import anyio 

10from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer 

11 

12from faststream._internal.utils.functions import call_or_await, run_in_executor 

13from faststream.confluent.schemas import TopicPartition 

14from faststream.exceptions import SetupError 

15 

16from . import config as config_module 

17 

if TYPE_CHECKING:
    from typing_extensions import NotRequired, TypedDict

    from faststream._internal.logger import LoggerState

    from .admin import AdminService

    # Type-only shape of per-message produce keyword arguments:
    # value/key/headers are always present; partition, timestamp and the
    # delivery callback are optional keys.
    _SendKwargs = TypedDict(
        "_SendKwargs",
        {
            "value": bytes | str | None,
            "key": bytes | str | None,
            "headers": list[tuple[str, str | bytes]] | None,
            "partition": NotRequired[int],
            "timestamp": NotRequired[int],
            "on_delivery": NotRequired[Callable[..., None]],
        },
    )

32 

33 

class AsyncConfluentProducer:
    """An asynchronous Python Kafka client using the "confluent-kafka" package.

    Wraps a synchronous ``confluent_kafka.Producer`` and runs a background task
    that repeatedly calls ``Producer.poll`` while the producer is running. Per
    the confluent-kafka docs, ``poll``/``flush`` are what invoke the
    ``on_delivery`` callbacks registered by :meth:`send`.
    """

    def __init__(
        self,
        *,
        logger: "LoggerState",
        config: config_module.ConfluentFastConfig,
    ) -> None:
        self.logger_state = logger

        self.config = config.producer_config
        self.producer = Producer(
            self.config,
            logger=self.logger_state.logger.logger,
        )

        self.__running = True
        # asyncio.create_task needs a running event loop, so the producer must
        # be constructed from inside one.
        self._poll_task = asyncio.create_task(self._poll_loop())

    async def _poll_loop(self) -> None:
        """Poll the underlying producer until :meth:`stop` clears the running flag."""
        while self.__running:
            # Errors are swallowed so one failed poll does not end the loop.
            # Cancellation (a BaseException) is not suppressed and still stops it.
            with suppress(Exception):
                await call_or_await(self.producer.poll, 0.1)

    async def stop(self) -> None:
        """Stop the Kafka producer and flush remaining messages.

        Only the first call has an effect: it cancels the poll task and flushes.
        """
        if self.__running:
            self.__running = False
            if not self._poll_task.done():
                self._poll_task.cancel()
            await call_or_await(self.producer.flush)

    async def flush(self) -> None:
        """Flush all messages queued in the underlying producer."""
        await call_or_await(self.producer.flush)

    async def send(
        self,
        topic: str,
        value: bytes | str | None = None,
        key: bytes | str | None = None,
        partition: int | None = None,
        timestamp_ms: int | None = None,
        headers: list[tuple[str, str | bytes]] | None = None,
        no_confirm: bool = False,
    ) -> "asyncio.Future[Message | None] | Message | None":
        """Sends a single message to a Kafka topic.

        Args:
            topic: Destination topic.
            value: Message payload; ``str`` values are encoded with ``str.encode()``.
            key: Message key; ``str`` keys are encoded with ``str.encode()``.
            partition: Explicit partition, or ``None`` to omit it from ``produce``.
            timestamp_ms: Message timestamp, or ``None`` to omit it from ``produce``.
            headers: Message headers passed through to ``produce``.
            no_confirm: If true, return the delivery future without awaiting it.

        Returns:
            The ``Message`` reported by the delivery callback (may be ``None``),
            or, when ``no_confirm`` is set, the future that will hold it.

        Raises:
            KafkaException: When awaited, if the delivery report carried an error.
        """
        loop = asyncio.get_running_loop()
        result_future: asyncio.Future[Message | None] = loop.create_future()

        def resolve(err: Any, msg: Message | None) -> None:
            # Runs on the event-loop thread. The future may already be done:
            # if the awaiting task was cancelled (e.g. by a timeout) the future
            # is cancelled too, and setting it again would raise
            # InvalidStateError inside the loop's callback machinery.
            if result_future.done():
                return
            if err:
                result_future.set_exception(KafkaException(err))
            else:
                result_future.set_result(msg)

        def ack_callback(err: Any, msg: Message | None) -> None:
            # Delivery report. It may be invoked from a thread other than the
            # event loop's, so hand the result over thread-safely.
            if not err and msg is not None:
                err = msg.error()
            loop.call_soon_threadsafe(resolve, err, msg)

        # The confluent stub expects bytes | None for value/key; str is encoded.
        produce_kwargs: dict[str, Any] = {
            "value": value.encode() if isinstance(value, str) else value,
            "key": key.encode() if isinstance(key, str) else key,
            "headers": headers,
            "on_delivery": ack_callback,
        }
        if partition is not None:
            produce_kwargs["partition"] = partition
        if timestamp_ms is not None:
            produce_kwargs["timestamp"] = timestamp_ms

        # should be sync to prevent segfault
        self.producer.produce(topic, **produce_kwargs)

        if no_confirm:
            return result_future
        return await result_future

    def create_batch(self) -> "BatchBuilder":
        """Creates a batch for sending multiple messages."""
        return BatchBuilder()

    async def send_batch(
        self,
        batch: "BatchBuilder",
        topic: str,
        *,
        partition: int | None,
        no_confirm: bool = False,
    ) -> None:
        """Sends a batch of messages to a Kafka topic.

        Each batched message is sent through its own :meth:`send` call, started
        in an anyio task group. The method returns once all of those calls have
        finished; with ``no_confirm`` they finish right after producing.
        """
        async with anyio.create_task_group() as tg:
            for msg in batch._builder:
                tg.start_soon(
                    self.send,
                    topic,
                    msg["value"],
                    msg["key"],
                    partition,
                    msg["timestamp_ms"],
                    msg["headers"],
                    no_confirm,
                )

    async def ping(
        self,
        timeout: float | None = 5.0,
    ) -> bool:
        """Implement ping using `list_topics` information request.

        Returns ``True`` when ``list_topics`` returns truthy metadata, and
        ``False`` on any exception.
        """
        if timeout is None:
            # ``-1`` is passed to list_topics in place of None
            # (presumably confluent's "no timeout" value; see its docs).
            timeout = -1

        try:
            cluster_metadata = await call_or_await(
                self.producer.list_topics,
                timeout=timeout,
            )

            return bool(cluster_metadata)

        except Exception:
            return False

182 

183 

class AsyncConfluentConsumer:
    """An asynchronous Python Kafka client for consuming messages using the "confluent-kafka" package.

    All calls into the underlying ``confluent_kafka.Consumer`` are dispatched to
    a single-worker thread pool, so they execute one at a time.
    """

    def __init__(
        self,
        *topics: str,
        config: config_module.ConfluentFastConfig,
        logger: "LoggerState",
        admin_service: "AdminService",
        # kwargs options
        partitions: Sequence["TopicPartition"],
        bootstrap_servers: str | list[str] = "localhost",
        # consumer options
        client_id: str | None = "confluent-kafka-consumer",
        group_id: str | None = None,
        group_instance_id: str | None = None,
        fetch_max_wait_ms: int = 500,
        fetch_max_bytes: int = 52428800,
        fetch_min_bytes: int = 1,
        max_partition_fetch_bytes: int = 1 * 1024 * 1024,
        retry_backoff_ms: int = 100,
        auto_offset_reset: str = "latest",
        enable_auto_commit: bool = True,
        auto_commit_interval_ms: int = 5000,
        check_crcs: bool = True,
        metadata_max_age_ms: int = 5 * 60 * 1000,
        partition_assignment_strategy: str | list[Any] = "roundrobin",
        max_poll_interval_ms: int = 300000,
        session_timeout_ms: int = 10000,
        heartbeat_interval_ms: int = 3000,
        security_protocol: str = "PLAINTEXT",
        connections_max_idle_ms: int = 540000,
        isolation_level: str = "read_uncommitted",
        allow_auto_create_topics: bool = True,
        # rebalance callbacks
        on_assign: Callable[..., None] | None = None,
        on_revoke: Callable[..., None] | None = None,
        on_lost: Callable[..., None] | None = None,
    ) -> None:
        self.admin_client = admin_service
        self.logger_state = logger

        self._on_assign = on_assign
        self._on_revoke = on_revoke
        self._on_lost = on_lost

        self.topics = list(topics)
        self.partitions = partitions

        if not isinstance(partition_assignment_strategy, str):
            # Entries may be assignor names or classes; classes are
            # instantiated to read their ``name``.
            partition_assignment_strategy = ",".join(
                x if isinstance(x, str) else x().name
                for x in partition_assignment_strategy
            )

        if not isinstance(bootstrap_servers, str):
            # librdkafka's ``bootstrap.servers`` is a comma-separated string;
            # a Python list must not be passed through as-is.
            bootstrap_servers = ",".join(bootstrap_servers)

        # Explicit parameters form the base; config.consumer_config is merged
        # last, so its keys override these values.
        config_from_params = {
            "allow.auto.create.topics": allow_auto_create_topics,
            "topic.metadata.refresh.interval.ms": 1000,
            "bootstrap.servers": bootstrap_servers,
            "client.id": client_id,
            "group.id": group_id or "faststream-consumer-group",
            "group.instance.id": group_instance_id,
            "fetch.wait.max.ms": fetch_max_wait_ms,
            "fetch.max.bytes": fetch_max_bytes,
            "fetch.min.bytes": fetch_min_bytes,
            "max.partition.fetch.bytes": max_partition_fetch_bytes,
            "fetch.error.backoff.ms": retry_backoff_ms,
            "auto.offset.reset": auto_offset_reset,
            "enable.auto.commit": enable_auto_commit,
            "auto.commit.interval.ms": auto_commit_interval_ms,
            "check.crcs": check_crcs,
            "metadata.max.age.ms": metadata_max_age_ms,
            "partition.assignment.strategy": partition_assignment_strategy,
            "max.poll.interval.ms": max_poll_interval_ms,
            "session.timeout.ms": session_timeout_ms,
            "heartbeat.interval.ms": heartbeat_interval_ms,
            "security.protocol": security_protocol.lower(),
            "connections.max.idle.ms": connections_max_idle_ms,
            "isolation.level": isolation_level,
        } | config.consumer_config

        self.config = config_from_params
        self.consumer = Consumer(self.config, logger=self.logger_state.logger.logger)

        # A pool with single thread is used in order to execute the commands of the consumer sequentially:
        # https://github.com/ag2ai/faststream/issues/1904#issuecomment-2506990895
        self._thread_pool = ThreadPoolExecutor(max_workers=1)

    @property
    def topics_to_create(self) -> list[str]:
        """Deduplicated names of subscribed topics and assigned partitions' topics (order not preserved)."""
        return list({*self.topics, *(p.topic for p in self.partitions)})

    async def start(self) -> None:
        """Starts the Kafka consumer and subscribes to the specified topics.

        Raises:
            SetupError: If neither topics nor partitions were provided.
        """
        if self.config.get("allow.auto.create.topics", True):
            topics_creation_result = await run_in_executor(
                self._thread_pool,
                self.admin_client.create_topics,
                self.topics_to_create,
            )

            # Topic-creation failures are logged, not raised.
            for create_result in topics_creation_result:
                if create_result.error:
                    self.logger_state.log(
                        log_level=logging.WARNING,
                        message=f"Failed to create topic {create_result.topic}: {create_result.error}",
                    )

        else:
            self.logger_state.log(
                log_level=logging.WARNING,
                message="Auto create topics is disabled. Make sure the topics exist.",
            )

        if self.topics:
            # Only forward the rebalance callbacks that were actually supplied.
            subscribe_kwargs: dict[str, Any] = {"topics": self.topics}
            if self._on_assign is not None:
                subscribe_kwargs["on_assign"] = self._on_assign
            if self._on_revoke is not None:
                subscribe_kwargs["on_revoke"] = self._on_revoke
            if self._on_lost is not None:
                subscribe_kwargs["on_lost"] = self._on_lost
            await run_in_executor(
                self._thread_pool,
                self.consumer.subscribe,
                **subscribe_kwargs,
            )

        elif self.partitions:
            await run_in_executor(
                self._thread_pool,
                self.consumer.assign,
                [p.to_confluent() for p in self.partitions],
            )

        else:
            msg = "You must provide either `topics` or `partitions` option."
            raise SetupError(msg)

    async def commit(self, asynchronous: bool = True) -> None:
        """Commits the offsets of all messages returned by the last poll operation."""
        await run_in_executor(
            self._thread_pool,
            lambda: self.consumer.commit(asynchronous=asynchronous),  # type: ignore[call-overload]
        )

    async def stop(self) -> None:
        """Stops the Kafka consumer and releases all resources.

        Commit errors are logged, except "No offset stored", which is ignored.
        The worker thread pool is shut down even if closing the consumer fails.
        """
        # NOTE: If we don't explicitly call commit and then close the consumer, the confluent consumer gets stuck.
        # We are doing this to avoid the issue.
        enable_auto_commit = self.config.get("enable.auto.commit", True)

        try:
            if enable_auto_commit:
                await self.commit(asynchronous=False)

        except Exception as e:
            # No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183
            if "No offset stored" not in str(e):
                self.logger_state.log(
                    log_level=logging.ERROR,
                    message="Consumer closing error occurred.",
                    exc_info=e,
                )

        try:
            # Wrap calls to async to make method cancelable by timeout
            # We shouldn't read messages and close consumer concurrently
            # https://github.com/ag2ai/faststream/issues/1904#issuecomment-2506990895
            # Now it works without lock due `ThreadPoolExecutor(max_workers=1)`
            # that makes all calls to consumer sequential
            await run_in_executor(self._thread_pool, self.consumer.close)
        finally:
            # Release the worker thread even if close() raised or was cancelled.
            self._thread_pool.shutdown(wait=False)

    async def getone(self, timeout: float = 0.1) -> Message | None:
        """Consumes a single message from Kafka.

        Returns ``None`` if nothing arrived within ``timeout`` or the polled
        message carries an error.
        """
        msg = await run_in_executor(self._thread_pool, self.consumer.poll, timeout)
        return check_msg_error(msg)

    async def getmany(
        self,
        timeout: float = 0.1,
        max_records: int | None = 10,
    ) -> tuple[Message, ...]:
        """Consumes up to ``max_records`` messages from Kafka as a flat tuple.

        A falsy ``max_records`` (``None`` or ``0``) falls back to 10. Messages
        that are ``None`` or carry an error are dropped.
        """
        raw_messages: list[Message | None] = await run_in_executor(
            self._thread_pool,
            cast(
                "Callable[..., list[Message | None]]",
                lambda: self.consumer.consume(
                    num_messages=max_records or 10,
                    timeout=timeout,
                ),
            ),
        )
        return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

    async def seek(self, topic: str, partition: int, offset: int) -> None:
        """Seeks to the specified offset in the specified topic and partition."""
        topic_partition = TopicPartition(
            topic=topic,
            partition=partition,
            offset=offset,
        )
        await run_in_executor(
            self._thread_pool,
            self.consumer.seek,
            topic_partition.to_confluent(),
        )

397 

398 

def check_msg_error(msg: Message | None) -> Message | None:
    """Filter a polled result down to a usable message.

    Returns ``msg`` unchanged when it exists and ``msg.error()`` is falsy;
    otherwise (no message, or an error-carrying message) returns ``None``.
    """
    if msg is not None and not msg.error():
        return msg
    return None

405 

406 

407class BatchBuilder: 

408 """A helper class to build a batch of messages to send to Kafka.""" 

409 

410 def __init__(self) -> None: 

411 """Initializes a new BatchBuilder instance.""" 

412 self._builder: list[dict[str, Any]] = [] 

413 

414 def append( 

415 self, 

416 *, 

417 value: bytes | str | None = None, 

418 key: bytes | str | None = None, 

419 timestamp: int | None = None, 

420 headers: list[tuple[str, bytes]] | None = None, 

421 ) -> None: 

422 """Appends a message to the batch with optional timestamp, key, value, and headers.""" 

423 if key is None and value is None: 423 ↛ 424line 423 didn't jump to line 424 because the condition on line 423 was never true

424 raise KafkaException( 

425 KafkaError(40, "Both key and value can't be None"), 

426 ) 

427 

428 self._builder.append( 

429 { 

430 "timestamp_ms": timestamp or round(time() * 1000), 

431 "key": key, 

432 "value": value, 

433 "headers": headers or [], 

434 }, 

435 )