Coverage for tests / opentelemetry / basic.py: 99%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from typing import Any, cast
3from unittest.mock import MagicMock
5import pytest
6from dirty_equals import IsFloat, IsUUID
7from opentelemetry import baggage, context
8from opentelemetry.baggage.propagation import W3CBaggagePropagator
9from opentelemetry.sdk.metrics import MeterProvider
10from opentelemetry.sdk.metrics._internal.point import Metric
11from opentelemetry.sdk.metrics.export import InMemoryMetricReader
12from opentelemetry.sdk.resources import Resource
13from opentelemetry.sdk.trace import Span, TracerProvider
14from opentelemetry.sdk.trace.export import SimpleSpanProcessor
15from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
16from opentelemetry.semconv.trace import SpanAttributes as SpanAttr
17from opentelemetry.trace import SpanKind, get_current_span
19from faststream._internal.broker import BrokerUsecase
20from faststream.opentelemetry import Baggage, CurrentBaggage, CurrentSpan
21from faststream.opentelemetry.consts import (
22 ERROR_TYPE,
23 MESSAGING_DESTINATION_PUBLISH_NAME,
24)
25from faststream.opentelemetry.middleware import (
26 MessageAction as Action,
27 TelemetryMiddleware,
28)
29from tests.brokers.base.basic import BaseTestcaseConfig
32@pytest.mark.asyncio()
33class LocalTelemetryTestcase(BaseTestcaseConfig):
34 messaging_system: str
35 include_messages_counters: bool
36 resource: Resource = Resource.create(attributes={"service.name": "faststream.test"})
37 telemetry_middleware_class: TelemetryMiddleware
39 def get_broker(
40 self,
41 apply_types: bool = False,
42 **kwargs: Any,
43 ) -> BrokerUsecase[Any, Any]:
44 raise NotImplementedError
46 def patch_broker(
47 self,
48 broker: BrokerUsecase[Any, Any],
49 **kwargs: Any,
50 ) -> BrokerUsecase[Any, Any]:
51 return broker
53 def destination_name(self, queue: str) -> str:
54 return queue
56 @staticmethod
57 def get_spans(exporter: InMemorySpanExporter) -> list[Span]:
58 spans = cast("tuple[Span, ...]", exporter.get_finished_spans())
59 return sorted(spans, key=lambda s: s.start_time or 0)
61 @staticmethod
62 def get_metrics(
63 reader: InMemoryMetricReader,
64 ) -> list[Metric]:
65 """Get sorted metrics.
67 Return order:
68 - messaging.process.duration
69 - messaging.process.messages
70 - messaging.publish.duration
71 - messaging.publish.messages
72 """
73 metrics = reader.get_metrics_data()
74 metrics = metrics.resource_metrics[0].scope_metrics[0].metrics
75 metrics = sorted(metrics, key=lambda m: m.name)
76 return cast("list[Metric]", metrics)
78 @pytest.fixture()
79 def tracer_provider(self) -> TracerProvider:
80 return TracerProvider(resource=self.resource)
82 @pytest.fixture()
83 def trace_exporter(self, tracer_provider: TracerProvider) -> InMemorySpanExporter:
84 exporter = InMemorySpanExporter()
85 tracer_provider.add_span_processor(SimpleSpanProcessor(exporter))
86 return exporter
88 @pytest.fixture()
89 def metric_reader(self) -> InMemoryMetricReader:
90 return InMemoryMetricReader()
92 @pytest.fixture()
93 def meter_provider(self, metric_reader: InMemoryMetricReader) -> MeterProvider:
94 return MeterProvider(metric_readers=(metric_reader,), resource=self.resource)
96 def assert_span(
97 self,
98 span: Span,
99 action: str,
100 queue: str,
101 msg: str,
102 parent_span_id: str | None = None,
103 ) -> None:
104 attrs = span.attributes or {}
105 assert attrs[SpanAttr.MESSAGING_SYSTEM] == self.messaging_system, attrs[
106 SpanAttr.MESSAGING_SYSTEM
107 ]
108 assert attrs[SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID] == IsUUID, attrs[
109 SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID
110 ]
111 assert span.name == f"{self.destination_name(queue)} {action}", span.name
112 assert span.kind in {SpanKind.CONSUMER, SpanKind.PRODUCER}, span.kind
114 if span.kind == SpanKind.PRODUCER and action in {Action.CREATE, Action.PUBLISH}:
115 assert attrs[SpanAttr.MESSAGING_DESTINATION_NAME] == queue, attrs[
116 SpanAttr.MESSAGING_DESTINATION_NAME
117 ]
119 if span.kind == SpanKind.CONSUMER and action in {Action.CREATE, Action.PROCESS}:
120 assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == queue, attrs[
121 MESSAGING_DESTINATION_PUBLISH_NAME
122 ]
123 assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsUUID, attrs[
124 SpanAttr.MESSAGING_MESSAGE_ID
125 ]
127 if action == Action.PROCESS:
128 assert attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] == len(msg), (
129 attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]
130 )
131 assert attrs[SpanAttr.MESSAGING_OPERATION] == action, attrs[
132 SpanAttr.MESSAGING_OPERATION
133 ]
135 if action == Action.PUBLISH:
136 assert attrs[SpanAttr.MESSAGING_OPERATION] == action, attrs[
137 SpanAttr.MESSAGING_OPERATION
138 ]
140 if parent_span_id:
141 assert span.parent
142 assert span.parent.span_id == parent_span_id, span.parent.span_id
144 def assert_metrics(
145 self,
146 metrics: list[Metric],
147 count: int = 1,
148 error_type: str | None = None,
149 ) -> None:
150 if self.include_messages_counters:
151 assert len(metrics) == 4
152 proc_dur, proc_msg, pub_dur, pub_msg = metrics
154 assert proc_msg.data.data_points[0].value == count
155 assert pub_msg.data.data_points[0].value == count
157 else:
158 assert len(metrics) == 2
159 proc_dur, pub_dur = metrics
161 if error_type:
162 assert proc_dur.data.data_points[0].attributes[ERROR_TYPE] == error_type
164 assert proc_dur.data.data_points[0].count == 1
165 assert proc_dur.data.data_points[0].sum == IsFloat
167 assert pub_dur.data.data_points[0].count == 1
168 assert pub_dur.data.data_points[0].sum == IsFloat
170 async def test_subscriber_create_publish_process_span(
171 self,
172 queue: str,
173 mock: MagicMock,
174 tracer_provider: TracerProvider,
175 trace_exporter: InMemorySpanExporter,
176 event: asyncio.Event,
177 ) -> None:
178 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
179 broker = self.get_broker(middlewares=(mid,))
181 args, kwargs = self.get_subscriber_params(queue)
183 @broker.subscriber(*args, **kwargs)
184 async def handler(m) -> None:
185 mock(m)
186 event.set()
188 broker = self.patch_broker(broker)
189 msg = "start"
191 async with broker:
192 await broker.start()
193 tasks = (
194 asyncio.create_task(broker.publish(msg, queue)),
195 asyncio.create_task(event.wait()),
196 )
197 await asyncio.wait(tasks, timeout=self.timeout)
199 create, publish, process = self.get_spans(trace_exporter)
200 parent_span_id = create.context.span_id
202 self.assert_span(create, Action.CREATE, queue, msg)
203 self.assert_span(publish, Action.PUBLISH, queue, msg, parent_span_id)
204 self.assert_span(process, Action.PROCESS, queue, msg, parent_span_id)
206 mock.assert_called_once_with(msg)
208 async def test_chain_subscriber_publisher(
209 self,
210 queue: str,
211 mock: MagicMock,
212 tracer_provider: TracerProvider,
213 trace_exporter: InMemorySpanExporter,
214 event: asyncio.Event,
215 ) -> None:
216 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
217 broker = self.get_broker(middlewares=(mid,))
219 first_queue = queue
220 second_queue = queue + "2"
222 args, kwargs = self.get_subscriber_params(first_queue)
224 @broker.subscriber(*args, **kwargs)
225 @broker.publisher(second_queue)
226 async def handler1(m):
227 return m
229 args2, kwargs2 = self.get_subscriber_params(second_queue)
231 @broker.subscriber(*args2, **kwargs2)
232 async def handler2(m) -> None:
233 mock(m)
234 event.set()
236 broker = self.patch_broker(broker)
237 msg = "start"
239 async with broker:
240 await broker.start()
241 tasks = (
242 asyncio.create_task(broker.publish(msg, queue)),
243 asyncio.create_task(event.wait()),
244 )
245 await asyncio.wait(tasks, timeout=self.timeout)
247 spans = self.get_spans(trace_exporter)
248 create, pub1, proc1, pub2, proc2 = spans
249 parent_span_id = create.context.span_id
251 self.assert_span(create, Action.CREATE, first_queue, msg)
252 self.assert_span(pub1, Action.PUBLISH, first_queue, msg, parent_span_id)
253 self.assert_span(proc1, Action.PROCESS, first_queue, msg, parent_span_id)
254 self.assert_span(pub2, Action.PUBLISH, second_queue, msg, proc1.context.span_id)
255 self.assert_span(proc2, Action.PROCESS, second_queue, msg, parent_span_id)
257 assert (
258 create.start_time
259 < pub1.start_time
260 < proc1.start_time
261 < pub2.start_time
262 < proc2.start_time
263 )
265 mock.assert_called_once_with(msg)
267 @pytest.mark.flaky(reruns=3, reruns_delay=1)
268 async def test_no_trace_context_create_process_span(
269 self,
270 queue: str,
271 tracer_provider: TracerProvider,
272 trace_exporter: InMemorySpanExporter,
273 event: asyncio.Event,
274 ) -> None:
275 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
276 broker = self.get_broker(middlewares=(mid,))
277 broker_without_middlewares = self.get_broker()
279 args, kwargs = self.get_subscriber_params(queue)
281 @broker.subscriber(*args, **kwargs)
282 async def handler(m) -> None:
283 event.set()
285 broker = self.patch_broker(broker)
286 msg = "start"
288 async with broker, broker_without_middlewares:
289 await broker.start()
290 await broker_without_middlewares.start()
292 broker.config.broker_config.middlewares = ()
293 tasks = (
294 asyncio.create_task(broker_without_middlewares.publish(msg, queue)),
295 asyncio.create_task(event.wait()),
296 )
297 await asyncio.wait(tasks, timeout=self.timeout)
299 create, process = self.get_spans(trace_exporter)
300 parent_span_id = create.context.span_id
302 self.assert_span(create, Action.CREATE, queue, msg)
303 self.assert_span(process, Action.PROCESS, queue, msg, parent_span_id)
305 async def test_metrics(
306 self,
307 queue: str,
308 mock: MagicMock,
309 meter_provider: MeterProvider,
310 metric_reader: InMemoryMetricReader,
311 event: asyncio.Event,
312 ) -> None:
313 mid = self.telemetry_middleware_class(meter_provider=meter_provider)
314 broker = self.get_broker(middlewares=(mid,))
316 args, kwargs = self.get_subscriber_params(queue)
318 @broker.subscriber(*args, **kwargs)
319 async def handler(m) -> None:
320 mock(m)
321 event.set()
323 broker = self.patch_broker(broker)
324 msg = "start"
326 async with broker:
327 await broker.start()
328 tasks = (
329 asyncio.create_task(broker.publish(msg, queue)),
330 asyncio.create_task(event.wait()),
331 )
332 await asyncio.wait(tasks, timeout=self.timeout)
334 metrics = self.get_metrics(metric_reader)
336 self.assert_metrics(metrics)
337 mock.assert_called_once_with(msg)
339 async def test_error_metrics(
340 self,
341 queue: str,
342 mock: MagicMock,
343 meter_provider: MeterProvider,
344 metric_reader: InMemoryMetricReader,
345 event: asyncio.Event,
346 ) -> None:
347 mid = self.telemetry_middleware_class(meter_provider=meter_provider)
348 broker = self.get_broker(middlewares=(mid,))
349 expected_value_type = "ValueError"
351 args, kwargs = self.get_subscriber_params(queue)
353 @broker.subscriber(*args, **kwargs)
354 async def handler(m) -> None:
355 try:
356 raise ValueError
357 finally:
358 mock(m)
359 event.set()
361 broker = self.patch_broker(broker)
362 msg = "start"
364 async with broker:
365 await broker.start()
366 tasks = (
367 asyncio.create_task(broker.publish(msg, queue)),
368 asyncio.create_task(event.wait()),
369 )
370 await asyncio.wait(tasks, timeout=self.timeout)
372 metrics = self.get_metrics(metric_reader)
374 self.assert_metrics(metrics, error_type=expected_value_type)
375 mock.assert_called_once_with(msg)
377 async def test_span_in_context(
378 self,
379 queue: str,
380 mock: MagicMock,
381 tracer_provider: TracerProvider,
382 trace_exporter: InMemorySpanExporter,
383 event: asyncio.Event,
384 ) -> None:
385 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
386 broker = self.get_broker(middlewares=(mid,), apply_types=True)
388 args, kwargs = self.get_subscriber_params(queue)
390 @broker.subscriber(*args, **kwargs)
391 async def handler(m, span: CurrentSpan) -> None:
392 assert span is get_current_span()
393 mock(m)
394 event.set()
396 broker = self.patch_broker(broker)
397 msg = "start"
399 async with broker:
400 await broker.start()
401 tasks = (
402 asyncio.create_task(broker.publish(msg, queue)),
403 asyncio.create_task(event.wait()),
404 )
405 await asyncio.wait(tasks, timeout=self.timeout)
407 mock.assert_called_once_with(msg)
409 async def test_get_baggage(
410 self,
411 queue: str,
412 mock: MagicMock,
413 event: asyncio.Event,
414 ) -> None:
415 mid = self.telemetry_middleware_class()
416 broker = self.get_broker(middlewares=(mid,), apply_types=True)
417 expected_baggage = {"foo": "bar"}
419 args, kwargs = self.get_subscriber_params(queue)
421 @broker.subscriber(*args, **kwargs)
422 async def handler1(m, baggage: CurrentBaggage) -> None:
423 assert baggage.get("foo") == "bar"
424 assert baggage.get_all() == expected_baggage
425 assert baggage.get_all_batch() == []
426 assert baggage.__repr__() == expected_baggage.__repr__()
427 mock(m)
428 event.set()
430 broker = self.patch_broker(broker)
431 msg = "start"
433 async with broker:
434 await broker.start()
435 tasks = (
436 asyncio.create_task(
437 broker.publish(
438 msg,
439 queue,
440 headers=Baggage({"foo": "bar"}).to_headers(),
441 ),
442 ),
443 asyncio.create_task(event.wait()),
444 )
445 await asyncio.wait(tasks, timeout=self.timeout)
447 mock.assert_called_once_with(msg)
449 async def test_clear_baggage(
450 self,
451 queue: str,
452 mock: MagicMock,
453 event: asyncio.Event,
454 ) -> None:
455 mid = self.telemetry_middleware_class()
456 broker = self.get_broker(middlewares=(mid,), apply_types=True)
458 first_queue = queue + "1"
459 second_queue = queue + "2"
461 args, kwargs = self.get_subscriber_params(first_queue)
463 @broker.subscriber(*args, **kwargs)
464 @broker.publisher(second_queue)
465 async def handler1(m, baggage: CurrentBaggage):
466 baggage.clear()
467 assert baggage.get_all() == {}
468 return m
470 args2, kwargs2 = self.get_subscriber_params(second_queue)
472 @broker.subscriber(*args2, **kwargs2)
473 async def handler2(m, baggage: CurrentBaggage) -> None:
474 assert baggage.get_all() == {}
475 mock(m)
476 event.set()
478 broker = self.patch_broker(broker)
479 msg = "start"
481 async with broker:
482 await broker.start()
483 tasks = (
484 asyncio.create_task(
485 broker.publish(
486 msg,
487 first_queue,
488 headers=Baggage({"foo": "bar"}).to_headers(),
489 ),
490 ),
491 asyncio.create_task(event.wait()),
492 )
493 await asyncio.wait(tasks, timeout=self.timeout)
495 mock.assert_called_once_with(msg)
497 async def test_modify_baggage(
498 self,
499 queue: str,
500 mock: MagicMock,
501 event: asyncio.Event,
502 ) -> None:
503 mid = self.telemetry_middleware_class()
504 broker = self.get_broker(middlewares=(mid,), apply_types=True)
505 expected_baggage = {"baz": "bar", "bar": "baz"}
507 first_queue = queue + "1"
508 second_queue = queue + "2"
510 args, kwargs = self.get_subscriber_params(first_queue)
512 @broker.subscriber(*args, **kwargs)
513 @broker.publisher(second_queue)
514 async def handler1(m, baggage: CurrentBaggage):
515 baggage.set("bar", "baz")
516 baggage.set("baz", "bar")
517 baggage.remove("foo")
518 return m
520 args2, kwargs2 = self.get_subscriber_params(second_queue)
522 @broker.subscriber(*args2, **kwargs2)
523 async def handler2(m, baggage: CurrentBaggage) -> None:
524 assert baggage.get_all() == expected_baggage
525 mock(m)
526 event.set()
528 broker = self.patch_broker(broker)
529 msg = "start"
531 async with broker:
532 await broker.start()
533 tasks = (
534 asyncio.create_task(
535 broker.publish(
536 msg,
537 first_queue,
538 headers=Baggage({"foo": "bar"}).to_headers(),
539 ),
540 ),
541 asyncio.create_task(event.wait()),
542 )
543 await asyncio.wait(tasks, timeout=self.timeout)
545 mock.assert_called_once_with(msg)
547 async def test_get_baggage_from_headers(
548 self,
549 queue: str,
550 event: asyncio.Event,
551 ) -> None:
552 mid = self.telemetry_middleware_class()
553 broker = self.get_broker(middlewares=(mid,), apply_types=True)
555 args, kwargs = self.get_subscriber_params(queue)
557 expected_baggage = {"foo": "bar", "bar": "baz"}
559 ctx = context.Context()
560 for key, value in expected_baggage.items():
561 ctx = baggage.set_baggage(key, value, context=ctx)
563 propagator = W3CBaggagePropagator()
564 headers = {}
565 propagator.inject(headers, context=ctx)
567 @broker.subscriber(*args, **kwargs)
568 async def handler() -> None:
569 baggage_instance = Baggage.from_headers(headers)
570 extracted_baggage = baggage_instance.get_all()
571 assert extracted_baggage == expected_baggage
572 event.set()
574 broker = self.patch_broker(broker)
575 msg = "start"
577 async with broker:
578 await broker.start()
579 tasks = (
580 asyncio.create_task(broker.publish(msg, queue, headers=headers)),
581 asyncio.create_task(event.wait()),
582 )
583 await asyncio.wait(tasks, timeout=self.timeout)
585 assert event.is_set()