Coverage for faststream / kafka / opentelemetry / provider.py: 82%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from typing import TYPE_CHECKING, Any, Union, cast 

3 

4from opentelemetry.semconv.trace import SpanAttributes 

5 

6from faststream._internal.types import MsgType 

7from faststream.kafka.response import KafkaPublishCommand 

8from faststream.opentelemetry import TelemetrySettingsProvider 

9from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME 

10 

11if TYPE_CHECKING: 

12 from aiokafka import ConsumerRecord 

13 

14 from faststream.message import StreamMessage 

15 from faststream.response import PublishCommand 

16 

17 

18class BaseKafkaTelemetrySettingsProvider( 

19 TelemetrySettingsProvider[MsgType, KafkaPublishCommand], 

20): 

21 __slots__ = ("messaging_system",) 

22 

23 def __init__(self) -> None: 

24 self.messaging_system = "kafka" 

25 

26 def get_publish_attrs_from_cmd( 

27 self, 

28 cmd: "KafkaPublishCommand", 

29 ) -> dict[str, Any]: 

30 attrs: dict[str, Any] = { 

31 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

32 SpanAttributes.MESSAGING_DESTINATION_NAME: cmd.destination, 

33 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: cmd.correlation_id, 

34 } 

35 

36 if cmd.partition is not None: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true

37 attrs[SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION] = cmd.partition 

38 

39 if cmd.key is not None: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true

40 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = cmd.key 

41 

42 return attrs 

43 

44 def get_publish_destination_name( 

45 self, 

46 cmd: "PublishCommand", 

47 ) -> str: 

48 return cmd.destination 

49 

50 

51class KafkaTelemetrySettingsProvider( 

52 BaseKafkaTelemetrySettingsProvider["ConsumerRecord"], 

53): 

54 def get_consume_attrs_from_message( 

55 self, 

56 msg: "StreamMessage[ConsumerRecord]", 

57 ) -> dict[str, Any]: 

58 attrs = { 

59 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

60 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, 

61 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, 

62 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len(msg.body), 

63 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: msg.raw_message.partition, 

64 SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: msg.raw_message.offset, 

65 MESSAGING_DESTINATION_PUBLISH_NAME: msg.raw_message.topic, 

66 } 

67 

68 if msg.raw_message.key is not None: 68 ↛ 69line 68 didn't jump to line 69 because the condition on line 68 was never true

69 attrs[SpanAttributes.MESSAGING_KAFKA_MESSAGE_KEY] = msg.raw_message.key 

70 

71 return attrs 

72 

73 def get_consume_destination_name( 

74 self, 

75 msg: "StreamMessage[ConsumerRecord]", 

76 ) -> str: 

77 return cast("str", msg.raw_message.topic) 

78 

79 

80class BatchKafkaTelemetrySettingsProvider( 

81 BaseKafkaTelemetrySettingsProvider[tuple["ConsumerRecord", ...]], 

82): 

83 def get_consume_attrs_from_message( 

84 self, 

85 msg: "StreamMessage[tuple[ConsumerRecord, ...]]", 

86 ) -> dict[str, Any]: 

87 raw_message = msg.raw_message[0] 

88 

89 return { 

90 SpanAttributes.MESSAGING_SYSTEM: self.messaging_system, 

91 SpanAttributes.MESSAGING_MESSAGE_ID: msg.message_id, 

92 SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: msg.correlation_id, 

93 SpanAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES: len( 

94 bytearray().join(cast("Sequence[bytes]", msg.body)), 

95 ), 

96 SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT: len(msg.raw_message), 

97 SpanAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION: raw_message.partition, 

98 MESSAGING_DESTINATION_PUBLISH_NAME: raw_message.topic, 

99 } 

100 

101 def get_consume_destination_name( 

102 self, 

103 msg: "StreamMessage[tuple[ConsumerRecord, ...]]", 

104 ) -> str: 

105 return cast("str", msg.raw_message[0].topic) 

106 

107 

108def telemetry_attributes_provider_factory( 

109 msg: Union["ConsumerRecord", Sequence["ConsumerRecord"], None], 

110) -> KafkaTelemetrySettingsProvider | BatchKafkaTelemetrySettingsProvider: 

111 if isinstance(msg, Sequence): 

112 return BatchKafkaTelemetrySettingsProvider() 

113 return KafkaTelemetrySettingsProvider()