Coverage for tests / brokers / base / consume.py: 99%

199 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import MagicMock, call 

3 

4import anyio 

5import pytest 

6from pydantic import BaseModel 

7 

8from faststream import Context, Depends 

9from faststream.exceptions import StopConsume 

10 

11from .basic import BaseTestcaseConfig 

12 

13 

14@pytest.mark.asyncio() 

15class BrokerConsumeTestcase(BaseTestcaseConfig): 

16 async def test_consume( 

17 self, 

18 queue: str, 

19 ) -> None: 

20 event = asyncio.Event() 

21 consume_broker = self.get_broker() 

22 

23 args, kwargs = self.get_subscriber_params(queue) 

24 

25 @consume_broker.subscriber(*args, **kwargs) 

26 def subscriber(m) -> None: 

27 event.set() 

28 

29 async with self.patch_broker(consume_broker) as br: 

30 await br.start() 

31 await asyncio.wait( 

32 ( 

33 asyncio.create_task(br.publish("hello", queue)), 

34 asyncio.create_task(event.wait()), 

35 ), 

36 timeout=self.timeout, 

37 ) 

38 

39 assert event.is_set() 

40 

41 async def test_consume_from_multi( 

42 self, 

43 queue: str, 

44 mock: MagicMock, 

45 ) -> None: 

46 consume_broker = self.get_broker() 

47 

48 consume = asyncio.Event() 

49 consume2 = asyncio.Event() 

50 

51 args, kwargs = self.get_subscriber_params(queue) 

52 args2, kwargs2 = self.get_subscriber_params(queue + "1") 

53 

54 @consume_broker.subscriber(*args, **kwargs) 

55 @consume_broker.subscriber(*args2, **kwargs2) 

56 def subscriber(m) -> None: 

57 mock() 

58 if not consume.is_set(): 

59 consume.set() 

60 else: 

61 consume2.set() 

62 

63 async with self.patch_broker(consume_broker) as br: 

64 await br.start() 

65 await asyncio.wait( 

66 ( 

67 asyncio.create_task(br.publish("hello", queue)), 

68 asyncio.create_task(br.publish("hello", queue + "1")), 

69 asyncio.create_task(consume.wait()), 

70 asyncio.create_task(consume2.wait()), 

71 ), 

72 timeout=self.timeout, 

73 ) 

74 

75 assert consume.is_set() 

76 assert consume2.is_set() 

77 assert mock.call_count == 2 

78 

79 async def test_consume_double( 

80 self, 

81 queue: str, 

82 mock: MagicMock, 

83 ) -> None: 

84 consume_broker = self.get_broker() 

85 

86 consume = asyncio.Event() 

87 consume2 = asyncio.Event() 

88 

89 args, kwargs = self.get_subscriber_params(queue) 

90 

91 @consume_broker.subscriber(*args, **kwargs) 

92 async def handler(m) -> None: 

93 mock() 

94 if not consume.is_set(): 

95 consume.set() 

96 else: 

97 consume2.set() 

98 

99 async with self.patch_broker(consume_broker) as br: 

100 await br.start() 

101 await asyncio.wait( 

102 ( 

103 asyncio.create_task(br.publish("hello", queue)), 

104 asyncio.create_task(br.publish("hello", queue)), 

105 asyncio.create_task(consume.wait()), 

106 asyncio.create_task(consume2.wait()), 

107 ), 

108 timeout=self.timeout, 

109 ) 

110 

111 assert consume2.is_set() 

112 assert consume.is_set() 

113 assert mock.call_count == 2 

114 

115 async def test_different_consume( 

116 self, 

117 queue: str, 

118 mock: MagicMock, 

119 ) -> None: 

120 consume_broker = self.get_broker() 

121 

122 consume = asyncio.Event() 

123 consume2 = asyncio.Event() 

124 

125 args, kwargs = self.get_subscriber_params(queue) 

126 

127 @consume_broker.subscriber(*args, **kwargs) 

128 def handler(m) -> None: 

129 mock.handler() 

130 consume.set() 

131 

132 another_topic = queue + "1" 

133 args, kwargs = self.get_subscriber_params(another_topic) 

134 

135 @consume_broker.subscriber(*args, **kwargs) 

136 def handler2(m) -> None: 

