Coverage for tests / opentelemetry / basic.py: 99%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from typing import Any, cast 

3from unittest.mock import MagicMock 

4 

5import pytest 

6from dirty_equals import IsFloat, IsUUID 

7from opentelemetry import baggage, context 

8from opentelemetry.baggage.propagation import W3CBaggagePropagator 

9from opentelemetry.sdk.metrics import MeterProvider 

10from opentelemetry.sdk.metrics._internal.point import Metric 

11from opentelemetry.sdk.metrics.export import InMemoryMetricReader 

12from opentelemetry.sdk.resources import Resource 

13from opentelemetry.sdk.trace import Span, TracerProvider 

14from opentelemetry.sdk.trace.export import SimpleSpanProcessor 

15from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter 

16from opentelemetry.semconv.trace import SpanAttributes as SpanAttr 

17from opentelemetry.trace import SpanKind, get_current_span 

18 

19from faststream._internal.broker import BrokerUsecase 

20from faststream.opentelemetry import Baggage, CurrentBaggage, CurrentSpan 

21from faststream.opentelemetry.consts import ( 

22 ERROR_TYPE, 

23 MESSAGING_DESTINATION_PUBLISH_NAME, 

24) 

25from faststream.opentelemetry.middleware import ( 

26 MessageAction as Action, 

27 TelemetryMiddleware, 

28) 

29from tests.brokers.base.basic import BaseTestcaseConfig 

30 

31 

32@pytest.mark.asyncio() 

33class LocalTelemetryTestcase(BaseTestcaseConfig): 

34 messaging_system: str 

35 include_messages_counters: bool 

36 resource: Resource = Resource.create(attributes={"service.name": "faststream.test"}) 

37 telemetry_middleware_class: TelemetryMiddleware 

38 

39 def get_broker( 

40 self, 

41 apply_types: bool = False, 

42 **kwargs: Any, 

43 ) -> BrokerUsecase[Any, Any]: 

44 raise NotImplementedError 

45 

46 def patch_broker( 

47 self, 

48 broker: BrokerUsecase[Any, Any], 

49 **kwargs: Any, 

50 ) -> BrokerUsecase[Any, Any]: 

51 return broker 

52 

53 def destination_name(self, queue: str) -> str: 

54 return queue 

55 

56 @staticmethod 

57 def get_spans(exporter: InMemorySpanExporter) -> list[Span]: 

58 spans = cast("tuple[Span, ...]", exporter.get_finished_spans()) 

59 return sorted(spans, key=lambda s: s.start_time or 0) 

60 

61 @staticmethod 

62 def get_metrics( 

63 reader: InMemoryMetricReader, 

64 ) -> list[Metric]: 

65 """Get sorted metrics. 

66 

67 Return order: 

68 - messaging.process.duration 

69 - messaging.process.messages 

70 - messaging.publish.duration 

71 - messaging.publish.messages 

72 """ 

73 metrics = reader.get_metrics_data() 

74 metrics = metrics.resource_metrics[0].scope_metrics[0].metrics 

75 metrics = sorted(metrics, key=lambda m: m.name) 

76 return cast("list[Metric]", metrics) 

77 

78 @pytest.fixture() 

79 def tracer_provider(self) -> TracerProvider: 

80 return TracerProvider(resource=self.resource) 

81 

82 @pytest.fixture() 

83 def trace_exporter(self, tracer_provider: TracerProvider) -> InMemorySpanExporter: 

84 exporter = InMemorySpanExporter() 

85 tracer_provider.add_span_processor(SimpleSpanProcessor(exporter)) 

86 return exporter 

87 

88 @pytest.fixture() 

89 def metric_reader(self) -> InMemoryMetricReader: 

90 return InMemoryMetricReader() 

91 

92 @pytest.fixture() 

93 def meter_provider(self, metric_reader: InMemoryMetricReader) -> MeterProvider: 

94 return MeterProvider(metric_readers=(metric_reader,), resource=self.resource) 

95 

96 def assert_span( 

97 self, 

98 span: Span, 

99 action: str, 

100 queue: str, 

101 msg: str, 

102 parent_span_id: str | None = None, 

103 ) -> None: 

104 attrs = span.attributes or {} 

105 assert attrs[SpanAttr.MESSAGING_SYSTEM] == self.messaging_system, attrs[ 

106 SpanAttr.MESSAGING_SYSTEM 

107 ] 

