Coverage for tests / brokers / base / publish.py: 99%

228 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from contextlib import suppress 

3from dataclasses import asdict, dataclass 

4from datetime import datetime, timezone 

5from typing import Any 

6from unittest.mock import MagicMock 

7 

8import anyio 

9import pytest 

10from pydantic import BaseModel 

11 

12from faststream import BaseMiddleware, Context, Response 

13from faststream._internal._compat import dump_json, model_to_json 

14from faststream.exceptions import SubscriberNotFound 

15 

16from .basic import BaseTestcaseConfig 

17 

18 

class SimpleModel(BaseModel):
    # Single string field; instances serve both as published payloads and as
    # handler annotations in the serialization test cases of this module.
    r: str

21 

22 

@dataclass
class SimpleDataclass:
    # Single string field; the dataclass counterpart of the pydantic model
    # used by the serialization test cases of this module.
    r: str

26 

27 

# One timezone-aware timestamp, computed at import time, so the published
# value and the expected value of the datetime case are the same object.
now = datetime.now(timezone.utc)

# Shared serialization cases. Each entry is
# (message to publish, handler annotation, value the handler must receive).
parametrized = (
    pytest.param("hello", str, "hello", id="str->str"),
    pytest.param(b"hello", bytes, b"hello", id="bytes->bytes"),
    pytest.param(1, int, 1, id="int->int"),
    pytest.param(1.0, float, 1.0, id="float->float"),
    pytest.param(1, float, 1.0, id="int->float"),
    pytest.param(False, bool, False, id="bool->bool"),
    pytest.param({"m": 1}, dict[str, int], {"m": 1}, id="dict->dict"),
    pytest.param([1, 2, 3], list[int], [1, 2, 3], id="list->list"),
    pytest.param(now, datetime, now, id="datetime->datetime"),
    pytest.param(
        dump_json(asdict(SimpleDataclass(r="hello!"))),
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="bytes->dataclass",
    ),
    pytest.param(
        SimpleDataclass(r="hello!"),
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="dataclass->dataclass",
    ),
    pytest.param(
        SimpleDataclass(r="hello!"),
        dict,
        {"r": "hello!"},
        id="dataclass->dict",
    ),
    pytest.param(
        {"r": "hello!"},
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="dict->dataclass",
    ),
)

110 

111 

112class BrokerPublishTestcase(BaseTestcaseConfig): 

