Coverage for faststream / prometheus / middleware.py: 99%
64 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import time
2from collections.abc import Awaitable, Callable, Sequence
3from typing import TYPE_CHECKING, Any, Generic
5from faststream._internal.constants import EMPTY
6from faststream._internal.middlewares import BaseMiddleware
7from faststream._internal.types import AnyMsg, BrokerMiddleware, PublishCommandType
8from faststream.exceptions import IgnoredException
9from faststream.message import SourceType
10from faststream.prometheus.consts import (
11 PROCESSING_STATUS_BY_ACK_STATUS,
12 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
13)
14from faststream.prometheus.container import MetricsContainer
15from faststream.prometheus.manager import MetricsManager
16from faststream.prometheus.provider import MetricsSettingsProvider
17from faststream.prometheus.types import ProcessingStatus, PublishingStatus
18from faststream.response import PublishType
20if TYPE_CHECKING:
21 from prometheus_client import CollectorRegistry
23 from faststream._internal.basic_types import AsyncFuncAny
24 from faststream._internal.context.repository import ContextRepo
25 from faststream.message.message import StreamMessage
class PrometheusMiddleware(BrokerMiddleware[AnyMsg, PublishCommandType]):
    """Build per-message Prometheus middlewares that share one metrics manager.

    The instance owns a single ``MetricsContainer`` / ``MetricsManager`` pair
    and, when called with a message, returns a ``BasePrometheusMiddleware``
    bound to that manager.
    """

    __slots__ = (
        "_dynamic_labels",
        "_metrics_container",
        "_metrics_manager",
        "_settings_provider_factory",
        "_static_labels",
    )

    def __init__(
        self,
        *,
        settings_provider_factory: Callable[
            [AnyMsg | None],
            MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
        ],
        registry: "CollectorRegistry",
        app_name: str = EMPTY,
        metrics_prefix: str = "faststream",
        received_messages_size_buckets: Sequence[float] | None = None,
        custom_labels: dict[str, str | Callable[[Any], str]] | None = None,
    ) -> None:
        """Create the shared metrics objects and split the custom labels.

        Args:
            settings_provider_factory: Called with the incoming message (or
                ``None``) by every produced middleware; may return ``None``.
            registry: Forwarded to ``MetricsContainer``.
            app_name: Forwarded to ``MetricsManager``; when left as ``EMPTY``
                the value of ``metrics_prefix`` is used instead.
            metrics_prefix: Forwarded to ``MetricsContainer``.
            received_messages_size_buckets: Forwarded to ``MetricsContainer``.
            custom_labels: Extra labels keyed by label name. Callable values
                are evaluated against the message on every ``__call__``;
                all other values are used as-is.
        """
        labels = custom_labels or {}
        resolved_app_name = metrics_prefix if app_name is EMPTY else app_name

        self._settings_provider_factory = settings_provider_factory
        self._metrics_container = MetricsContainer(
            registry,
            metrics_prefix=metrics_prefix,
            received_messages_size_buckets=received_messages_size_buckets,
            custom_label_names=tuple(labels.keys()),
        )
        self._metrics_manager = MetricsManager(
            self._metrics_container,
            app_name=resolved_app_name,
        )

        # Callables are resolved per message in __call__; everything else is fixed.
        self._static_labels = {
            name: value for name, value in labels.items() if not callable(value)
        }
        self._dynamic_labels = {
            name: value for name, value in labels.items() if callable(value)
        }

    def __call__(
        self,
        msg: AnyMsg | None,
        /,
        *,
        context: "ContextRepo",
    ) -> "BasePrometheusMiddleware[PublishCommandType]":
        """Return a middleware for ``msg`` with its custom labels resolved.

        Dynamic label callables are invoked with ``msg``; their results are
        layered over the static labels.
        """
        per_message_labels = {
            name: resolve(msg) for name, resolve in self._dynamic_labels.items()
        }
        return BasePrometheusMiddleware[PublishCommandType](
            msg,
            metrics_manager=self._metrics_manager,
            settings_provider_factory=self._settings_provider_factory,
            context=context,
            custom_labels={**self._static_labels, **per_message_labels},
        )