137 mock.handler2() 

138 consume2.set() 

139 

140 async with self.patch_broker(consume_broker) as br: 

141 await br.start() 

142 await asyncio.wait( 

143 ( 

144 asyncio.create_task(br.publish("hello", queue)), 

145 asyncio.create_task(br.publish("hello", another_topic)), 

146 asyncio.create_task(consume.wait()), 

147 asyncio.create_task(consume2.wait()), 

148 ), 

149 timeout=self.timeout, 

150 ) 

151 

152 assert consume.is_set() 

153 assert consume2.is_set() 

154 mock.handler.assert_called_once() 

155 mock.handler2.assert_called_once() 

156 

157 async def test_consume_with_filter( 

158 self, 

159 queue: str, 

160 mock: MagicMock, 

161 ) -> None: 

162 consume_broker = self.get_broker() 

163 

164 consume = asyncio.Event() 

165 consume2 = asyncio.Event() 

166 

167 args, kwargs = self.get_subscriber_params( 

168 queue, 

169 ) 

170 

171 sub = consume_broker.subscriber(*args, **kwargs) 

172 

173 @sub(filter=lambda m: m.content_type == "application/json") 

174 async def handler(m) -> None: 

175 mock.handler(m) 

176 consume.set() 

177 

178 @sub 

179 async def handler2(m) -> None: 

180 mock.handler2(m) 

181 consume2.set() 

182 

183 async with self.patch_broker(consume_broker) as br: 

184 await br.start() 

185 await asyncio.wait( 

186 ( 

187 asyncio.create_task(br.publish({"msg": "hello"}, queue)), 

188 asyncio.create_task(br.publish("hello", queue)), 

189 asyncio.create_task(consume.wait()), 

190 asyncio.create_task(consume2.wait()), 

191 ), 

192 timeout=self.timeout, 

193 ) 

194 

195 assert consume.is_set() 

196 assert consume2.is_set() 

197 mock.handler.assert_called_once_with({"msg": "hello"}) 

198 mock.handler2.assert_called_once_with("hello") 

199 

200 async def test_consume_validate_false( 

201 self, 

202 queue: str, 

203 mock: MagicMock, 

204 ) -> None: 

205 event = asyncio.Event() 

206 

207 consume_broker = self.get_broker( 

208 apply_types=True, 

209 serializer=None, 

210 ) 

211 

212 class Foo(BaseModel): 

213 x: int 

214 

215 def dependency() -> str: 

216 return "100" 

217 

218 args, kwargs = self.get_subscriber_params(queue) 

219 

220 @consume_broker.subscriber(*args, **kwargs) 

221 async def handler( 

222 m: Foo, 

223 dep: int = Depends(dependency), 

224 broker=Context(), 

225 ) -> None: 

226 mock(m, dep, broker) 

227 event.set() 

228 

229 async with self.patch_broker(consume_broker) as br: 

230 await br.start() 

231 

232 await asyncio.wait( 

233 ( 

234 asyncio.create_task(br.publish({"x": 1}, queue)), 

235 asyncio.create_task(event.wait()), 

236 ), 

237 timeout=self.timeout, 

238 ) 

239 

240 assert event.is_set() 

241 mock.assert_called_once_with({"x": 1}, "100", consume_broker) 

242 

243 async def test_dynamic_sub(self, queue: str) -> None: 

244 event = asyncio.Event() 

245 

246 consume_broker = self.get_broker() 

247 

248 async def subscriber(m) -> None: 

249 event.set() 

250 

251 async with self.patch_broker(consume_broker) as br: 

252 await br.start() 

253 

254 args, kwargs = self.get_subscriber_params(queue) 

255 sub = br.subscriber(*args, **kwargs) 

256 sub(subscriber) 

257 await sub.start() 

258 

259 await br.publish("hello", queue) 

260 

261 with anyio.move_on_after(self.timeout): 

262 await event.wait() 

263 

264 await sub.stop() 

265 

266 assert event.is_set() 

267 

268 async def test_get_one_conflicts_with_handler(self, queue) -> None: 

269 broker = self.get_broker(apply_types=True) 

270 args, kwargs = self.get_subscriber_params(queue) 

271 subscriber = broker.subscriber(*args, **kwargs) 

272 

