Coverage for faststream / opentelemetry / baggage.py: 98%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any, cast 

2 

3from opentelemetry import baggage, context 

4from opentelemetry.baggage.propagation import W3CBaggagePropagator 

5from typing_extensions import Self 

6 

7if TYPE_CHECKING: 

8 from faststream.message import StreamMessage 

9 

10_BAGGAGE_PROPAGATOR = W3CBaggagePropagator() 

11 

12 

13class Baggage: 

14 __slots__ = ("_baggage", "_batch_baggage") 

15 

16 def __init__( 

17 self, 

18 payload: dict[str, Any], 

19 batch_payload: list[dict[str, Any]] | None = None, 

20 ) -> None: 

21 self._baggage = dict(payload) 

22 self._batch_baggage = [dict(b) for b in batch_payload] if batch_payload else [] 

23 

24 def get_all(self) -> dict[str, Any]: 

25 """Get a copy of the current baggage.""" 

26 return self._baggage.copy() 

27 

28 def get_all_batch(self) -> list[dict[str, Any]]: 

29 """Get a copy of all batch baggage if exists.""" 

30 return self._batch_baggage.copy() 

31 

32 def get(self, key: str) -> Any | None: 

33 """Get a value from the baggage by key.""" 

34 return self._baggage.get(key) 

35 

36 def remove(self, key: str) -> None: 

37 """Remove a baggage item by key.""" 

38 self._baggage.pop(key, None) 

39 

40 def set(self, key: str, value: Any) -> None: 

41 """Set a key-value pair in the baggage.""" 

42 self._baggage[key] = value 

43 

44 def clear(self) -> None: 

45 """Clear the current baggage.""" 

46 self._baggage.clear() 

47 

48 def to_headers(self, headers: dict[str, Any] | None = None) -> dict[str, Any]: 

49 """Convert baggage items to headers format for propagation.""" 

50 current_context = context.get_current() 

51 if headers is None: 51 ↛ 54line 51 didn't jump to line 54 because the condition on line 51 was always true

52 headers = {} 

53 

54 for k, v in self._baggage.items(): 

55 current_context = baggage.set_baggage(k, v, context=current_context) 

56 

57 _BAGGAGE_PROPAGATOR.inject(headers, current_context) 

58 return headers 

59 

60 @classmethod 

61 def from_msg(cls, msg: "StreamMessage[Any]") -> Self: 

62 """Create a Baggage instance from a StreamMessage.""" 

63 if len(msg.batch_headers) <= 1: 

64 return cls.from_headers(msg.headers) 

65 

66 cumulative_baggage: dict[str, Any] = {} 

67 batch_baggage: list[dict[str, Any]] = [] 

68 

69 for headers in msg.batch_headers: 

70 payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers)) 

71 cumulative_baggage.update(payload) 

72 batch_baggage.append(cast("dict[str, Any]", payload)) 

73 

74 return cls(cumulative_baggage, batch_baggage) 

75 

76 @classmethod 

77 def from_headers(cls, headers: dict[str, Any]) -> Self: 

78 """Create a Baggage instance from headers.""" 

79 payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers)) 

80 return cls(cast("dict[str, Any]", payload)) 

81 

82 def __repr__(self) -> str: 

83 return self._baggage.__repr__()