class BasePrometheusMiddleware(
    BaseMiddleware[PublishCommandType, AnyMsg],
    Generic[PublishCommandType, AnyMsg],
):
    """Per-message middleware that reports consume/publish metrics.

    All metric calls go through the supplied ``MetricsManager``. When the
    settings provider factory returns ``None`` both scopes are pass-through.
    """

    def __init__(
        self,
        msg: AnyMsg | None,
        /,
        *,
        settings_provider_factory: Callable[
            [AnyMsg | None],
            MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
        ],
        metrics_manager: MetricsManager,
        context: "ContextRepo",
        custom_labels: dict[str, str],
    ) -> None:
        """Store the manager and labels and resolve the provider for ``msg``.

        Args:
            msg: Incoming message (or ``None``); passed to the factory and to
                the base class initializer.
            settings_provider_factory: Called once with ``msg``; its result
                (possibly ``None``) is kept as the settings provider.
            metrics_manager: Receiver of every metric update.
            context: Passed to the base class initializer.
            custom_labels: Extra keyword arguments added to every metric call.
        """
        self._metrics_manager = metrics_manager
        self._settings_provider = settings_provider_factory(msg)
        self._custom_labels = custom_labels
        super().__init__(msg, context=context)

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[AnyMsg]",
    ) -> Any:
        """Run ``call_next(msg)`` and record receive/processing metrics.

        Skips all metrics when there is no settings provider or when the
        message's ``source_type`` is ``SourceType.RESPONSE``. Exceptions
        raised by ``call_next`` are re-raised after being recorded.
        """
        provider = self._settings_provider
        if provider is None or msg.source_type is SourceType.RESPONSE:
            return await call_next(msg)

        manager = self._metrics_manager
        extra = self._custom_labels

        broker_name = provider.messaging_system
        attrs = provider.get_consume_attrs_from_message(msg)
        handler_name = attrs["destination_name"]
        count = attrs["messages_count"]

        manager.add_received_message(
            amount=count,
            broker=broker_name,
            handler=handler_name,
            **extra,
        )
        manager.observe_received_messages_size(
            size=attrs["message_size"],
            broker=broker_name,
            handler=handler_name,
            **extra,
        )
        manager.add_received_message_in_process(
            amount=count,
            broker=broker_name,
            handler=handler_name,
            **extra,
        )

        failure: Exception | None = None
        started_at = time.perf_counter()

        try:
            result = await call_next(msg)

        except Exception as exc:
            failure = exc

            # IgnoredException is not counted as a handler exception.
            if not isinstance(failure, IgnoredException):
                manager.add_received_processed_message_exception(
                    exception_type=type(failure).__name__,
                    broker=broker_name,
                    handler=handler_name,
                    **extra,
                )
            raise

        finally:
            # NOTE(review): errors deriving only from BaseException (e.g.
            # asyncio.CancelledError) skip the handler above, so `failure`
            # is still None here - confirm that is the intended accounting.
            manager.observe_received_processed_message_duration(
                duration=time.perf_counter() - started_at,
                broker=broker_name,
                handler=handler_name,
                **extra,
            )
            manager.remove_received_message_in_process(
                amount=count,
                broker=broker_name,
                handler=handler_name,
                **extra,
            )

            if msg.committed or failure:
                # Ack status takes precedence, then the exception-type map,
                # then the generic error status.
                status = (
                    PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed)  # type: ignore[arg-type]
                    or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(failure))
                    or ProcessingStatus.error
                )
            else:
                status = ProcessingStatus.acked

            manager.add_received_processed_message(
                amount=count,
                status=status,
                broker=broker_name,
                handler=handler_name,
                **extra,
            )

        return result

    async def publish_scope(
        self,
        call_next: Callable[[PublishCommandType], Awaitable[Any]],
        cmd: PublishCommandType,
    ) -> Any:
        """Run ``call_next(cmd)`` and record publish metrics.

        Skips all metrics when there is no settings provider or when
        ``cmd.publish_type`` is ``PublishType.REPLY``. Exceptions raised by
        ``call_next`` are re-raised after being recorded.
        """
        provider = self._settings_provider
        if provider is None or cmd.publish_type is PublishType.REPLY:
            return await call_next(cmd)

        manager = self._metrics_manager
        extra = self._custom_labels

        target = provider.get_publish_destination_name_from_cmd(cmd)
        broker_name = provider.messaging_system

        failure: Exception | None = None
        started_at = time.perf_counter()

        try:
            result = await call_next(cmd)

        except Exception as exc:
            failure = exc
            manager.add_published_message_exception(
                exception_type=type(failure).__name__,
                broker=broker_name,
                destination=target,
                **extra,
            )
            raise

        finally:
            # NOTE(review): as in consume_scope, BaseException-only errors
            # leave `failure` as None and are counted as success - confirm.
            manager.observe_published_message_duration(
                duration=time.perf_counter() - started_at,
                broker=broker_name,
                destination=target,
                **extra,
            )

            if failure:
                status = PublishingStatus.error
            else:
                status = PublishingStatus.success

            manager.add_published_message(
                amount=len(cmd.batch_bodies),
                status=status,
                broker=broker_name,
                destination=target,
                **extra,
            )

        return result