Coverage for faststream / confluent / helpers / admin.py: 91%
29 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from dataclasses import dataclass
2from typing import TYPE_CHECKING
4from confluent_kafka.admin import ( # type: ignore[attr-defined]
5 AdminClient,
6 NewTopic,
7)
9if TYPE_CHECKING:
10 from .config import ConfluentFastConfig
13@dataclass
14class CreateResult:
15 topic: str
16 error: Exception | None
19class AdminService:
20 def __init__(self) -> None:
21 self.admin_client: AdminClient | None = None
23 async def connect(self, config: "ConfluentFastConfig") -> None:
24 if self.admin_client is None: 24 ↛ exitline 24 didn't return from function 'connect' because the condition on line 24 was always true
25 self.admin_client = AdminClient(config.admin_config)
27 async def disconnect(self) -> None:
28 self.admin_client = None
30 @property
31 def client(self) -> AdminClient:
32 assert self.admin_client is not None, (
33 "Admin client was not connected. Please, connect the broker first."
34 )
35 return self.admin_client
37 def create_topics(self, topics: list[str]) -> list[CreateResult]:
38 create_result = self.client.create_topics(
39 [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics],
40 )
42 final_results = []
43 for topic, f in create_result.items():
44 try:
45 f.result()
47 except Exception as e:
48 if "TOPIC_ALREADY_EXISTS" not in str(e): 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 result = CreateResult(topic, e)
50 else:
51 result = CreateResult(topic, None)
53 else:
54 result = CreateResult(topic, None)
56 final_results.append(result)
58 return final_results