Coverage for faststream / confluent / schemas / partition.py: 81%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING 

2 

3from confluent_kafka import TopicPartition as ConfluentPartition 

4 

5if TYPE_CHECKING: 

6 from typing_extensions import NotRequired, TypedDict 

7 

8 class _TopicKwargs(TypedDict): 

9 topic: str 

10 partition: int 

11 offset: int 

12 metadata: NotRequired[str] 

13 leader_epoch: NotRequired[int] 

14 

15 

16class TopicPartition: 

17 __slots__ = ( 

18 "leader_epoch", 

19 "metadata", 

20 "offset", 

21 "partition", 

22 "topic", 

23 ) 

24 

25 def __init__( 

26 self, 

27 topic: str, 

28 partition: int = -1, 

29 offset: int = -1001, 

30 metadata: str | None = None, 

31 leader_epoch: int | None = None, 

32 ) -> None: 

33 self.topic = topic 

34 self.partition = partition 

35 self.offset = offset 

36 self.metadata = metadata 

37 self.leader_epoch = leader_epoch 

38 

39 def to_confluent(self) -> ConfluentPartition: 

40 kwargs: _TopicKwargs = { 

41 "topic": self.topic, 

42 "partition": self.partition, 

43 "offset": self.offset, 

44 } 

45 if self.metadata is not None: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 kwargs["metadata"] = self.metadata 

47 if self.leader_epoch is not None: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true

48 kwargs["leader_epoch"] = self.leader_epoch 

49 return ConfluentPartition(**kwargs) 

50 

51 def add_prefix(self, prefix: str) -> "TopicPartition": 

52 return TopicPartition( 

53 topic=f"{prefix}{self.topic}", 

54 partition=self.partition, 

55 offset=self.offset, 

56 metadata=self.metadata, 

57 leader_epoch=self.leader_epoch, 

58 )