Coverage for tests/prometheus/basic.py: 99%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from typing import TYPE_CHECKING, Any, cast
4import pytest
5from dirty_equals import IsList, IsPositiveFloat, IsStr
6from prometheus_client import CollectorRegistry
8from faststream import Context
9from faststream.exceptions import IgnoredException, RejectMessage
10from faststream.message import AckStatus
11from faststream.prometheus import MetricsSettingsProvider
12from faststream.prometheus.middleware import (
13 PROCESSING_STATUS_BY_ACK_STATUS,
14 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
15 BasePrometheusMiddleware,
16 PrometheusMiddleware,
17)
18from faststream.prometheus.types import ProcessingStatus, PublishingStatus
19from tests.brokers.base.basic import BaseTestcaseConfig
20from tests.prometheus.utils import (
21 get_published_messages_duration_seconds_metric,
22 get_published_messages_exceptions_metric,
23 get_published_messages_metric,
24 get_received_messages_in_process_metric,
25 get_received_messages_metric,
26 get_received_messages_size_bytes_metric,
27 get_received_processed_messages_duration_seconds_metric,
28 get_received_processed_messages_exceptions_metric,
29 get_received_processed_messages_metric,
30)
32if TYPE_CHECKING:
33 from collections.abc import Callable
36@pytest.mark.asyncio()
37class LocalPrometheusTestcase(BaseTestcaseConfig):
38 def get_middleware(self, **kwargs: Any) -> PrometheusMiddleware:
39 raise NotImplementedError
41 def get_settings_provider(self) -> MetricsSettingsProvider[Any]:
42 raise NotImplementedError
44 @pytest.mark.parametrize(
45 (
46 "status",
47 "exception_class",
48 ),
49 (
50 pytest.param(
51 AckStatus.ACKED,
52 RejectMessage,
53 id="acked status with reject message exception",
54 ),
55 pytest.param(
56 AckStatus.ACKED,
57 Exception,
58 id="acked status with not handler exception",
59 ),
60 pytest.param(
61 AckStatus.ACKED,
62 None,
63 id="acked status without exception",
64 ),
65 pytest.param(
66 AckStatus.NACKED,
67 None,
68 id="nacked status without exception",
69 ),
70 pytest.param(
71 AckStatus.REJECTED,
72 None,
73 id="rejected status without exception",
74 ),
75 pytest.param(
76 AckStatus.ACKED,
77 IgnoredException,
78 id="acked status with ignore exception",
79 ),
80 ),
81 )
82 async def test_metrics(
83 self,
84 queue: str,
85 status: AckStatus,
86 exception_class: type[Exception] | None,
87 ) -> None:
88 event = asyncio.Event()
89 registry = CollectorRegistry()
90 custom_labels: dict[str, str | Callable[[Any], str]] = {
91 "static": "pupupu",
92 "dynamic": lambda x: "papapa",
93 }
94 middleware = self.get_middleware(registry=registry, custom_labels=custom_labels)
96 broker = self.get_broker(apply_types=True, middlewares=(middleware,))
98 args, kwargs = self.get_subscriber_params(queue)
100 message = None
102 @broker.subscriber(*args, **kwargs)
103 async def handler(m=Context("message")) -> None:
104 event.set()
106 nonlocal message
107 message = m
109 if exception_class:
110 raise exception_class
112 if status == AckStatus.ACKED:
113 await message.ack()
114 elif status == AckStatus.NACKED:
115 await message.nack()
116 elif status == AckStatus.REJECTED: 116 ↛ exitline 116 didn't return from function 'handler' because the condition on line 116 was always true
117 await message.reject()
119 async with broker:
120 await broker.start()
121 tasks = (
122 asyncio.create_task(broker.publish("hello", queue)),
123 asyncio.create_task(event.wait()),
124 )
125 await asyncio.wait(tasks, timeout=self.timeout)
127 assert event.is_set()
128 self.assert_metrics(
129 registry=registry,
130 message=message,
131 exception_class=exception_class,
132 custom_labels={
133 "static": "pupupu",
134 "dynamic": "papapa",
135 },
136 )
138 def assert_metrics(
139 self,
140 *,
141 registry: CollectorRegistry,
142 message: Any,
143 exception_class: type[Exception] | None,
144 custom_labels: dict[str, str],
145 ) -> None:
146 settings_provider = self.get_settings_provider()
147 consume_attrs = settings_provider.get_consume_attrs_from_message(message)
149 received_messages_metric = get_received_messages_metric(
150 metrics_prefix="faststream",
151 app_name="faststream",
152 broker=settings_provider.messaging_system,
153 queue=consume_attrs["destination_name"],
154 messages_amount=consume_attrs["messages_count"],
155 custom_labels=custom_labels,
156 )
158 received_messages_size_bytes_metric = get_received_messages_size_bytes_metric(
159 metrics_prefix="faststream",
160 app_name="faststream",
161 broker=settings_provider.messaging_system,
162 queue=consume_attrs["destination_name"],
163 buckets=(
164 2.0**4,
165 2.0**6,
166 2.0**8,
167 2.0**10,
168 2.0**12,
169 2.0**14,
170 2.0**16,
171 2.0**18,
172 2.0**20,
173 2.0**22,
174 2.0**24,
175 float("inf"),
176 ),
177 size=consume_attrs["message_size"],
178 messages_amount=1,
179 custom_labels=custom_labels,
180 )
182 received_messages_in_process_metric = get_received_messages_in_process_metric(
183 metrics_prefix="faststream",
184 app_name="faststream",
185 broker=settings_provider.messaging_system,
186 queue=consume_attrs["destination_name"],
187 messages_amount=0,
188 custom_labels=custom_labels,
189 )
191 received_processed_messages_duration_seconds_metric = (
192 get_received_processed_messages_duration_seconds_metric(
193 metrics_prefix="faststream",
194 app_name="faststream",
195 broker=settings_provider.messaging_system,
196 queue=consume_attrs["destination_name"],
197 duration=cast("float", IsPositiveFloat),
198 custom_labels=custom_labels,
199 )
200 )
202 status = ProcessingStatus.acked
204 if exception_class:
205 status = (
206 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class)
207 or ProcessingStatus.error
208 )
209 elif message.committed:
210 status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed]
212 received_processed_messages_metric = get_received_processed_messages_metric(
213 metrics_prefix="faststream",
214 app_name="faststream",
215 broker=settings_provider.messaging_system,
216 queue=consume_attrs["destination_name"],
217 messages_amount=consume_attrs["messages_count"],
218 status=status,
219 custom_labels=custom_labels,
220 )
222 exception_type: str | None = None
224 if exception_class and not issubclass(exception_class, IgnoredException):
225 exception_type = exception_class.__name__
227 received_processed_messages_exceptions_metric = (
228 get_received_processed_messages_exceptions_metric(
229 metrics_prefix="faststream",
230 app_name="faststream",
231 broker=settings_provider.messaging_system,
232 queue=consume_attrs["destination_name"],
233 exception_type=exception_type,
234 exceptions_amount=consume_attrs["messages_count"],
235 custom_labels=custom_labels,
236 )
237 )
239 published_messages_metric = get_published_messages_metric(
240 metrics_prefix="faststream",
241 app_name="faststream",
242 broker=settings_provider.messaging_system,
243 queue=cast("str", IsStr),
244 status=PublishingStatus.success,
245 messages_amount=consume_attrs["messages_count"],
246 custom_labels=custom_labels,
247 )
249 published_messages_duration_seconds_metric = (
250 get_published_messages_duration_seconds_metric(
251 metrics_prefix="faststream",
252 app_name="faststream",
253 broker=settings_provider.messaging_system,
254 queue=cast("str", IsStr),
255 duration=cast("float", IsPositiveFloat),
256 custom_labels=custom_labels,
257 )
258 )
260 published_messages_exceptions_metric = get_published_messages_exceptions_metric(
261 metrics_prefix="faststream",
262 app_name="faststream",
263 broker=settings_provider.messaging_system,
264 queue=cast("str", IsStr),
265 exception_type=None,
266 custom_labels=custom_labels,
267 )
269 expected_metrics = IsList(
270 received_messages_metric,
271 received_messages_size_bytes_metric,
272 received_messages_in_process_metric,
273 received_processed_messages_metric,
274 received_processed_messages_duration_seconds_metric,
275 received_processed_messages_exceptions_metric,
276 published_messages_metric,
277 published_messages_duration_seconds_metric,
278 published_messages_exceptions_metric,
279 check_order=False,
280 )
281 real_metrics = list(registry.collect())
283 assert real_metrics == expected_metrics
class LocalRPCPrometheusTestcase:
    """Prometheus check for the request/response path.

    Relies on ``get_middleware``, ``get_broker``, ``patch_broker`` and
    ``assert_metrics`` being available on ``self``; none of them is defined
    in this class.
    """

    @pytest.mark.asyncio()
    async def test_rpc_request(
        self,
        queue: str,
    ) -> None:
        """Send a request to a subscriber and verify the resulting metrics."""
        handled = asyncio.Event()
        registry = CollectorRegistry()

        broker = self.get_broker(
            apply_types=True,
            middlewares=(self.get_middleware(registry=registry),),
        )

        # Messages seen by the handler; the last one is handed to assert_metrics.
        captured: list[Any] = []

        @broker.subscriber(queue)
        async def handle(m=Context("message")):
            captured.append(m)
            handled.set()
            return ""

        async with self.patch_broker(broker) as br:
            await br.start()

            pending_request = br.request("", queue)
            await asyncio.wait_for(pending_request, timeout=3)

        assert handled.is_set()

        self.assert_metrics(
            registry=registry,
            message=captured[-1],
            exception_class=None,
            custom_labels={},
        )
