Coverage for faststream / opentelemetry / baggage.py: 98%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any, cast
3from opentelemetry import baggage, context
4from opentelemetry.baggage.propagation import W3CBaggagePropagator
5from typing_extensions import Self
7if TYPE_CHECKING:
8 from faststream.message import StreamMessage
# Module-level W3C baggage propagator instance; Baggage uses it to inject baggage into headers and to extract it from them.
10_BAGGAGE_PROPAGATOR = W3CBaggagePropagator()
13class Baggage:
14 __slots__ = ("_baggage", "_batch_baggage")
16 def __init__(
17 self,
18 payload: dict[str, Any],
19 batch_payload: list[dict[str, Any]] | None = None,
20 ) -> None:
21 self._baggage = dict(payload)
22 self._batch_baggage = [dict(b) for b in batch_payload] if batch_payload else []
24 def get_all(self) -> dict[str, Any]:
25 """Get a copy of the current baggage."""
26 return self._baggage.copy()
28 def get_all_batch(self) -> list[dict[str, Any]]:
29 """Get a copy of all batch baggage if exists."""
30 return self._batch_baggage.copy()
32 def get(self, key: str) -> Any | None:
33 """Get a value from the baggage by key."""
34 return self._baggage.get(key)
36 def remove(self, key: str) -> None:
37 """Remove a baggage item by key."""
38 self._baggage.pop(key, None)
40 def set(self, key: str, value: Any) -> None:
41 """Set a key-value pair in the baggage."""
42 self._baggage[key] = value
44 def clear(self) -> None:
45 """Clear the current baggage."""
46 self._baggage.clear()
48 def to_headers(self, headers: dict[str, Any] | None = None) -> dict[str, Any]:
49 """Convert baggage items to headers format for propagation."""
50 current_context = context.get_current()
51 if headers is None: 51 ↛ 54line 51 didn't jump to line 54 because the condition on line 51 was always true
52 headers = {}
54 for k, v in self._baggage.items():
55 current_context = baggage.set_baggage(k, v, context=current_context)
57 _BAGGAGE_PROPAGATOR.inject(headers, current_context)
58 return headers
60 @classmethod
61 def from_msg(cls, msg: "StreamMessage[Any]") -> Self:
62 """Create a Baggage instance from a StreamMessage."""
63 if len(msg.batch_headers) <= 1:
64 return cls.from_headers(msg.headers)
66 cumulative_baggage: dict[str, Any] = {}
67 batch_baggage: list[dict[str, Any]] = []
69 for headers in msg.batch_headers:
70 payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers))
71 cumulative_baggage.update(payload)
72 batch_baggage.append(cast("dict[str, Any]", payload))
74 return cls(cumulative_baggage, batch_baggage)
76 @classmethod
77 def from_headers(cls, headers: dict[str, Any]) -> Self:
78 """Create a Baggage instance from headers."""
79 payload = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(headers))
80 return cls(cast("dict[str, Any]", payload))
82 def __repr__(self) -> str:
83 return self._baggage.__repr__()