Coverage for faststream / confluent / helpers / admin.py: 91%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from dataclasses import dataclass 

2from typing import TYPE_CHECKING 

3 

4from confluent_kafka.admin import ( # type: ignore[attr-defined] 

5 AdminClient, 

6 NewTopic, 

7) 

8 

9if TYPE_CHECKING: 

10 from .config import ConfluentFastConfig 

11 

12 

13@dataclass 

14class CreateResult: 

15 topic: str 

16 error: Exception | None 

17 

18 

class AdminService:
    """Small wrapper around ``AdminClient`` used to create topics.

    The client is created by ``connect`` and dropped by ``disconnect``;
    ``client`` gives checked access to it in between.
    """

    def __init__(self) -> None:
        # Stays ``None`` until ``connect`` is called.
        self.admin_client: AdminClient | None = None

    async def connect(self, config: "ConfluentFastConfig") -> None:
        """Create the admin client from ``config.admin_config``.

        Does nothing when a client is already set.
        """
        if self.admin_client is None:
            self.admin_client = AdminClient(config.admin_config)

    async def disconnect(self) -> None:
        """Drop the reference to the admin client."""
        self.admin_client = None

    @property
    def client(self) -> "AdminClient":
        """Return the connected admin client.

        Raises:
            AssertionError: If no client has been set by ``connect``.
        """
        if self.admin_client is None:
            # Raised explicitly instead of via ``assert`` so the check is
            # not stripped when Python runs with ``-O``; the exception type
            # and message are unchanged.
            msg = "Admin client was not connected. Please, connect the broker first."
            raise AssertionError(msg)
        return self.admin_client

    def create_topics(self, topics: list[str]) -> "list[CreateResult]":
        """Create ``topics`` with one partition and a replication factor of one.

        Args:
            topics: Names of the topics to create.

        Returns:
            One ``CreateResult`` per entry returned by the client. ``error``
            is ``None`` when the request succeeded or when the failure text
            contains ``TOPIC_ALREADY_EXISTS``; otherwise it holds the
            exception raised by the future.

        Raises:
            AssertionError: If the client is not connected.
        """
        client = self.client
        if not topics:
            # Nothing to request; skip the client call for an empty list
            # (the underlying client is expected to reject an empty request).
            return []

        futures = client.create_topics(
            [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics],
        )

        final_results = []
        for topic, future in futures.items():
            error: Exception | None = None
            try:
                future.result()
            except Exception as e:
                # An already-existing topic is treated as success.
                if "TOPIC_ALREADY_EXISTS" not in str(e):
                    error = e
            final_results.append(CreateResult(topic, error))

        return final_results