Coverage for tests / prometheus / basic.py: 99%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from typing import TYPE_CHECKING, Any, cast 

3 

4import pytest 

5from dirty_equals import IsList, IsPositiveFloat, IsStr 

6from prometheus_client import CollectorRegistry 

7 

8from faststream import Context 

9from faststream.exceptions import IgnoredException, RejectMessage 

10from faststream.message import AckStatus 

11from faststream.prometheus import MetricsSettingsProvider 

12from faststream.prometheus.middleware import ( 

13 PROCESSING_STATUS_BY_ACK_STATUS, 

14 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, 

15 BasePrometheusMiddleware, 

16 PrometheusMiddleware, 

17) 

18from faststream.prometheus.types import ProcessingStatus, PublishingStatus 

19from tests.brokers.base.basic import BaseTestcaseConfig 

20from tests.prometheus.utils import ( 

21 get_published_messages_duration_seconds_metric, 

22 get_published_messages_exceptions_metric, 

23 get_published_messages_metric, 

24 get_received_messages_in_process_metric, 

25 get_received_messages_metric, 

26 get_received_messages_size_bytes_metric, 

27 get_received_processed_messages_duration_seconds_metric, 

28 get_received_processed_messages_exceptions_metric, 

29 get_received_processed_messages_metric, 

30) 

31 

32if TYPE_CHECKING: 

33 from collections.abc import Callable 

34 

35 

36@pytest.mark.asyncio() 

37class LocalPrometheusTestcase(BaseTestcaseConfig): 

38 def get_middleware(self, **kwargs: Any) -> PrometheusMiddleware: 

39 raise NotImplementedError 

40 

41 def get_settings_provider(self) -> MetricsSettingsProvider[Any]: 

42 raise NotImplementedError 

43 

44 @pytest.mark.parametrize( 

45 ( 

46 "status", 

47 "exception_class", 

48 ), 

49 ( 

50 pytest.param( 

51 AckStatus.ACKED, 

52 RejectMessage, 

53 id="acked status with reject message exception", 

54 ), 

55 pytest.param( 

56 AckStatus.ACKED, 

57 Exception, 

58 id="acked status with not handler exception", 

59 ), 

60 pytest.param( 

61 AckStatus.ACKED, 

62 None, 

63 id="acked status without exception", 

64 ), 

65 pytest.param( 

66 AckStatus.NACKED, 

67 None, 

68 id="nacked status without exception", 

69 ), 

70 pytest.param( 

71 AckStatus.REJECTED, 

72 None, 

73 id="rejected status without exception", 

74 ), 

75 pytest.param( 

76 AckStatus.ACKED, 

77 IgnoredException, 

78 id="acked status with ignore exception", 

79 ), 

80 ), 

81 ) 

82 async def test_metrics( 

83 self, 

84 queue: str, 

85 status: AckStatus, 

86 exception_class: type[Exception] | None, 

87 ) -> None: 

88 event = asyncio.Event() 

89 registry = CollectorRegistry() 

90 custom_labels: dict[str, str | Callable[[Any], str]] = { 

91 "static": "pupupu", 

92 "dynamic": lambda x: "papapa", 

93 } 

94 middleware = self.get_middleware(registry=registry, custom_labels=custom_labels) 

95 

96 broker = self.get_broker(apply_types=True, middlewares=(middleware,)) 

97 

98 args, kwargs = self.get_subscriber_params(queue) 

99 

100 message = None 

101 

102 @broker.subscriber(*args, **kwargs) 

103 async def handler(m=Context("message")) -> None: 

104 event.set() 

105 

106 nonlocal message 

107 message = m 

108 

109 if exception_class: 

110 raise exception_class 

111 

112 if status == AckStatus.ACKED: 

113 await message.ack() 

114 elif status == AckStatus.NACKED: 

115 await message.nack() 

116 elif status == AckStatus.REJECTED: 116 ↛ exitline 116 didn't return from function 'handler' because the condition on line 116 was always true

