Coverage for faststream / confluent / schemas / partition.py: 81%
17 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING
3from confluent_kafka import TopicPartition as ConfluentPartition
if TYPE_CHECKING:
    # Imported for static analysis only; nothing in this branch runs at runtime.
    from typing_extensions import NotRequired
    from typing_extensions import TypedDict

    class _TopicKwargs(TypedDict):
        """Shape of the keyword arguments built by ``TopicPartition.to_confluent``."""

        # Always supplied.
        topic: str
        partition: int
        offset: int

        # Supplied only when the corresponding attribute is not None.
        metadata: NotRequired[str]
        leader_epoch: NotRequired[int]
16class TopicPartition:
17 __slots__ = (
18 "leader_epoch",
19 "metadata",
20 "offset",
21 "partition",
22 "topic",
23 )
25 def __init__(
26 self,
27 topic: str,
28 partition: int = -1,
29 offset: int = -1001,
30 metadata: str | None = None,
31 leader_epoch: int | None = None,
32 ) -> None:
33 self.topic = topic
34 self.partition = partition
35 self.offset = offset
36 self.metadata = metadata
37 self.leader_epoch = leader_epoch
39 def to_confluent(self) -> ConfluentPartition:
40 kwargs: _TopicKwargs = {
41 "topic": self.topic,
42 "partition": self.partition,
43 "offset": self.offset,
44 }
45 if self.metadata is not None: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 kwargs["metadata"] = self.metadata
47 if self.leader_epoch is not None: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 kwargs["leader_epoch"] = self.leader_epoch
49 return ConfluentPartition(**kwargs)
51 def add_prefix(self, prefix: str) -> "TopicPartition":
52 return TopicPartition(
53 topic=f"{prefix}{self.topic}",
54 partition=self.partition,
55 offset=self.offset,
56 metadata=self.metadata,
57 leader_epoch=self.leader_epoch,
58 )