Coverage for faststream / kafka / opentelemetry / provider.py: 82%
25 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Sequence
2from typing import TYPE_CHECKING, Any, Union, cast
4from opentelemetry.semconv.trace import SpanAttributes
6from faststream._internal.types import MsgType
7from faststream.kafka.response import KafkaPublishCommand
8from faststream.opentelemetry import TelemetrySettingsProvider
9from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
11if TYPE_CHECKING:
12 from aiokafka import ConsumerRecord
14 from faststream.message import StreamMessage
15 from faststream.response import PublishCommand
18class BaseKafkaTelemetrySettingsProvider(
19 TelemetrySettingsProvider[MsgType, KafkaPublishCommand],
20):
21 __slots__ = ("messaging_system",)
23 def __init__(self) -> None:
24 self.messaging_system = "kafka"
26 def get_publish_attrs_from_cmd(
27 self,
28 cmd: "KafkaPublishCommand",
29 ) -> dict[str, Any]:
30 attrs: dict[str, Any] = {
31 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
32 SpanAttributes.MESSAGING_DESTINATION_NAME: cmd.destination,
33 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: cmd.correlation_id,
34 }
36 if cmd.partition is not None: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 attrs[SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION] = cmd.partition
39 if cmd.key is not None: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true
40 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = cmd.key
42 return attrs
44 def get_publish_destination_name(
45 self,
46 cmd: "PublishCommand",
47 ) -> str:
48 return cmd.destination
51class KafkaTelemetrySettingsProvider(
52 BaseKafkaTelemetrySettingsProvider["ConsumerRecord"],
53):
54 def get_consume_attrs_from_message(
55 self,
56 msg: "StreamMessage[ConsumerRecord]",
57 ) -> dict[str, Any]:
58 attrs = {
59 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
60 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
61 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
62 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
63 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition,
64 SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset,
65 MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic,
66 }
68 if msg.raw_message.key is not None: 68 ↛ 69line 68 didn't jump to line 69 because the condition on line 68 was never true
69 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = msg.raw_message.key
71 return attrs
73 def get_consume_destination_name(
74 self,
75 msg: "StreamMessage[ConsumerRecord]",
76 ) -> str:
77 return cast("str", msg.raw_message.topic)
80class BatchKafkaTelemetrySettingsProvider(
81 BaseKafkaTelemetrySettingsProvider[tuple["ConsumerRecord", ...]],
82):
83 def get_consume_attrs_from_message(
84 self,
85 msg: "StreamMessage[tuple[ConsumerRecord, ...]]",
86 ) -> dict[str, Any]:
87 raw_message = msg.raw_message[0]
89 return {
90 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
91 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
92 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
93 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(
94 bytearray().join(cast("Sequence[bytes]", msg.body)),
95 ),
96 SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message),
97 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition,
98 MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic,
99 }
101 def get_consume_destination_name(
102 self,
103 msg: "StreamMessage[tuple[ConsumerRecord, ...]]",
104 ) -> str:
105 return cast("str", msg.raw_message[0].topic)
108def telemetry_attributes_provider_factory(
109 msg: Union["ConsumerRecord", Sequence["ConsumerRecord"], None],
110) -> KafkaTelemetrySettingsProvider | BatchKafkaTelemetrySettingsProvider:
111 if isinstance(msg, Sequence):
112 return BatchKafkaTelemetrySettingsProvider()
113 return KafkaTelemetrySettingsProvider()