117 await message.reject() 

118 

119 async with broker: 

120 await broker.start() 

121 tasks = ( 

122 asyncio.create_task(broker.publish("hello", queue)), 

123 asyncio.create_task(event.wait()), 

124 ) 

125 await asyncio.wait(tasks, timeout=self.timeout) 

126 

127 assert event.is_set() 

128 self.assert_metrics( 

129 registry=registry, 

130 message=message, 

131 exception_class=exception_class, 

132 custom_labels={ 

133 "static": "pupupu", 

134 "dynamic": "papapa", 

135 }, 

136 ) 

137 

138 def assert_metrics( 

139 self, 

140 *, 

141 registry: CollectorRegistry, 

142 message: Any, 

143 exception_class: type[Exception] | None, 

144 custom_labels: dict[str, str], 

145 ) -> None: 

146 settings_provider = self.get_settings_provider() 

147 consume_attrs = settings_provider.get_consume_attrs_from_message(message) 

148 

149 received_messages_metric = get_received_messages_metric( 

150 metrics_prefix="faststream", 

151 app_name="faststream", 

152 broker=settings_provider.messaging_system, 

153 queue=consume_attrs["destination_name"], 

154 messages_amount=consume_attrs["messages_count"], 

155 custom_labels=custom_labels, 

156 ) 

157 

158 received_messages_size_bytes_metric = get_received_messages_size_bytes_metric( 

159 metrics_prefix="faststream", 

160 app_name="faststream", 

161 broker=settings_provider.messaging_system, 

162 queue=consume_attrs["destination_name"], 

163 buckets=( 

164 2.0**4, 

165 2.0**6, 

166 2.0**8, 

167 2.0**10, 

168 2.0**12, 

169 2.0**14, 

170 2.0**16, 

171 2.0**18, 

172 2.0**20, 

173 2.0**22, 

174 2.0**24, 

175 float("inf"), 

176 ), 

177 size=consume_attrs["message_size"], 

178 messages_amount=1, 

179 custom_labels=custom_labels, 

180 ) 

181 

182 received_messages_in_process_metric = get_received_messages_in_process_metric( 

183 metrics_prefix="faststream", 

184 app_name="faststream", 

185 broker=settings_provider.messaging_system, 

186 queue=consume_attrs["destination_name"], 

187 messages_amount=0, 

188 custom_labels=custom_labels, 

189 ) 

190 

191 received_processed_messages_duration_seconds_metric = ( 

192 get_received_processed_messages_duration_seconds_metric( 

193 metrics_prefix="faststream", 

194 app_name="faststream", 

195 broker=settings_provider.messaging_system, 

196 queue=consume_attrs["destination_name"], 

197 duration=cast("float", IsPositiveFloat), 

198 custom_labels=custom_labels, 

199 ) 

200 ) 

201 

202 status = ProcessingStatus.acked 

203 

204 if exception_class: 

205 status = ( 

206 PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class) 

207 or ProcessingStatus.error 

208 ) 

209 elif message.committed: 

210 status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed] 

211 

212 received_processed_messages_metric = get_received_processed_messages_metric( 

213 metrics_prefix="faststream", 

214 app_name="faststream", 

215 broker=settings_provider.messaging_system, 

216 queue=consume_attrs["destination_name"], 

217 messages_amount=consume_attrs["messages_count"], 

218 status=status, 

219 custom_labels=custom_labels, 

220 ) 

221 

222 exception_type: str | None = None 

223 

224 if exception_class and not issubclass(exception_class, IgnoredException): 

225 exception_type = exception_class.__name__ 

226 

227 received_processed_messages_exceptions_metric = ( 

228 get_received_processed_messages_exceptions_metric( 

229 metrics_prefix="faststream", 

230 app_name="faststream", 

231 broker=settings_provider.messaging_system, 

232 queue=consume_attrs["destination_name"], 

233 exception_type=exception_type, 

234 exceptions_amount=consume_attrs["messages_count"], 

235 custom_labels=custom_labels, 

236 ) 

237 ) 