108 assert attrs[SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID] == IsUUID, attrs[ 

109 SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID 

110 ] 

111 assert span.name == f"{self.destination_name(queue)} {action}", span.name 

112 assert span.kind in {SpanKind.CONSUMER, SpanKind.PRODUCER}, span.kind 

113 

114 if span.kind == SpanKind.PRODUCER and action in {Action.CREATE, Action.PUBLISH}: 

115 assert attrs[SpanAttr.MESSAGING_DESTINATION_NAME] == queue, attrs[ 

116 SpanAttr.MESSAGING_DESTINATION_NAME 

117 ] 

118 

119 if span.kind == SpanKind.CONSUMER and action in {Action.CREATE, Action.PROCESS}: 

120 assert attrs[MESSAGING_DESTINATION_PUBLISH_NAME] == queue, attrs[ 

121 MESSAGING_DESTINATION_PUBLISH_NAME 

122 ] 

123 assert attrs[SpanAttr.MESSAGING_MESSAGE_ID] == IsUUID, attrs[ 

124 SpanAttr.MESSAGING_MESSAGE_ID 

125 ] 

126 

127 if action == Action.PROCESS: 

128 assert attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] == len(msg), ( 

129 attrs[SpanAttr.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] 

130 ) 

131 assert attrs[SpanAttr.MESSAGING_OPERATION] == action, attrs[ 

132 SpanAttr.MESSAGING_OPERATION 

133 ] 

134 

135 if action == Action.PUBLISH: 

136 assert attrs[SpanAttr.MESSAGING_OPERATION] == action, attrs[ 

137 SpanAttr.MESSAGING_OPERATION 

138 ] 

139 

140 if parent_span_id: 

141 assert span.parent 

142 assert span.parent.span_id == parent_span_id, span.parent.span_id 

143 

144 def assert_metrics( 

145 self, 

146 metrics: list[Metric], 

147 count: int = 1, 

148 error_type: str | None = None, 

149 ) -> None: 

150 if self.include_messages_counters: 

151 assert len(metrics) == 4 

152 proc_dur, proc_msg, pub_dur, pub_msg = metrics 

153 

154 assert proc_msg.data.data_points[0].value == count 

155 assert pub_msg.data.data_points[0].value == count 

156 

157 else: 

158 assert len(metrics) == 2 

159 proc_dur, pub_dur = metrics 

160 

161 if error_type: 

162 assert proc_dur.data.data_points[0].attributes[ERROR_TYPE] == error_type 

163 

164 assert proc_dur.data.data_points[0].count == 1 

165 assert proc_dur.data.data_points[0].sum == IsFloat 

166 

167 assert pub_dur.data.data_points[0].count == 1 

168 assert pub_dur.data.data_points[0].sum == IsFloat 

169 

170 async def test_subscriber_create_publish_process_span( 

171 self, 

172 queue: str, 

173 mock: MagicMock, 

174 tracer_provider: TracerProvider, 

175 trace_exporter: InMemorySpanExporter, 

176 event: asyncio.Event, 

177 ) -> None: 

178 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) 

179 broker = self.get_broker(middlewares=(mid,)) 

180 

181 args, kwargs = self.get_subscriber_params(queue) 

182 

183 @broker.subscriber(*args, **kwargs) 

184 async def handler(m) -> None: 

185 mock(m) 

186 event.set() 

187 

188 broker = self.patch_broker(broker) 

189 msg = "start" 

190 

191 async with broker: 

192 await broker.start() 

193 tasks = ( 

194 asyncio.create_task(broker.publish(msg, queue)), 

195 asyncio.create_task(event.wait()), 

196 ) 

197 await asyncio.wait(tasks, timeout=self.timeout) 

198 

199 create, publish, process = self.get_spans(trace_exporter) 

200 parent_span_id = create.context.span_id 

201 

202 self.assert_span(create, Action.CREATE, queue, msg) 

203 self.assert_span(publish, Action.PUBLISH, queue, msg, parent_span_id) 

204 self.assert_span(process, Action.PROCESS, queue, msg, parent_span_id) 

205 

206 mock.assert_called_once_with(msg) 

207 