class LocalMetricsSettingsProviderTestcase:
    """Checks for a metrics settings provider and registry sharing.

    ``get_middleware`` and ``get_settings_provider`` are not implemented
    here; ``get_broker`` is also expected on ``self`` but is not defined in
    this class.
    """

    # Value that the provider's ``messaging_system`` is compared against.
    messaging_system: str

    def get_middleware(self, **kwargs) -> BasePrometheusMiddleware:
        """Return the middleware under test (not implemented here)."""
        raise NotImplementedError

    @staticmethod
    def get_settings_provider() -> MetricsSettingsProvider:
        """Return the settings provider under test (not implemented here)."""
        raise NotImplementedError

    def test_messaging_system(self) -> None:
        """The provider reports the messaging system declared on the class."""
        assert self.get_settings_provider().messaging_system == self.messaging_system

    def test_one_registry_for_some_middlewares(self) -> None:
        """Two middlewares built on one registry share the same metric object."""
        shared_registry = CollectorRegistry()

        middlewares = [
            self.get_middleware(registry=shared_registry) for _ in range(2)
        ]
        for middleware in middlewares:
            self.get_broker(middlewares=(middleware,))

        first_total, second_total = (
            middleware._metrics_container.received_messages_total
            for middleware in middlewares
        )
        assert first_total is second_total