Coverage for faststream / kafka / configs / broker.py: 89%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Callable
2from dataclasses import dataclass, field
3from functools import partial
4from typing import Any, Optional
6import aiokafka
7import aiokafka.admin
9from faststream.__about__ import SERVICE_NAME
10from faststream._internal.configs import BrokerConfig
11from faststream._internal.utils.data import filter_by_dict
12from faststream.exceptions import IncorrectState
13from faststream.kafka.publisher.producer import (
14 AioKafkaFastProducer,
15 FakeAioKafkaFastProducer,
16)
17from faststream.kafka.schemas.params import (
18 AdminClientConnectionParams,
19 ConsumerConnectionParams,
20)
23@dataclass(kw_only=True)
24class KafkaBrokerConfig(BrokerConfig):
25 producer: "AioKafkaFastProducer" = field(default_factory=FakeAioKafkaFastProducer)
26 builder: Callable[..., aiokafka.AIOKafkaConsumer] = lambda: None
28 client_id: str | None = SERVICE_NAME
30 _admin_client: Optional["aiokafka.admin.client.AIOKafkaAdminClient"] = None
32 @property
33 def admin_client(self) -> "aiokafka.admin.client.AIOKafkaAdminClient":
34 if self._admin_client is None: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true
35 msg = "Admin client is not initialized. Call connect() first."
36 raise IncorrectState(msg)
38 return self._admin_client
40 async def connect(self, **connection_kwargs: Any) -> "None":
41 producer = aiokafka.AIOKafkaProducer(**connection_kwargs)
42 await self.producer.connect(producer, serializer=self.fd_config._serializer)
44 admin_options, _ = filter_by_dict(
45 AdminClientConnectionParams,
46 connection_kwargs,
47 )
49 self._admin_client = aiokafka.admin.client.AIOKafkaAdminClient(**admin_options)
50 await self._admin_client.start()
52 consumer_options, _ = filter_by_dict(
53 ConsumerConnectionParams,
54 connection_kwargs,
55 )
56 self.builder = partial(aiokafka.AIOKafkaConsumer, **consumer_options)
58 async def disconnect(self) -> "None":
59 if self._admin_client is not None:
60 await self._admin_client.close()
61 self._admin_client = None
63 await self.producer.disconnect()