Coverage for faststream / kafka / message.py: 96%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any, Protocol, Union 

2 

3from aiokafka import ( 

4 AIOKafkaConsumer, 

5 ConsumerRecord, 

6 TopicPartition as AIOKafkaTopicPartition, 

7) 

8 

9from faststream.message import AckStatus, StreamMessage 

10 

11 

12class ConsumerProtocol(Protocol): 

13 """A protocol for Kafka consumers.""" 

14 

15 async def commit(self) -> None: ... 

16 

17 def seek( 

18 self, 

19 partition: AIOKafkaTopicPartition, 

20 offset: int, 

21 ) -> None: 

22 pass 

23 

24 

25class FakeConsumer: 

26 """A fake Kafka consumer.""" 

27 

28 async def commit(self) -> None: 

29 pass 

30 

31 def seek( 

32 self, 

33 partition: AIOKafkaTopicPartition, 

34 offset: int, 

35 ) -> None: 

36 pass 

37 

38 

39FAKE_CONSUMER = FakeConsumer() 

40 

41 

42class KafkaRawMessage(ConsumerRecord): # type: ignore[misc] 

43 consumer: AIOKafkaConsumer 

44 

45 

46class KafkaMessage( 

47 StreamMessage[ 

48 Union[ 

49 "ConsumerRecord", 

50 tuple["ConsumerRecord", ...], 

51 ] 

52 ], 

53): 

54 """Represents a Kafka message in the FastStream framework. 

55 

56 This class extends `StreamMessage` and is specialized for handling Kafka ConsumerRecord objects. 

57 """ 

58 

59 def __init__(self, *args: Any, consumer: ConsumerProtocol, **kwargs: Any) -> None: 

60 super().__init__(*args, **kwargs) 

61 self.consumer = consumer 

62 self.committed = AckStatus.ACKED 

63 

64 

65class KafkaAckableMessage(KafkaMessage): 

66 def __init__(self, *args: Any, **kwargs: Any) -> None: 

67 super().__init__(*args, **kwargs) 

68 self.committed = None 

69 

70 async def ack(self) -> None: 

71 if not self.committed: 

72 await self.consumer.commit() 

73 await super().ack() 

74 

75 async def reject(self) -> None: 

76 await self.ack() 

77 

78 async def nack(self) -> None: 

79 if not self.committed: 79 ↛ 93line 79 didn't jump to line 93 because the condition on line 79 was always true

80 raw_message = ( 

81 self.raw_message[0] 

82 if isinstance(self.raw_message, tuple) 

83 else self.raw_message 

84 ) 

85 topic_partition = AIOKafkaTopicPartition( 

86 raw_message.topic, 

87 raw_message.partition, 

88 ) 

89 self.consumer.seek( 

90 partition=topic_partition, 

91 offset=raw_message.offset, 

92 ) 

93 await super().nack()