Coverage for faststream / prometheus / middleware.py: 99%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import time 

2from collections.abc import Awaitable, Callable, Sequence 

3from typing import TYPE_CHECKING, Any, Generic 

4 

5from faststream._internal.constants import EMPTY 

6from faststream._internal.middlewares import BaseMiddleware 

7from faststream._internal.types import AnyMsg, BrokerMiddleware, PublishCommandType 

8from faststream.exceptions import IgnoredException 

9from faststream.message import SourceType 

10from faststream.prometheus.consts import ( 

11 PROCESSING_STATUS_BY_ACK_STATUS, 

12 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, 

13) 

14from faststream.prometheus.container import MetricsContainer 

15from faststream.prometheus.manager import MetricsManager 

16from faststream.prometheus.provider import MetricsSettingsProvider 

17from faststream.prometheus.types import ProcessingStatus, PublishingStatus 

18from faststream.response import PublishType 

19 

20if TYPE_CHECKING: 

21 from prometheus_client import CollectorRegistry 

22 

23 from faststream._internal.basic_types import AsyncFuncAny 

24 from faststream._internal.context.repository import ContextRepo 

25 from faststream.message.message import StreamMessage 

26 

27 

class PrometheusMiddleware(BrokerMiddleware[AnyMsg, PublishCommandType]):
    """Broker-level factory producing per-message Prometheus middlewares.

    The instance builds one ``MetricsContainer`` and one ``MetricsManager`` at
    construction time and hands the manager to every
    ``BasePrometheusMiddleware`` it creates when called.
    """

    __slots__ = (
        "_dynamic_labels",
        "_metrics_container",
        "_metrics_manager",
        "_settings_provider_factory",
        "_static_labels",
    )

    def __init__(
        self,
        *,
        settings_provider_factory: Callable[
            [AnyMsg | None],
            MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
        ],
        registry: "CollectorRegistry",
        app_name: str = EMPTY,
        metrics_prefix: str = "faststream",
        received_messages_size_buckets: Sequence[float] | None = None,
        custom_labels: dict[str, str | Callable[[Any], str]] | None = None,
    ) -> None:
        """Set up the metrics container/manager and split the custom labels.

        Args:
            settings_provider_factory: Callable invoked with the incoming
                message (or ``None``); its result is stored by the created
                per-message middleware and may be ``None``.
            registry: Registry forwarded to ``MetricsContainer``.
            app_name: Forwarded to ``MetricsManager``; when left at ``EMPTY``
                the value of ``metrics_prefix`` is used instead.
            metrics_prefix: Forwarded to ``MetricsContainer``.
            received_messages_size_buckets: Forwarded to ``MetricsContainer``.
            custom_labels: Extra labels. Callable values are stored as dynamic
                labels (evaluated per message in ``__call__``); every other
                value is stored as a static label.
        """
        # Normalise once: a missing or empty mapping behaves like ``{}``.
        label_spec = custom_labels or {}

        self._settings_provider_factory = settings_provider_factory
        self._metrics_container = MetricsContainer(
            registry,
            metrics_prefix=metrics_prefix,
            received_messages_size_buckets=received_messages_size_buckets,
            custom_label_names=tuple(label_spec.keys()),
        )
        self._metrics_manager = MetricsManager(
            self._metrics_container,
            app_name=metrics_prefix if app_name is EMPTY else app_name,
        )

        self._static_labels = {
            name: value
            for name, value in label_spec.items()
            if not callable(value)
        }
        self._dynamic_labels = {
            name: value for name, value in label_spec.items() if callable(value)
        }

    def __call__(
        self,
        msg: AnyMsg | None,
        /,
        *,
        context: "ContextRepo",
    ) -> "BasePrometheusMiddleware[PublishCommandType]":
        """Create the middleware instance for a single message.

        Each dynamic label callable is invoked with ``msg`` (which may be
        ``None``); the results are merged over the static labels, so a dynamic
        label wins if both dictionaries ever held the same key.
        """
        resolved_labels = {
            name: resolver(msg) for name, resolver in self._dynamic_labels.items()
        }
        return BasePrometheusMiddleware[PublishCommandType](
            msg,
            metrics_manager=self._metrics_manager,
            settings_provider_factory=self._settings_provider_factory,
            context=context,
            custom_labels=self._static_labels | resolved_labels,
        )

88 

89 

