Coverage for faststream / confluent / parser.py: 90%
19 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any, cast
3from faststream.message import StreamMessage, decode_message
5from .message import FAKE_CONSUMER, KafkaMessage
if TYPE_CHECKING:
    from collections.abc import Sequence

    from confluent_kafka import Message

    from faststream._internal.basic_types import DecodedMessage

    from .message import ConsumerProtocol

    # A single raw header value: text, bytes, or no value at all.
    _HeaderValue = str | bytes | None

    # Shapes accepted for the headers obtained from confluent_kafka
    # ``Message.headers()``: a mapping, or a list/tuple of (name, value) pairs.
    _HeadersInput = (
        dict[str, _HeaderValue]
        | list[tuple[str, _HeaderValue]]
        | tuple[tuple[str, _HeaderValue], ...]
    )
class AsyncConfluentParser:
    """Turn raw confluent-kafka messages into ``KafkaMessage`` objects."""

    def __init__(self, is_manual: bool = False) -> None:
        # A placeholder consumer is used until ``_setup`` provides another one.
        self._consumer: ConsumerProtocol = FAKE_CONSUMER
        self.is_manual = is_manual

    def _setup(self, consumer: "ConsumerProtocol") -> None:
        """Store the consumer that is handed to every parsed message."""
        self._consumer = consumer

    async def parse_message(
        self,
        message: "Message",
    ) -> KafkaMessage:
        """Build a ``KafkaMessage`` from a single raw message.

        A missing payload becomes ``b""`` and missing headers become an empty
        mapping. The message id has the form ``"<offset>-<timestamp>"``.
        """
        raw_headers = message.headers() or ()
        headers = _parse_msg_headers(cast("_HeadersInput", raw_headers))

        payload = message.value() or b""
        offset = message.offset()
        # ``timestamp()`` yields a pair; only its second item is used.
        _, timestamp = message.timestamp()
        message_id = f"{offset}-{timestamp}"

        return KafkaMessage(
            body=payload,
            headers=headers,
            reply_to=headers.get("reply_to", ""),
            content_type=headers.get("content-type"),
            message_id=message_id,
            correlation_id=headers.get("correlation_id"),
            raw_message=message,
            consumer=self._consumer,
            is_manual=self.is_manual,
        )

    async def parse_batch(
        self,
        message: tuple["Message", ...],
    ) -> KafkaMessage:
        """Build one ``KafkaMessage`` that represents a batch of raw messages.

        The body is the list of payloads (``b""`` for a missing one) and
        ``batch_headers`` holds the parsed headers of every message. The
        top-level headers are those of the first message. The message id has
        the form ``"<first offset>-<last offset>-<first timestamp>"``.

        Indexing the first and last message raises ``IndexError`` when
        ``message`` is empty.
        """
        head, tail = message[0], message[-1]

        payloads: list[Any] = []
        per_message_headers: list[dict[str, str]] = []
        for item in message:
            payloads.append(item.value() or b"")
            raw_headers = item.headers() or ()
            per_message_headers.append(
                _parse_msg_headers(cast("_HeadersInput", raw_headers))
            )

        # The first message's headers stand in for the whole batch.
        headers = per_message_headers[0] if per_message_headers else {}

        _, head_timestamp = head.timestamp()
        message_id = f"{head.offset()}-{tail.offset()}-{head_timestamp}"

        return KafkaMessage(
            body=payloads,
            headers=headers,
            batch_headers=per_message_headers,
            reply_to=headers.get("reply_to", ""),
            content_type=headers.get("content-type"),
            message_id=message_id,
            correlation_id=headers.get("correlation_id"),
            raw_message=message,
            consumer=self._consumer,
            is_manual=self.is_manual,
        )

    async def decode_message(
        self,
        msg: "StreamMessage[Message]",
    ) -> "DecodedMessage":
        """Decode a single parsed message with the shared ``decode_message``."""
        return decode_message(msg)

    async def decode_batch(
        self,
        msg: "StreamMessage[tuple[Message, ...]]",
    ) -> "DecodedMessage":
        """Decode every raw message of a batch, one by one.

        Each raw message is first parsed with ``parse_message`` and then
        decoded; the results are returned as a list in batch order.
        """
        decoded = []
        for raw in msg.raw_message:
            parsed = await self.parse_message(raw)
            decoded.append(decode_message(parsed))
        return decoded
106def _parse_msg_headers(headers: "_HeadersInput") -> dict[str, str]:
107 if isinstance(headers, dict): 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true
108 seq: Sequence[tuple[str, bytes | str | None]] = list(headers.items())
109 else:
110 seq = headers
111 return {
112 i: (j if isinstance(j, str) else (j.decode() if j is not None else ""))
113 for i, j in seq
114 if j is not None
115 }