208 async def test_chain_subscriber_publisher( 

209 self, 

210 queue: str, 

211 mock: MagicMock, 

212 tracer_provider: TracerProvider, 

213 trace_exporter: InMemorySpanExporter, 

214 event: asyncio.Event, 

215 ) -> None: 

216 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) 

217 broker = self.get_broker(middlewares=(mid,)) 

218 

219 first_queue = queue 

220 second_queue = queue + "2" 

221 

222 args, kwargs = self.get_subscriber_params(first_queue) 

223 

224 @broker.subscriber(*args, **kwargs) 

225 @broker.publisher(second_queue) 

226 async def handler1(m): 

227 return m 

228 

229 args2, kwargs2 = self.get_subscriber_params(second_queue) 

230 

231 @broker.subscriber(*args2, **kwargs2) 

232 async def handler2(m) -> None: 

233 mock(m) 

234 event.set() 

235 

236 broker = self.patch_broker(broker) 

237 msg = "start" 

238 

239 async with broker: 

240 await broker.start() 

241 tasks = ( 

242 asyncio.create_task(broker.publish(msg, queue)), 

243 asyncio.create_task(event.wait()), 

244 ) 

245 await asyncio.wait(tasks, timeout=self.timeout) 

246 

247 spans = self.get_spans(trace_exporter) 

248 create, pub1, proc1, pub2, proc2 = spans 

249 parent_span_id = create.context.span_id 

250 

251 self.assert_span(create, Action.CREATE, first_queue, msg) 

252 self.assert_span(pub1, Action.PUBLISH, first_queue, msg, parent_span_id) 

253 self.assert_span(proc1, Action.PROCESS, first_queue, msg, parent_span_id) 

254 self.assert_span(pub2, Action.PUBLISH, second_queue, msg, proc1.context.span_id) 

255 self.assert_span(proc2, Action.PROCESS, second_queue, msg, parent_span_id) 

256 

257 assert ( 

258 create.start_time 

259 < pub1.start_time 

260 < proc1.start_time 

261 < pub2.start_time 

262 < proc2.start_time 

263 ) 

264 

265 mock.assert_called_once_with(msg) 

266 

267 @pytest.mark.flaky(reruns=3, reruns_delay=1) 

268 async def test_no_trace_context_create_process_span( 

269 self, 

270 queue: str, 

271 tracer_provider: TracerProvider, 

272 trace_exporter: InMemorySpanExporter, 

273 event: asyncio.Event, 

274 ) -> None: 

275 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) 

276 broker = self.get_broker(middlewares=(mid,)) 

277 broker_without_middlewares = self.get_broker() 

278 

279 args, kwargs = self.get_subscriber_params(queue) 

280 

281 @broker.subscriber(*args, **kwargs) 

282 async def handler(m) -> None: 

283 event.set() 

284 

285 broker = self.patch_broker(broker) 

286 msg = "start" 

287 

288 async with broker, broker_without_middlewares: 

289 await broker.start() 

290 await broker_without_middlewares.start() 

291 

292 broker.config.broker_config.middlewares = () 

293 tasks = ( 

294 asyncio.create_task(broker_without_middlewares.publish(msg, queue)), 

295 asyncio.create_task(event.wait()), 

296 ) 

297 await asyncio.wait(tasks, timeout=self.timeout) 

298 

299 create, process = self.get_spans(trace_exporter) 

300 parent_span_id = create.context.span_id 

301 

302 self.assert_span(create, Action.CREATE, queue, msg) 

303 self.assert_span(process, Action.PROCESS, queue, msg, parent_span_id) 

304 

305 async def test_metrics( 

306 self, 

307 queue: str, 

308 mock: MagicMock, 

309 meter_provider: MeterProvider, 

310 metric_reader: InMemoryMetricReader, 

311 event: asyncio.Event, 

312 ) -> None: 

313 mid = self.telemetry_middleware_class(meter_provider=meter_provider) 

314 broker = self.get_broker(middlewares=(mid,)) 

315 

316 args, kwargs = self.get_subscriber_params(queue) 

317 

318 @broker.subscriber(*args, **kwargs) 

319 async def handler(m) -> None: 

320 mock(m) 

321 event.set() 

322 

323 broker = self.patch_broker(broker) 

324 msg = "start" 

325 

326 async with broker: 

327 await broker.start() 

328 tasks = ( 

329 asyncio.create_task(broker.publish(msg, queue)), 

330 asyncio.create_task(event.wait()), 

331 ) 

