Coverage for faststream / confluent / parser.py: 90%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any, cast 

2 

3from faststream.message import StreamMessage, decode_message 

4 

5from .message import FAKE_CONSUMER, KafkaMessage 

6 

7if TYPE_CHECKING: 

8 from collections.abc import Sequence 

9 

10 from confluent_kafka import Message 

11 

12 from faststream._internal.basic_types import DecodedMessage 

13 

14 from .message import ConsumerProtocol 

15 

16 # Type of headers returned by confluent_kafka Message.headers() 

17 _HeadersInput = ( 

18 dict[str, str | bytes | None] 

19 | list[tuple[str, str | bytes | None]] 

20 | tuple[tuple[str, str | bytes | None], ...] 

21 ) 

22 

23 

class AsyncConfluentParser:
    """Convert raw confluent_kafka messages into ``KafkaMessage`` objects.

    A new parser is bound to ``FAKE_CONSUMER``; ``_setup`` swaps in another
    consumer, which is then attached to every message the parser builds.
    """

    def __init__(self, is_manual: bool = False) -> None:
        # Forwarded unchanged to every KafkaMessage created by this parser.
        self.is_manual = is_manual
        # Initial consumer; replaced by whatever ``_setup`` receives.
        self._consumer: ConsumerProtocol = FAKE_CONSUMER

    def _setup(self, consumer: "ConsumerProtocol") -> None:
        """Bind the consumer that parsed messages will reference."""
        self._consumer = consumer

    async def parse_message(
        self,
        message: "Message",
    ) -> KafkaMessage:
        """Wrap a single Kafka message in a ``KafkaMessage``.

        Headers are normalised with ``_parse_msg_headers`` and a falsy payload
        is replaced by ``b""``. The message id is ``"<offset>-<timestamp>"``,
        and ``reply_to``, ``content_type`` and ``correlation_id`` are looked
        up in the normalised headers.
        """
        raw_headers = cast("_HeadersInput", message.headers() or ())
        parsed_headers = _parse_msg_headers(raw_headers)

        payload = message.value() or b""
        position = message.offset()
        # timestamp() yields a pair; only its second element is used.
        _, created_at = message.timestamp()

        return KafkaMessage(
            body=payload,
            headers=parsed_headers,
            reply_to=parsed_headers.get("reply_to", ""),
            content_type=parsed_headers.get("content-type"),
            message_id=f"{position}-{created_at}",
            correlation_id=parsed_headers.get("correlation_id"),
            raw_message=message,
            consumer=self._consumer,
            is_manual=self.is_manual,
        )

    async def parse_batch(
        self,
        message: tuple["Message", ...],
    ) -> KafkaMessage:
        """Wrap a batch of Kafka messages in one ``KafkaMessage``.

        The body is the list of payloads (falsy payloads become ``b""``) and
        ``batch_headers`` holds the normalised headers of each message in
        order. The top-level headers, ``reply_to``, ``content_type`` and
        ``correlation_id`` come from the first message. The message id is
        ``"<first offset>-<last offset>-<first timestamp>"``.

        Indexing the first and last element raises ``IndexError`` when
        ``message`` is empty.
        """
        head, tail = message[0], message[-1]

        payloads: list[Any] = []
        per_message_headers: list[dict[str, str]] = []
        for item in message:
            payloads.append(item.value() or b"")
            raw_headers = cast("_HeadersInput", item.headers() or ())
            per_message_headers.append(_parse_msg_headers(raw_headers))

        # The first message's headers double as the headers of the batch.
        shared_headers = per_message_headers[0] if per_message_headers else {}

        _, head_timestamp = head.timestamp()

        return KafkaMessage(
            body=payloads,
            headers=shared_headers,
            batch_headers=per_message_headers,
            reply_to=shared_headers.get("reply_to", ""),
            content_type=shared_headers.get("content-type"),
            message_id=f"{head.offset()}-{tail.offset()}-{head_timestamp}",
            correlation_id=shared_headers.get("correlation_id"),
            raw_message=message,
            consumer=self._consumer,
            is_manual=self.is_manual,
        )

    async def decode_message(
        self,
        msg: "StreamMessage[Message]",
    ) -> "DecodedMessage":
        """Decode one parsed message with the shared ``decode_message`` helper."""
        decoded = decode_message(msg)
        return decoded

    async def decode_batch(
        self,
        msg: "StreamMessage[tuple[Message, ...]]",
    ) -> "DecodedMessage":
        """Decode every message of a batch and return the results as a list.

        Each raw message is first run through ``parse_message`` so that it is
        decoded using its own headers, then passed to ``decode_message``.
        """
        decoded_items: list[Any] = []
        for raw in msg.raw_message:
            parsed = await self.parse_message(raw)
            decoded_items.append(decode_message(parsed))
        return decoded_items

104 

105 

def _parse_msg_headers(headers: "_HeadersInput") -> dict[str, str]:
    """Normalise Kafka message headers into a ``dict[str, str]``.

    Args:
        headers: Either a mapping of header name to value, or a list/tuple of
            ``(name, value)`` pairs. Values may be ``str``, ``bytes`` or
            ``None``.

    Returns:
        A dict with one entry per header whose value is not ``None``.
        ``str`` values are kept as they are and ``bytes`` values are decoded
        with ``bytes.decode()`` (UTF-8 by default). When a name occurs more
        than once in a sequence input, the last occurrence wins.

    Raises:
        UnicodeDecodeError: If a ``bytes`` value is not valid UTF-8.
    """
    if isinstance(headers, dict):
        seq: Sequence[tuple[str, bytes | str | None]] = list(headers.items())
    else:
        seq = headers
    # None values are filtered out by the condition below, so the value
    # expression only ever sees ``str`` or ``bytes``.
    return {
        name: (value if isinstance(value, str) else value.decode())
        for name, value in seq
        if value is not None
    }