class BasePrometheusMiddleware(
    BaseMiddleware[PublishCommandType, AnyMsg],
    Generic[PublishCommandType, AnyMsg],
):
    """Per-message middleware reporting consume/publish events to a manager.

    All reporting goes through the ``MetricsManager`` passed at construction;
    the custom labels are forwarded as extra keyword arguments on every call.
    """

    def __init__(
        self,
        msg: AnyMsg | None,
        /,
        *,
        settings_provider_factory: Callable[
            [AnyMsg | None],
            MetricsSettingsProvider[AnyMsg, PublishCommandType] | None,
        ],
        metrics_manager: MetricsManager,
        context: "ContextRepo",
        custom_labels: dict[str, str],
    ) -> None:
        """Store collaborators and resolve the settings provider for ``msg``.

        ``settings_provider_factory(msg)`` is called immediately; a ``None``
        result makes both scopes pass straight through to ``call_next``.
        """
        self._metrics_manager = metrics_manager
        self._settings_provider = settings_provider_factory(msg)
        self._custom_labels = custom_labels
        super().__init__(msg, context=context)

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[AnyMsg]",
    ) -> Any:
        """Run ``call_next(msg)`` and report reception/processing metrics.

        Instrumentation is skipped when there is no settings provider or when
        ``msg.source_type`` is ``SourceType.RESPONSE``. Exceptions derived from
        ``Exception`` are always re-raised; they are reported to the manager
        unless they are ``IgnoredException`` instances. Duration, in-process
        bookkeeping and the final status are reported in ``finally``.
        """
        provider = self._settings_provider
        if provider is None or msg.source_type is SourceType.RESPONSE:
            return await call_next(msg)

        messaging_system = provider.messaging_system
        consume_attrs = provider.get_consume_attrs_from_message(msg)
        destination_name = consume_attrs["destination_name"]
        messages_count = consume_attrs["messages_count"]

        manager = self._metrics_manager
        # ``dict(**...)`` keeps the TypeError raised on a clash between a
        # custom label and ``broker``/``handler``.
        labels = dict(
            broker=messaging_system,
            handler=destination_name,
            **self._custom_labels,
        )

        manager.add_received_message(amount=messages_count, **labels)
        manager.observe_received_messages_size(
            size=consume_attrs["message_size"],
            **labels,
        )
        manager.add_received_message_in_process(amount=messages_count, **labels)

        err: Exception | None = None
        started_at = time.perf_counter()

        try:
            result = await call_next(msg)

        except Exception as exc:
            err = exc

            if not isinstance(exc, IgnoredException):
                manager.add_received_processed_message_exception(
                    exception_type=type(exc).__name__,
                    **labels,
                )
            raise

        finally:
            elapsed = time.perf_counter() - started_at
            manager.observe_received_processed_message_duration(
                duration=elapsed,
                **labels,
            )
            manager.remove_received_message_in_process(
                amount=messages_count,
                **labels,
            )

            ack_state = msg.committed
            if ack_state or err:
                # Lookup order: ack status, then exception type, then the
                # generic error status as the last resort.
                status = (
                    PROCESSING_STATUS_BY_ACK_STATUS.get(ack_state)  # type: ignore[arg-type]
                    or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(err))
                    or ProcessingStatus.error
                )
            else:
                status = ProcessingStatus.acked

            manager.add_received_processed_message(
                amount=messages_count,
                status=status,
                **labels,
            )

        return result

    async def publish_scope(
        self,
        call_next: Callable[[PublishCommandType], Awaitable[Any]],
        cmd: PublishCommandType,
    ) -> Any:
        """Run ``call_next(cmd)`` and report publishing metrics.

        Instrumentation is skipped when there is no settings provider or when
        ``cmd.publish_type`` is ``PublishType.REPLY``. Exceptions derived from
        ``Exception`` are reported and re-raised. Duration and the published
        count (``len(cmd.batch_bodies)``) are reported in ``finally`` with an
        error or success status.
        """
        provider = self._settings_provider
        if provider is None or cmd.publish_type is PublishType.REPLY:
            return await call_next(cmd)

        destination_name = provider.get_publish_destination_name_from_cmd(cmd)
        messaging_system = provider.messaging_system

        err: Exception | None = None
        started_at = time.perf_counter()

        try:
            result = await call_next(cmd)

        except Exception as exc:
            err = exc
            self._metrics_manager.add_published_message_exception(
                exception_type=type(exc).__name__,
                broker=messaging_system,
                destination=destination_name,
                **self._custom_labels,
            )
            raise

        finally:
            elapsed = time.perf_counter() - started_at

            self._metrics_manager.observe_published_message_duration(
                duration=elapsed,
                broker=messaging_system,
                destination=destination_name,
                **self._custom_labels,
            )

            if err:
                status = PublishingStatus.error
            else:
                status = PublishingStatus.success

            self._metrics_manager.add_published_message(
                amount=len(cmd.batch_bodies),
                status=status,
                broker=messaging_system,
                destination=destination_name,
                **self._custom_labels,
            )

        return result