332 await asyncio.wait(tasks, timeout=self.timeout) 

333 

334 metrics = self.get_metrics(metric_reader) 

335 

336 self.assert_metrics(metrics) 

337 mock.assert_called_once_with(msg) 

338 

339 async def test_error_metrics( 

340 self, 

341 queue: str, 

342 mock: MagicMock, 

343 meter_provider: MeterProvider, 

344 metric_reader: InMemoryMetricReader, 

345 event: asyncio.Event, 

346 ) -> None: 

347 mid = self.telemetry_middleware_class(meter_provider=meter_provider) 

348 broker = self.get_broker(middlewares=(mid,)) 

349 expected_value_type = "ValueError" 

350 

351 args, kwargs = self.get_subscriber_params(queue) 

352 

353 @broker.subscriber(*args, **kwargs) 

354 async def handler(m) -> None: 

355 try: 

356 raise ValueError 

357 finally: 

358 mock(m) 

359 event.set() 

360 

361 broker = self.patch_broker(broker) 

362 msg = "start" 

363 

364 async with broker: 

365 await broker.start() 

366 tasks = ( 

367 asyncio.create_task(broker.publish(msg, queue)), 

368 asyncio.create_task(event.wait()), 

369 ) 

370 await asyncio.wait(tasks, timeout=self.timeout) 

371 

372 metrics = self.get_metrics(metric_reader) 

373 

374 self.assert_metrics(metrics, error_type=expected_value_type) 

375 mock.assert_called_once_with(msg) 

376 

377 async def test_span_in_context( 

378 self, 

379 queue: str, 

380 mock: MagicMock, 

381 tracer_provider: TracerProvider, 

382 trace_exporter: InMemorySpanExporter, 

383 event: asyncio.Event, 

384 ) -> None: 

385 mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) 

386 broker = self.get_broker(middlewares=(mid,), apply_types=True) 

387 

388 args, kwargs = self.get_subscriber_params(queue) 

389 

390 @broker.subscriber(*args, **kwargs) 

391 async def handler(m, span: CurrentSpan) -> None: 

392 assert span is get_current_span() 

393 mock(m) 

394 event.set() 

395 

396 broker = self.patch_broker(broker) 

397 msg = "start" 

398 

399 async with broker: 

400 await broker.start() 

401 tasks = ( 

402 asyncio.create_task(broker.publish(msg, queue)), 

403 asyncio.create_task(event.wait()), 

404 ) 

405 await asyncio.wait(tasks, timeout=self.timeout) 

406 

407 mock.assert_called_once_with(msg) 

408 

409 async def test_get_baggage( 

410 self, 

411 queue: str, 

412 mock: MagicMock, 

413 event: asyncio.Event, 

414 ) -> None: 

415 mid = self.telemetry_middleware_class() 

416 broker = self.get_broker(middlewares=(mid,), apply_types=True) 

417 expected_baggage = {"foo": "bar"} 

418 

419 args, kwargs = self.get_subscriber_params(queue) 

420 

421 @broker.subscriber(*args, **kwargs) 

422 async def handler1(m, baggage: CurrentBaggage) -> None: 

423 assert baggage.get("foo") == "bar" 

424 assert baggage.get_all() == expected_baggage 

425 assert baggage.get_all_batch() == [] 

426 assert baggage.__repr__() == expected_baggage.__repr__() 

427 mock(m) 

428 event.set() 

429 

430 broker = self.patch_broker(broker) 

431 msg = "start" 

432 

433 async with broker: 

434 await broker.start() 

435 tasks = ( 

436 asyncio.create_task( 

437 broker.publish( 

438 msg, 

439 queue, 

440 headers=Baggage({"foo": "bar"}).to_headers(), 

441 ), 

442 ), 

443 asyncio.create_task(event.wait()), 

444 ) 

445 await asyncio.wait(tasks, timeout=self.timeout) 

446 

447 mock.assert_called_once_with(msg) 

448 

449 async def test_clear_baggage( 

450 self, 

451 queue: str, 

452 mock: MagicMock, 

453 event: asyncio.Event, 

454 ) -> None: 

455 mid = self.telemetry_middleware_class() 

456 broker = self.get_broker(middlewares=(mid,), apply_types=True) 

457 

458 first_queue = queue + "1" 

