Coverage for faststream / confluent / opentelemetry / provider.py: 82%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from typing import TYPE_CHECKING, Any, Union, cast 

3 

4from opentelemetry.semconv.trace import SpanAttributes 

5 

6from faststream._internal.types import MsgType 

7from faststream.confluent.response import KafkaPublishCommand 

8from faststream.opentelemetry import TelemetrySettingsProvider 

9from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME 

10 

11if TYPE_CHECKING: 

12 from confluent_kafka import Message 

13 

14 from faststream.message import StreamMessage 

15 from faststream.response import PublishCommand 

16 

17 

18class BaseConfluentTelemetrySettingsProvider( 

19 TelemetrySettingsProvider[MsgType, KafkaPublishCommand] 

20): 

21 __slots__ = ("messaging_system",) 

22 

23 def __init__(self) -> None: 

24 self.messaging_system = "kafka" 

25 

26 def get_publish_attrs_from_cmd(self, cmd: "KafkaPublishCommand") -> dict[str, Any]: 

27 attrs: dict[str, Any] = { 

28 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

29 SpanAttributes.MESSAGING_DESTINATION_NAME: cmd.destination, 

30 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: cmd.correlation_id, 

31 } 

32 

33 if cmd.partition is not None: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true

34 attrs[SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION] = cmd.partition 

35 

36 if cmd.key is not None: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true

37 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = cmd.key 

38 

39 return attrs 

40 

41 def get_publish_destination_name(self, cmd: "PublishCommand") -> str: 

42 return cmd.destination 

43 

44 

45class ConfluentTelemetrySettingsProvider( 

46 BaseConfluentTelemetrySettingsProvider["Message"], 

47): 

48 def get_consume_attrs_from_message( 

49 self, 

50 msg: "StreamMessage[Message]", 

51 ) -> dict[str, Any]: 

52 attrs = { 

53 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

54 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, 

55 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, 

56 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), 

57 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition(), 

58 SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset(), 

59 MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic(), 

60 } 

61 

62 if (key := msg.raw_message.key()) is not None: 62 ↛ 63line 62 didn't jump to line 63 because the condition on line 62 was never true

63 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = key 

64 

65 return attrs 

66 

67 def get_consume_destination_name( 

68 self, 

69 msg: "StreamMessage[Message]", 

70 ) -> str: 

71 return cast("str", msg.raw_message.topic()) 

72 

73 

74class BatchConfluentTelemetrySettingsProvider( 

75 BaseConfluentTelemetrySettingsProvider[tuple["Message", ...]], 

76): 

77 def get_consume_attrs_from_message( 

78 self, 

79 msg: "StreamMessage[tuple[Message, ...]]", 

80 ) -> dict[str, Any]: 

81 raw_message = msg.raw_message[0] 

82 return { 

83 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

84 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, 

85 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, 

86 SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message), 

87 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len( 

88 bytearray().join(cast("Sequence[bytes]", msg.body)), 

89 ), 

90 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition(), 

91 MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic(), 

92 } 

93 

94 def get_consume_destination_name( 

95 self, 

96 msg: "StreamMessage[tuple[Message, ...]]", 

97 ) -> str: 

98 return cast("str", msg.raw_message[0].topic()) 

99 

100 

101def telemetry_attributes_provider_factory( 

102 msg: Union["Message", Sequence["Message"], None], 

103) -> ConfluentTelemetrySettingsProvider | BatchConfluentTelemetrySettingsProvider: 

104 if isinstance(msg, Sequence): 

105 return BatchConfluentTelemetrySettingsProvider() 

106 return ConfluentTelemetrySettingsProvider()