@pytest.mark.asyncio()
@pytest.mark.parametrize(
    ("message", "message_type", "expected_message"),
    (
        *parametrized,
        pytest.param(
            model_to_json(SimpleModel(r="hello!")).encode(),
            SimpleModel,
            SimpleModel(r="hello!"),
            id="bytes->model",
        ),
        pytest.param(
            SimpleModel(r="hello!"),
            SimpleModel,
            SimpleModel(r="hello!"),
            id="model->model",
        ),
        pytest.param(
            SimpleModel(r="hello!"),
            dict,
            {"r": "hello!"},
            id="model->dict",
        ),
        pytest.param(
            {"r": "hello!"},
            SimpleModel,
            SimpleModel(r="hello!"),
            id="dict->model",
        ),
    ),
)
async def test_serialize(
    self,
    queue: str,
    message: Any,
    message_type: Any,
    expected_message: Any,
    mock: MagicMock,
) -> None:
    """Publish ``message`` and check what a typed handler receives.

    The subscriber's only argument is annotated with ``message_type``; the
    test passes when the handler was last called with ``expected_message``.
    """
    delivered = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    sub_args, sub_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*sub_args, **sub_kwargs)
    async def handler(m: message_type) -> None:
        delivered.set()
        mock(m)

    async with self.patch_broker(broker) as br:
        await br.start()

        # Publish and wait for delivery concurrently, bounded by the timeout.
        tasks = (
            asyncio.create_task(br.publish(message, queue)),
            asyncio.create_task(delivered.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_with(expected_message)

175 

@pytest.mark.asyncio()
async def test_response(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that a returned ``Response`` carries its metadata downstream.

    ``m`` returns ``Response(1, headers=..., correlation_id=...)`` and is
    wrapped by a publisher to ``queue + "1"``. The subscriber on that queue
    records the raw message's body, ``custom`` header and correlation id.
    """
    received = asyncio.Event()

    broker = self.get_broker(apply_types=True)
    next_queue = queue + "1"

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*in_args, **in_kwargs)
    @broker.publisher(next_queue)
    async def m():
        return Response(1, headers={"custom": "1"}, correlation_id="1")

    out_args, out_kwargs = self.get_subscriber_params(next_queue)

    @broker.subscriber(*out_args, **out_kwargs)
    async def m_next(msg=Context("message")) -> None:
        received.set()
        mock(
            body=msg.body,
            headers=msg.headers["custom"],
            correlation_id=msg.correlation_id,
        )

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish(None, queue)),
            asyncio.create_task(received.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_with(body=b"1", correlation_id="1", headers="1")

219 

@pytest.mark.asyncio()
async def test_unwrap_dict(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Publish a dict and check its keys arrive as separate handler arguments.

    The handler declares ``a: int`` and ``b: int`` and reports them back
    to ``mock`` as a dict.
    """
    received = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    sub_args, sub_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*sub_args, **sub_kwargs)
    async def m(a: int, b: int) -> None:
        received.set()
        mock({"a": a, "b": b})

    payload = {"a": 1, "b": 1.0}

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish(payload, queue)),
            asyncio.create_task(received.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    # NOTE(review): 1 == 1.0 in Python, so this comparison alone does not
    # tell an int ``b`` from a float one.
    mock.assert_called_with({"a": 1, "b": 1})

253 

@pytest.mark.asyncio()
async def test_unwrap_list(
    self,
    mock: MagicMock,
    queue: str,
) -> None:
    """Publish a list and check it is spread over positional handler arguments.

    The first two items are expected in ``a`` and ``b``; the remaining
    items are expected in the handler's ``*args``.
    """
    received = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    sub_args, sub_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*sub_args, **sub_kwargs)
    async def m(a: int, b: int, *args: tuple[int, ...]) -> None:
        received.set()
        mock({"a": a, "b": b, "args": args})

    payload = [1, 2.0, 3.0, 4.0]

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish(payload, queue)),
            asyncio.create_task(received.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    # NOTE(review): ints and equal-valued floats compare equal, so this
    # comparison alone does not prove the values were converted to int.
    mock.assert_called_with({"a": 1, "b": 2, "args": (3, 4)})

282 

@pytest.mark.asyncio()
async def test_base_publisher(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that a handler's return value is forwarded by a publisher decorator.

    ``m`` consumes ``queue`` and returns ``""``; the publisher decorator
    beneath the subscriber targets ``queue + "resp"``, where ``resp`` must
    be called exactly once with ``""``.
    """
    forwarded = asyncio.Event()

    broker = self.get_broker(apply_types=True)
    resp_queue = queue + "resp"

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*in_args, **in_kwargs)
    @broker.publisher(resp_queue)
    async def m() -> str:
        return ""

    out_args, out_kwargs = self.get_subscriber_params(resp_queue)

    @broker.subscriber(*out_args, **out_kwargs)
    async def resp(msg) -> None:
        forwarded.set()
        mock(msg)

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish("", queue)),
            asyncio.create_task(forwarded.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    assert forwarded.is_set()
    mock.assert_called_once_with("")

319 

@pytest.mark.asyncio()
async def test_publisher_object(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check forwarding when a pre-built publisher object is the decorator.

    The publisher for ``queue + "resp"`` is created first and then applied
    on top of the subscriber decorator; ``resp`` must be called exactly
    once with the ``""`` that ``m`` returns.
    """
    forwarded = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    resp_publisher = broker.publisher(queue + "resp")

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @resp_publisher
    @broker.subscriber(*in_args, **in_kwargs)
    async def m() -> str:
        return ""

    out_args, out_kwargs = self.get_subscriber_params(queue + "resp")

    @broker.subscriber(*out_args, **out_kwargs)
    async def resp(msg) -> None:
        forwarded.set()
        mock(msg)

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish("", queue)),
            asyncio.create_task(forwarded.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_once_with("")

357 

@pytest.mark.asyncio()
async def test_publish_manual(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check an explicit ``publisher.publish`` call made inside a handler.

    ``m`` returns nothing; it awaits ``publish("")`` on a publisher for
    ``queue + "resp"``, and ``resp`` must be called exactly once with ``""``.
    """
    forwarded = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    resp_publisher = broker.publisher(queue + "resp")

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*in_args, **in_kwargs)
    async def m() -> None:
        await resp_publisher.publish("")

    out_args, out_kwargs = self.get_subscriber_params(queue + "resp")

    @broker.subscriber(*out_args, **out_kwargs)
    async def resp(msg) -> None:
        forwarded.set()
        mock(msg)

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish("", queue)),
            asyncio.create_task(forwarded.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_once_with("")

394 

@pytest.mark.asyncio()
async def test_multiple_publishers(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that one handler result goes out through two publishers.

    ``m`` is decorated with publishers for ``queue + "resp"`` and
    ``queue + "resp2"``. Each destination has its own subscriber, and both
    must be called exactly once with the returned ``""``.
    """
    broker = self.get_broker(apply_types=True)

    first_done = anyio.Event()
    second_done = anyio.Event()

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @broker.publisher(queue + "resp2")
    @broker.subscriber(*in_args, **in_kwargs)
    @broker.publisher(queue + "resp")
    async def m() -> str:
        return ""

    first_args, first_kwargs = self.get_subscriber_params(queue + "resp")

    @broker.subscriber(*first_args, **first_kwargs)
    async def resp(msg) -> None:
        first_done.set()
        mock.resp1(msg)

    second_args, second_kwargs = self.get_subscriber_params(queue + "resp2")

    @broker.subscriber(*second_args, **second_kwargs)
    async def resp2(msg) -> None:
        second_done.set()
        mock.resp2(msg)

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish("", queue)),
            asyncio.create_task(first_done.wait()),
            asyncio.create_task(second_done.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.resp1.assert_called_once_with("")
    mock.resp2.assert_called_once_with("")

441 

@pytest.mark.asyncio()
async def test_reusable_publishers(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that one publisher object can decorate two different handlers.

    ``m`` (on ``queue``) and ``m2`` (on ``queue + "2"``) share the publisher
    for ``queue + "resp"``. One message is published to each input queue,
    and ``resp`` must therefore be called twice.
    """
    broker = self.get_broker(apply_types=True)

    first_seen = anyio.Event()
    second_seen = anyio.Event()

    shared_pub = broker.publisher(queue + "resp")

    args_a, kwargs_a = self.get_subscriber_params(queue)

    @shared_pub
    @broker.subscriber(*args_a, **kwargs_a)
    async def m() -> str:
        return ""

    args_b, kwargs_b = self.get_subscriber_params(queue + "2")

    @shared_pub
    @broker.subscriber(*args_b, **kwargs_b)
    async def m2() -> str:
        return ""

    resp_args, resp_kwargs = self.get_subscriber_params(queue + "resp")

    @broker.subscriber(*resp_args, **resp_kwargs)
    async def resp() -> None:
        # The first delivery trips ``first_seen``; later ones trip
        # ``second_seen``.
        if first_seen.is_set():
            second_seen.set()
        else:
            first_seen.set()
        mock()

    async with self.patch_broker(broker) as br:
        await br.start()
        tasks = (
            asyncio.create_task(br.publish("", queue)),
            asyncio.create_task(br.publish("", queue + "2")),
            asyncio.create_task(first_seen.wait()),
            asyncio.create_task(second_seen.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    assert mock.call_count == 2

492 

@pytest.mark.asyncio()
async def test_reply_to(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that ``reply_to`` routes a handler's return value.

    ``handler`` echoes its input; the message is published with
    ``reply_to=queue + "reply"``, and the subscriber on that queue must be
    called with the echoed ``"Hello!"``.
    """
    replied = asyncio.Event()

    broker = self.get_broker(apply_types=True)
    reply_queue = queue + "reply"

    reply_args, reply_kwargs = self.get_subscriber_params(reply_queue)

    @broker.subscriber(*reply_args, **reply_kwargs)
    async def reply_handler(m) -> None:
        replied.set()
        mock(m)

    in_args, in_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*in_args, **in_kwargs)
    async def handler(m):
        return m

    async with self.patch_broker(broker) as br:
        await br.start()

        tasks = (
            asyncio.create_task(
                br.publish("Hello!", queue, reply_to=reply_queue),
            ),
            asyncio.create_task(replied.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_with("Hello!")

530 

@pytest.mark.asyncio()
async def test_no_reply(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that a ``no_reply=True`` subscriber does not answer ``reply_to``.

    ``handler`` echoes its input but is registered with ``no_reply=True``.
    The message is published with ``reply_to=queue + "reply"``, and the
    subscriber on that reply queue must never be called.

    Nothing reaches ``reply_handler`` on success, so completion is detected
    through a middleware whose ``after_processed`` hook sets ``event``.
    """
    event = asyncio.Event()

    class Mid(BaseMiddleware):
        """Middleware that signals ``event`` from ``after_processed``."""

        async def after_processed(self, *args: Any, **kwargs: Any):
            event.set()

            return await super().after_processed(*args, **kwargs)

    pub_broker = self.get_broker(apply_types=True)
    pub_broker.add_middleware(Mid)

    args, kwargs = self.get_subscriber_params(queue + "reply")

    @pub_broker.subscriber(*args, **kwargs)
    async def reply_handler(m) -> None:
        mock(m)

    args2, kwargs2 = self.get_subscriber_params(queue, no_reply=True)

    @pub_broker.subscriber(*args2, **kwargs2)
    async def handler(m):
        return m

    async with self.patch_broker(pub_broker) as br:
        await br.start()

        await asyncio.wait(
            (
                asyncio.create_task(
                    br.publish("Hello!", queue, reply_to=queue + "reply"),
                ),
                asyncio.create_task(event.wait()),
            ),
            timeout=self.timeout,
        )

    # Without this check the test would also pass when nothing was consumed
    # before the timeout, because ``mock`` stays uncalled in that case too.
    assert event.is_set()
    assert not mock.called

574 

@pytest.mark.asyncio()
async def test_publisher_after_connect(self, queue: str) -> None:
    """Create and use a publisher inside the patched-broker context.

    No subscriber is registered and ``start()`` is not called; the test
    only requires that creating the publisher and publishing do not fail.
    """
    broker = self.get_broker()

    async with self.patch_broker(broker) as br:
        # Should pass without error. The TestClient raises
        # SubscriberNotFound because there is no suitable subscriber,
        # so that one error is suppressed.
        with suppress(SubscriberNotFound):
            await br.publisher(queue).publish(None)

582 

@pytest.mark.asyncio()
async def test_publisher_after_start(
    self,
    queue: str,
    mock: MagicMock,
) -> None:
    """Check that a publisher created after ``start()`` still delivers.

    The publisher for ``queue`` is created only once the broker has been
    started; the subscriber on ``queue`` must be called with ``"Hello!"``.
    """
    delivered = asyncio.Event()

    broker = self.get_broker(apply_types=True)

    sub_args, sub_kwargs = self.get_subscriber_params(queue)

    @broker.subscriber(*sub_args, **sub_kwargs)
    async def handler(m) -> None:
        delivered.set()
        mock(m)

    async with self.patch_broker(broker) as br:
        await br.start()

        late_pub = br.publisher(queue)

        tasks = (
            asyncio.create_task(late_pub.publish("Hello!")),
            asyncio.create_task(delivered.wait()),
        )
        await asyncio.wait(tasks, timeout=self.timeout)

    mock.assert_called_with("Hello!")