Coverage for faststream / kafka / message.py: 96%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any, Protocol, Union
3from aiokafka import (
4 AIOKafkaConsumer,
5 ConsumerRecord,
6 TopicPartition as AIOKafkaTopicPartition,
7)
9from faststream.message import AckStatus, StreamMessage
12class ConsumerProtocol(Protocol):
13 """A protocol for Kafka consumers."""
15 async def commit(self) -> None: ...
17 def seek(
18 self,
19 partition: AIOKafkaTopicPartition,
20 offset: int,
21 ) -> None:
22 pass
25class FakeConsumer:
26 """A fake Kafka consumer."""
28 async def commit(self) -> None:
29 pass
31 def seek(
32 self,
33 partition: AIOKafkaTopicPartition,
34 offset: int,
35 ) -> None:
36 pass
39FAKE_CONSUMER = FakeConsumer()
42class KafkaRawMessage(ConsumerRecord): # type: ignore[misc]
43 consumer: AIOKafkaConsumer
46class KafkaMessage(
47 StreamMessage[
48 Union[
49 "ConsumerRecord",
50 tuple["ConsumerRecord", ...],
51 ]
52 ],
53):
54 """Represents a Kafka message in the FastStream framework.
56 This class extends `StreamMessage` and is specialized for handling Kafka ConsumerRecord objects.
57 """
59 def __init__(self, *args: Any, consumer: ConsumerProtocol, **kwargs: Any) -> None:
60 super().__init__(*args, **kwargs)
61 self.consumer = consumer
62 self.committed = AckStatus.ACKED
65class KafkaAckableMessage(KafkaMessage):
66 def __init__(self, *args: Any, **kwargs: Any) -> None:
67 super().__init__(*args, **kwargs)
68 self.committed = None
70 async def ack(self) -> None:
71 if not self.committed:
72 await self.consumer.commit()
73 await super().ack()
75 async def reject(self) -> None:
76 await self.ack()
78 async def nack(self) -> None:
79 if not self.committed: 79 ↛ 93line 79 didn't jump to line 93 because the condition on line 79 was always true
80 raw_message = (
81 self.raw_message[0]
82 if isinstance(self.raw_message, tuple)
83 else self.raw_message
84 )
85 topic_partition = AIOKafkaTopicPartition(
86 raw_message.topic,
87 raw_message.partition,
88 )
89 self.consumer.seek(
90 partition=topic_partition,
91 offset=raw_message.offset,
92 )
93 await super().nack()