Coverage for faststream / kafka / configs / broker.py: 89%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Callable 

2from dataclasses import dataclass, field 

3from functools import partial 

4from typing import Any, Optional 

5 

6import aiokafka 

7import aiokafka.admin 

8 

9from faststream.__about__ import SERVICE_NAME 

10from faststream._internal.configs import BrokerConfig 

11from faststream._internal.utils.data import filter_by_dict 

12from faststream.exceptions import IncorrectState 

13from faststream.kafka.publisher.producer import ( 

14 AioKafkaFastProducer, 

15 FakeAioKafkaFastProducer, 

16) 

17from faststream.kafka.schemas.params import ( 

18 AdminClientConnectionParams, 

19 ConsumerConnectionParams, 

20) 

21 

22 

23@dataclass(kw_only=True) 

24class KafkaBrokerConfig(BrokerConfig): 

25 producer: "AioKafkaFastProducer" = field(default_factory=FakeAioKafkaFastProducer) 

26 builder: Callable[..., aiokafka.AIOKafkaConsumer] = lambda: None 

27 

28 client_id: str | None = SERVICE_NAME 

29 

30 _admin_client: Optional["aiokafka.admin.client.AIOKafkaAdminClient"] = None 

31 

32 @property 

33 def admin_client(self) -> "aiokafka.admin.client.AIOKafkaAdminClient": 

34 if self._admin_client is None: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true

35 msg = "Admin client is not initialized. Call connect() first." 

36 raise IncorrectState(msg) 

37 

38 return self._admin_client 

39 

40 async def connect(self, **connection_kwargs: Any) -> "None": 

41 producer = aiokafka.AIOKafkaProducer(**connection_kwargs) 

42 await self.producer.connect(producer, serializer=self.fd_config._serializer) 

43 

44 admin_options, _ = filter_by_dict( 

45 AdminClientConnectionParams, 

46 connection_kwargs, 

47 ) 

48 

49 self._admin_client = aiokafka.admin.client.AIOKafkaAdminClient(**admin_options) 

50 await self._admin_client.start() 

51 

52 consumer_options, _ = filter_by_dict( 

53 ConsumerConnectionParams, 

54 connection_kwargs, 

55 ) 

56 self.builder = partial(aiokafka.AIOKafkaConsumer, **consumer_options) 

57 

58 async def disconnect(self) -> "None": 

59 if self._admin_client is not None: 

60 await self._admin_client.close() 

61 self._admin_client = None 

62 

63 await self.producer.disconnect()