459 second_queue = queue + "2" 

460 

461 args, kwargs = self.get_subscriber_params(first_queue) 

462 

463 @broker.subscriber(*args, **kwargs) 

464 @broker.publisher(second_queue) 

465 async def handler1(m, baggage: CurrentBaggage): 

466 baggage.clear() 

467 assert baggage.get_all() == {} 

468 return m 

469 

470 args2, kwargs2 = self.get_subscriber_params(second_queue) 

471 

472 @broker.subscriber(*args2, **kwargs2) 

473 async def handler2(m, baggage: CurrentBaggage) -> None: 

474 assert baggage.get_all() == {} 

475 mock(m) 

476 event.set() 

477 

478 broker = self.patch_broker(broker) 

479 msg = "start" 

480 

481 async with broker: 

482 await broker.start() 

483 tasks = ( 

484 asyncio.create_task( 

485 broker.publish( 

486 msg, 

487 first_queue, 

488 headers=Baggage({"foo": "bar"}).to_headers(), 

489 ), 

490 ), 

491 asyncio.create_task(event.wait()), 

492 ) 

493 await asyncio.wait(tasks, timeout=self.timeout) 

494 

495 mock.assert_called_once_with(msg) 

496 

497 async def test_modify_baggage( 

498 self, 

499 queue: str, 

500 mock: MagicMock, 

501 event: asyncio.Event, 

502 ) -> None: 

503 mid = self.telemetry_middleware_class() 

504 broker = self.get_broker(middlewares=(mid,), apply_types=True) 

505 expected_baggage = {"baz": "bar", "bar": "baz"} 

506 

507 first_queue = queue + "1" 

508 second_queue = queue + "2" 

509 

510 args, kwargs = self.get_subscriber_params(first_queue) 

511 

512 @broker.subscriber(*args, **kwargs) 

513 @broker.publisher(second_queue) 

514 async def handler1(m, baggage: CurrentBaggage): 

515 baggage.set("bar", "baz") 

516 baggage.set("baz", "bar") 

517 baggage.remove("foo") 

518 return m 

519 

520 args2, kwargs2 = self.get_subscriber_params(second_queue) 

521 

522 @broker.subscriber(*args2, **kwargs2) 

523 async def handler2(m, baggage: CurrentBaggage) -> None: 

524 assert baggage.get_all() == expected_baggage 

525 mock(m) 

526 event.set() 

527 

528 broker = self.patch_broker(broker) 

529 msg = "start" 

530 

531 async with broker: 

532 await broker.start() 

533 tasks = ( 

534 asyncio.create_task( 

535 broker.publish( 

536 msg, 

537 first_queue, 

538 headers=Baggage({"foo": "bar"}).to_headers(), 

539 ), 

540 ), 

541 asyncio.create_task(event.wait()), 

542 ) 

543 await asyncio.wait(tasks, timeout=self.timeout) 

544 

545 mock.assert_called_once_with(msg) 

546 

547 async def test_get_baggage_from_headers( 

548 self, 

549 queue: str, 

550 event: asyncio.Event, 

551 ) -> None: 

552 mid = self.telemetry_middleware_class() 

553 broker = self.get_broker(middlewares=(mid,), apply_types=True) 

554 

555 args, kwargs = self.get_subscriber_params(queue) 

556 

557 expected_baggage = {"foo": "bar", "bar": "baz"} 

558 

559 ctx = context.Context() 

560 for key, value in expected_baggage.items(): 

561 ctx = baggage.set_baggage(key, value, context=ctx) 

562 

563 propagator = W3CBaggagePropagator() 

564 headers = {} 

565 propagator.inject(headers, context=ctx) 

566 

567 @broker.subscriber(*args, **kwargs) 

568 async def handler() -> None: 

569 baggage_instance = Baggage.from_headers(headers) 

570 extracted_baggage = baggage_instance.get_all() 

571 assert extracted_baggage == expected_baggage 

572 event.set() 

573 

574 broker = self.patch_broker(broker) 

575 msg = "start" 

576 

577 async with broker: 

578 await broker.start() 

579 tasks = ( 

580 asyncio.create_task(broker.publish(msg, queue, headers=headers)), 

581 asyncio.create_task(event.wait()), 

582 ) 

583 await asyncio.wait(tasks, timeout=self.timeout) 

584 

585 assert event.is_set()