273 @subscriber 

274 async def t() -> None: ... 

275 

276 async with self.patch_broker(broker) as br: 

277 await br.start() 

278 

279 with pytest.raises(AssertionError): 

280 await subscriber.get_one(timeout=1e-24) 

281 

282 

283@pytest.mark.asyncio() 

284class BrokerRealConsumeTestcase(BrokerConsumeTestcase): 

285 async def test_get_one( 

286 self, 

287 queue: str, 

288 mock: MagicMock, 

289 ) -> None: 

290 broker = self.get_broker(apply_types=True) 

291 

292 args, kwargs = self.get_subscriber_params(queue) 

293 subscriber = broker.subscriber(*args, **kwargs) 

294 

295 async with self.patch_broker(broker) as br: 

296 await br.start() 

297 

298 async def consume() -> None: 

299 mock(await subscriber.get_one(timeout=self.timeout)) 

300 

301 async def publish() -> None: 

302 await anyio.sleep(1e-24) 

303 await br.publish("test_message", queue) 

304 

305 await asyncio.wait( 

306 ( 

307 asyncio.create_task(consume()), 

308 asyncio.create_task(publish()), 

309 ), 

310 timeout=self.timeout, 

311 ) 

312 

313 mock.assert_called_once() 

314 message = mock.call_args[0][0] 

315 assert message 

316 assert await message.decode() == "test_message", await message.decode() 

317 

318 async def test_get_one_timeout( 

319 self, 

320 queue: str, 

321 mock: MagicMock, 

322 ) -> None: 

323 broker = self.get_broker(apply_types=True) 

324 args, kwargs = self.get_subscriber_params(queue) 

325 subscriber = broker.subscriber(*args, **kwargs) 

326 

327 async with self.patch_broker(broker) as br: 

328 await br.start() 

329 

330 mock(await subscriber.get_one(timeout=1e-24)) 

331 mock.assert_called_once_with(None) 

332 

333 @pytest.mark.slow() 

334 async def test_stop_consume_exc( 

335 self, 

336 queue: str, 

337 mock: MagicMock, 

338 ) -> None: 

339 event = asyncio.Event() 

340 

341 consume_broker = self.get_broker() 

342 

343 args, kwargs = self.get_subscriber_params(queue) 

344 

345 @consume_broker.subscriber(*args, **kwargs) 

346 def subscriber(m): 

347 mock() 

348 event.set() 

349 raise StopConsume 

350 

351 async with self.patch_broker(consume_broker) as br: 

352 await br.start() 

353 await asyncio.wait( 

354 ( 

355 asyncio.create_task(br.publish("hello", queue)), 

356 asyncio.create_task(event.wait()), 

357 ), 

358 timeout=self.timeout, 

359 ) 

360 await asyncio.sleep(0.5) 

361 await br.publish("hello", queue) 

362 await asyncio.sleep(0.5) 

363 

364 assert event.is_set() 

365 mock.assert_called_once() 

366 

367 @pytest.mark.asyncio() 

368 async def test_iteration( 

369 self, 

370 queue: str, 

371 mock: MagicMock, 

372 ) -> None: 

373 expected_messages = ("test_message_1", "test_message_2") 

374 

375 broker = self.get_broker(apply_types=True) 

376 

377 args, kwargs = self.get_subscriber_params(queue) 

378 subscriber = broker.subscriber(*args, **kwargs) 

379 

380 async with self.patch_broker(broker) as br: 

381 await br.start() 

382 

383 async def publish_test_message(): 

384 for msg in expected_messages: 

385 await br.publish(msg, queue) 

386 

387 async def consume(): 

388 index_message = 0 

389 async for msg in subscriber: 389 ↛ exitline 389 didn't return from function 'consume' because the loop on line 389 didn't complete

390 result_message = await msg.decode() 

391 

392 mock(result_message) 

393 

394 index_message += 1 

395 if index_message >= len(expected_messages): 

396 break 

397 

398 await asyncio.wait( 

399 ( 

400 asyncio.create_task(consume()), 

401 asyncio.create_task(publish_test_message()), 

402 ), 

403 timeout=self.timeout, 

404 ) 

405 

406 calls = [call(msg) for msg in expected_messages] 

407 mock.assert_has_calls(calls=calls)