238 

239 published_messages_metric = get_published_messages_metric( 

240 metrics_prefix="faststream", 

241 app_name="faststream", 

242 broker=settings_provider.messaging_system, 

243 queue=cast("str", IsStr), 

244 status=PublishingStatus.success, 

245 messages_amount=consume_attrs["messages_count"], 

246 custom_labels=custom_labels, 

247 ) 

248 

249 published_messages_duration_seconds_metric = ( 

250 get_published_messages_duration_seconds_metric( 

251 metrics_prefix="faststream", 

252 app_name="faststream", 

253 broker=settings_provider.messaging_system, 

254 queue=cast("str", IsStr), 

255 duration=cast("float", IsPositiveFloat), 

256 custom_labels=custom_labels, 

257 ) 

258 ) 

259 

260 published_messages_exceptions_metric = get_published_messages_exceptions_metric( 

261 metrics_prefix="faststream", 

262 app_name="faststream", 

263 broker=settings_provider.messaging_system, 

264 queue=cast("str", IsStr), 

265 exception_type=None, 

266 custom_labels=custom_labels, 

267 ) 

268 

269 expected_metrics = IsList( 

270 received_messages_metric, 

271 received_messages_size_bytes_metric, 

272 received_messages_in_process_metric, 

273 received_processed_messages_metric, 

274 received_processed_messages_duration_seconds_metric, 

275 received_processed_messages_exceptions_metric, 

276 published_messages_metric, 

277 published_messages_duration_seconds_metric, 

278 published_messages_exceptions_metric, 

279 check_order=False, 

280 ) 

281 real_metrics = list(registry.collect()) 

282 

283 assert real_metrics == expected_metrics 

284 

285 

286class LocalRPCPrometheusTestcase: 

287 @pytest.mark.asyncio() 

288 async def test_rpc_request( 

289 self, 

290 queue: str, 

291 ) -> None: 

292 event = asyncio.Event() 

293 registry = CollectorRegistry() 

294 

295 middleware = self.get_middleware(registry=registry) 

296 

297 broker = self.get_broker(apply_types=True, middlewares=(middleware,)) 

298 

299 message = None 

300 

301 @broker.subscriber(queue) 

302 async def handle(m=Context("message")): 

303 event.set() 

304 

305 nonlocal message 

306 message = m 

307 

308 return "" 

309 

310 async with self.patch_broker(broker) as br: 

311 await br.start() 

312 

313 await asyncio.wait_for( 

314 br.request("", queue), 

315 timeout=3, 

316 ) 

317 

318 assert event.is_set() 

319 

320 self.assert_metrics( 

321 registry=registry, 

322 message=message, 

323 exception_class=None, 

324 custom_labels={}, 

325 ) 

326 

327 

328class LocalMetricsSettingsProviderTestcase: 

329 messaging_system: str 

330 

331 def get_middleware(self, **kwargs) -> BasePrometheusMiddleware: 

332 raise NotImplementedError 

333 

334 @staticmethod 

335 def get_settings_provider() -> MetricsSettingsProvider: 

336 raise NotImplementedError 

337 

338 def test_messaging_system(self) -> None: 

339 provider = self.get_settings_provider() 

340 assert provider.messaging_system == self.messaging_system 

341 

342 def test_one_registry_for_some_middlewares(self) -> None: 

343 registry = CollectorRegistry() 

344 

345 middleware_1 = self.get_middleware(registry=registry) 

346 middleware_2 = self.get_middleware(registry=registry) 

347 self.get_broker(middlewares=(middleware_1,)) 

348 self.get_broker(middlewares=(middleware_2,)) 

349 

350 assert ( 

351 middleware_1._metrics_container.received_messages_total 

352 is middleware_2._metrics_container.received_messages_total 

353 )