Coverage for faststream / confluent / opentelemetry / provider.py: 82%
25 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Sequence
2from typing import TYPE_CHECKING, Any, Union, cast
4from opentelemetry.semconv.trace import SpanAttributes
6from faststream._internal.types import MsgType
7from faststream.confluent.response import KafkaPublishCommand
8from faststream.opentelemetry import TelemetrySettingsProvider
9from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
11if TYPE_CHECKING:
12 from confluent_kafka import Message
14 from faststream.message import StreamMessage
15 from faststream.response import PublishCommand
18class BaseConfluentTelemetrySettingsProvider(
19 TelemetrySettingsProvider[MsgType, KafkaPublishCommand]
20):
21 __slots__ = ("messaging_system",)
23 def __init__(self) -> None:
24 self.messaging_system = "kafka"
26 def get_publish_attrs_from_cmd(self, cmd: "KafkaPublishCommand") -> dict[str, Any]:
27 attrs: dict[str, Any] = {
28 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
29 SpanAttributes.MESSAGING_DESTINATION_NAME: cmd.destination,
30 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: cmd.correlation_id,
31 }
33 if cmd.partition is not None: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true
34 attrs[SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION] = cmd.partition
36 if cmd.key is not None: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = cmd.key
39 return attrs
41 def get_publish_destination_name(self, cmd: "PublishCommand") -> str:
42 return cmd.destination
45class ConfluentTelemetrySettingsProvider(
46 BaseConfluentTelemetrySettingsProvider["Message"],
47):
48 def get_consume_attrs_from_message(
49 self,
50 msg: "StreamMessage[Message]",
51 ) -> dict[str, Any]:
52 attrs = {
53 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
54 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
55 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
56 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body),
57 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition(),
58 SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset(),
59 MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic(),
60 }
62 if (key := msg.raw_message.key()) is not None: 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true
63 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = key
65 return attrs
67 def get_consume_destination_name(
68 self,
69 msg: "StreamMessage[Message]",
70 ) -> str:
71 return cast("str", msg.raw_message.topic())
74class BatchConfluentTelemetrySettingsProvider(
75 BaseConfluentTelemetrySettingsProvider[tuple["Message", ...]],
76):
77 def get_consume_attrs_from_message(
78 self,
79 msg: "StreamMessage[tuple[Message, ...]]",
80 ) -> dict[str, Any]:
81 raw_message = msg.raw_message[0]
82 return {
83 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system,
84 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id,
85 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id,
86 SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message),
87 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(
88 bytearray().join(cast("Sequence[bytes]", msg.body)),
89 ),
90 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition(),
91 MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic(),
92 }
94 def get_consume_destination_name(
95 self,
96 msg: "StreamMessage[tuple[Message, ...]]",
97 ) -> str:
98 return cast("str", msg.raw_message[0].topic())
101def telemetry_attributes_provider_factory(
102 msg: Union["Message", Sequence["Message"], None],
103) -> ConfluentTelemetrySettingsProvider | BatchConfluentTelemetrySettingsProvider:
104 if isinstance(msg, Sequence):
105 return BatchConfluentTelemetrySettingsProvider()
106 return ConfluentTelemetrySettingsProvider()