Coverage for tests / brokers / nats / test_consume.py: 99%

424 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import MagicMock, patch 

3 

4import pytest 

5from nats.aio.msg import Msg 

6 

7from faststream import AckPolicy 

8from faststream.exceptions import AckMessage 

9from faststream.nats import ConsumerConfig, JStream, PubAck, PullSub 

10from faststream.nats.annotations import NatsMessage 

11from faststream.nats.message import NatsMessage as StreamMessage 

12from tests.brokers.base.consume import BrokerRealConsumeTestcase 

13from tests.tools import spy_decorator 

14 

15from .basic import NatsTestcaseConfig 

16 

17 

18@pytest.mark.connected() 

19@pytest.mark.nats() 

20class TestConsume(NatsTestcaseConfig, BrokerRealConsumeTestcase): 

21 async def test_concurrent_subscriber( 

22 self, 

23 queue: str, 

24 mock: MagicMock, 

25 ) -> None: 

26 event = asyncio.Event() 

27 event2 = asyncio.Event() 

28 

29 broker = self.get_broker() 

30 

31 args, kwargs = self.get_subscriber_params(queue, max_workers=2) 

32 

33 @broker.subscriber(*args, **kwargs) 

34 async def handler(msg): 

35 mock() 

36 

37 if event.is_set(): 

38 event2.set() 

39 else: 

40 event.set() 

41 

42 await asyncio.sleep(1.0) 

43 

44 async with self.patch_broker(broker) as br: 

45 await br.start() 

46 

47 for i in range(5): 

48 await br.publish(i, queue) 

49 

50 await asyncio.wait( 

51 ( 

52 asyncio.create_task(event.wait()), 

53 asyncio.create_task(event2.wait()), 

54 ), 

55 timeout=3, 

56 ) 

57 

58 assert event.is_set() 

59 assert event2.is_set() 

60 assert mock.call_count == 2, mock.call_count 

61 

62 async def test_consume_js( 

63 self, 

64 queue: str, 

65 stream: JStream, 

66 ) -> None: 

67 event = asyncio.Event() 

68 

69 consume_broker = self.get_broker() 

70 

71 args, kwargs = self.get_subscriber_params(queue, stream=stream) 

72 

73 @consume_broker.subscriber(*args, **kwargs) 

74 def subscriber(m) -> None: 

75 event.set() 

76 

77 async with self.patch_broker(consume_broker) as br: 

78 await br.start() 

79 

80 result = await br.publish("hello", queue, stream=stream.name) 

81 

82 await asyncio.wait( 

83 (asyncio.create_task(event.wait()),), 

84 timeout=3, 

85 ) 

86 

87 assert isinstance(result, PubAck), result 

88 assert event.is_set() 

89 

90 async def test_consume_with_filter( 

91 self, 

92 queue: str, 

93 mock: MagicMock, 

94 ) -> None: 

95 event = asyncio.Event() 

96 

97 consume_broker = self.get_broker() 

98 

99 @consume_broker.subscriber( 

100 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]), 

101 stream=JStream(queue, subjects=[f"{queue}.*"]), 

102 ) 

103 def subscriber(m) -> None: 

104 mock(m) 

105 event.set() 

106 

107 async with self.patch_broker(consume_broker) as br: 

108 await br.start() 

109 await asyncio.wait( 

110 ( 

111 asyncio.create_task(br.publish(2, f"{queue}.a")), 

112 asyncio.create_task(event.wait()), 

113 ), 

114 timeout=3, 

115 ) 

116 

117 assert event.is_set() 

118 mock.assert_called_once_with(2) 

119 

120 async def test_consume_pull( 

121 self, 

122 queue: str, 

123 stream: JStream, 

124 mock, 

125 ) -> None: 

126 event = asyncio.Event() 

127 

128 consume_broker = self.get_broker() 

129 

130 @consume_broker.subscriber( 

131 queue, 

132 stream=stream, 

133 pull_sub=PullSub(1), 

134 ) 

135 def subscriber(m) -> None: 

136 mock(m) 

137 event.set() 

138 

139 async with self.patch_broker(consume_broker) as br: 

140 await br.start() 

141 

142 await asyncio.wait( 

143 ( 

144 asyncio.create_task(br.publish("hello", queue)), 

145 asyncio.create_task(event.wait()), 

146 ), 

147 timeout=3, 

148 ) 

149 

150 assert event.is_set() 

151 mock.assert_called_once_with("hello") 

152 

153 async def test_consume_batch( 

154 self, 

155 queue: str, 

156 stream: JStream, 

157 mock, 

158 ) -> None: 

159 event = asyncio.Event() 

160 

161 consume_broker = self.get_broker() 

162 

163 @consume_broker.subscriber( 

164 queue, 

165 stream=stream, 

166 pull_sub=PullSub(1, batch=True), 

167 ) 

168 def subscriber(m) -> None: 

169 mock(m) 

170 event.set() 

171 

172 async with self.patch_broker(consume_broker) as br: 

173 await br.start() 

174 

175 await asyncio.wait( 

176 ( 

177 asyncio.create_task(br.publish(b"hello", queue)), 

178 asyncio.create_task(event.wait()), 

179 ), 

180 timeout=3, 

181 ) 

182 

183 assert event.is_set() 

184 mock.assert_called_once_with([b"hello"]) 

185 

186 async def test_core_consume_no_ack( 

187 self, 

188 queue: str, 

189 mock: MagicMock, 

190 ) -> None: 

191 event = asyncio.Event() 

192 

193 consume_broker = self.get_broker(apply_types=True) 

194 

195 args, kwargs = self.get_subscriber_params( 

196 queue, 

197 ack_policy=AckPolicy.MANUAL, 

198 ) 

199 

200 @consume_broker.subscriber(*args, **kwargs) 

201 async def handler(msg: NatsMessage) -> None: 

202 mock(msg.raw_message._ackd) 

203 event.set() 

204 

205 async with self.patch_broker(consume_broker) as br: 

206 await br.start() 

207 

208 # Check, that Core Subscriber doesn't call Acknowledgement automatically 

209 with patch.object( 

210 StreamMessage, 

211 "ack", 

212 spy_decorator(StreamMessage.ack), 

213 ) as m: 

214 await asyncio.wait( 

215 ( 

216 asyncio.create_task(br.publish("hello", queue)), 

217 asyncio.create_task(event.wait()), 

218 ), 

219 timeout=3, 

220 ) 

221 assert not m.mock.called 

222 

223 assert event.is_set() 

224 mock.assert_called_once_with(True) # True was set by parser 

225 

226 async def test_consume_ack( 

227 self, 

228 queue: str, 

229 stream: JStream, 

230 ) -> None: 

231 event = asyncio.Event() 

232 

233 consume_broker = self.get_broker(apply_types=True) 

234 

235 @consume_broker.subscriber(queue, stream=stream) 

236 async def handler(msg: NatsMessage) -> None: 

237 event.set() 

238 

239 async with self.patch_broker(consume_broker) as br: 

240 await br.start() 

241 

242 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m: 

243 await asyncio.wait( 

244 ( 

245 asyncio.create_task(br.publish("hello", queue)), 

246 asyncio.create_task(event.wait()), 

247 ), 

248 timeout=3, 

249 ) 

250 m.mock.assert_called_once() 

251 

252 assert event.is_set() 

253 

254 async def test_consume_ack_manual( 

255 self, 

256 queue: str, 

257 stream: JStream, 

258 ) -> None: 

259 event = asyncio.Event() 

260 

261 consume_broker = self.get_broker(apply_types=True) 

262 

263 @consume_broker.subscriber(queue, stream=stream) 

264 async def handler(msg: NatsMessage) -> None: 

265 await msg.ack() 

266 event.set() 

267 

268 async with self.patch_broker(consume_broker) as br: 

269 await br.start() 

270 

271 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m: 

272 await asyncio.wait( 

273 ( 

274 asyncio.create_task(br.publish("hello", queue)), 

275 asyncio.create_task(event.wait()), 

276 ), 

277 timeout=3, 

278 ) 

279 m.mock.assert_called_once() 

280 

281 assert event.is_set() 

282 

283 async def test_consume_ack_sync_manual( 

284 self, 

285 queue: str, 

286 event: asyncio.Event, 

287 stream: JStream, 

288 ): 

289 consume_broker = self.get_broker(apply_types=True) 

290 

291 @consume_broker.subscriber(queue, stream=stream) 

292 async def handler(msg: NatsMessage): 

293 await msg.ack_sync() 

294 event.set() 

295 

296 async with self.patch_broker(consume_broker) as br: 

297 await br.start() 

298 

299 with patch.object(Msg, "ack_sync", spy_decorator(Msg.ack_sync)) as m: 

300 await asyncio.wait( 

301 ( 

302 asyncio.create_task(br.publish("hello", queue)), 

303 asyncio.create_task(event.wait()), 

304 ), 

305 timeout=3, 

306 ) 

307 m.mock.assert_called_once() 

308 

309 assert event.is_set() 

310 

311 async def test_consume_ack_raise( 

312 self, 

313 queue: str, 

314 stream: JStream, 

315 ) -> None: 

316 event = asyncio.Event() 

317 

318 consume_broker = self.get_broker(apply_types=True) 

319 

320 @consume_broker.subscriber(queue, stream=stream) 

321 async def handler(msg: NatsMessage): 

322 event.set() 

323 raise AckMessage 

324 

325 async with self.patch_broker(consume_broker) as br: 

326 await br.start() 

327 

328 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m: 

329 await asyncio.wait( 

330 ( 

331 asyncio.create_task(br.publish("hello", queue)), 

332 asyncio.create_task(event.wait()), 

333 ), 

334 timeout=3, 

335 ) 

336 m.mock.assert_called_once() 

337 

338 assert event.is_set() 

339 

340 async def test_nack( 

341 self, 

342 queue: str, 

343 stream: JStream, 

344 ) -> None: 

345 event = asyncio.Event() 

346 

347 consume_broker = self.get_broker(apply_types=True) 

348 

349 @consume_broker.subscriber(queue, stream=stream) 

350 async def handler(msg: NatsMessage) -> None: 

351 await msg.nack() 

352 event.set() 

353 

354 async with self.patch_broker(consume_broker) as br: 

355 await br.start() 

356 

357 with patch.object(Msg, "nak", spy_decorator(Msg.nak)) as m: 

358 await asyncio.wait( 

359 ( 

360 asyncio.create_task(br.publish("hello", queue)), 

361 asyncio.create_task(event.wait()), 

362 ), 

363 timeout=3, 

364 ) 

365 m.mock.assert_called_once() 

366 

367 assert event.is_set() 

368 

369 async def test_consume_no_ack( 

370 self, 

371 queue: str, 

372 stream: str, 

373 ) -> None: 

374 event = asyncio.Event() 

375 

376 consume_broker = self.get_broker(apply_types=True) 

377 

378 @consume_broker.subscriber( 

379 queue, 

380 stream=stream, 

381 ack_policy=AckPolicy.MANUAL, 

382 ) 

383 async def handler(msg: NatsMessage) -> None: 

384 event.set() 

385 

386 async with self.patch_broker(consume_broker) as br: 

387 await br.start() 

388 

389 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m: 

390 await asyncio.wait( 

391 ( 

392 asyncio.create_task(br.publish("hello", queue)), 

393 asyncio.create_task(event.wait()), 

394 ), 

395 timeout=3, 

396 ) 

397 m.mock.assert_not_called() 

398 

399 assert event.is_set() 

400 

401 async def test_consume_batch_headers( 

402 self, 

403 queue: str, 

404 stream: JStream, 

405 mock, 

406 ) -> None: 

407 event = asyncio.Event() 

408 

409 consume_broker = self.get_broker(apply_types=True) 

410 

411 @consume_broker.subscriber( 

412 queue, 

413 stream=stream, 

414 pull_sub=PullSub(1, batch=True), 

415 ) 

416 def subscriber(m, msg: NatsMessage) -> None: 

417 check = all( 

418 ( 

419 msg.headers, 

420 [msg.headers] == msg.batch_headers, 

421 msg.headers.get("custom") == "1", 

422 ), 

423 ) 

424 mock(check) 

425 event.set() 

426 

427 async with self.patch_broker(consume_broker) as br: 

428 await br.start() 

429 await asyncio.wait( 

430 ( 

431 asyncio.create_task(br.publish("", queue, headers={"custom": "1"})), 

432 asyncio.create_task(event.wait()), 

433 ), 

434 timeout=3, 

435 ) 

436 

437 assert event.is_set() 

438 mock.assert_called_once_with(True) 

439 

440 @pytest.mark.asyncio() 

441 async def test_consume_kv( 

442 self, 

443 queue: str, 

444 mock, 

445 ) -> None: 

446 event = asyncio.Event() 

447 

448 consume_broker = self.get_broker(apply_types=True) 

449 

450 @consume_broker.subscriber(queue, kv_watch=queue + "1") 

451 async def handler(m) -> None: 

452 mock(m) 

453 event.set() 

454 

455 async with self.patch_broker(consume_broker) as br: 

456 await br.start() 

457 bucket = await br.key_value(queue + "1") 

458 

459 await asyncio.wait( 

460 ( 

461 asyncio.create_task( 

462 bucket.put( 

463 queue, 

464 b"world", 

465 ), 

466 ), 

467 asyncio.create_task(event.wait()), 

468 ), 

469 timeout=3, 

470 ) 

471 

472 assert event.is_set() 

473 mock.assert_called_with(b"world") 

474 

475 @pytest.mark.asyncio() 

476 async def test_consume_os( 

477 self, 

478 queue: str, 

479 mock, 

480 ) -> None: 

481 event = asyncio.Event() 

482 

483 consume_broker = self.get_broker(apply_types=True) 

484 

485 @consume_broker.subscriber(queue, obj_watch=True) 

486 async def handler(filename: str) -> None: 

487 event.set() 

488 mock(filename) 

489 

490 async with self.patch_broker(consume_broker) as br: 

491 await br.start() 

492 bucket = await br.object_storage(queue) 

493 

494 await asyncio.wait( 

495 ( 

496 asyncio.create_task( 

497 bucket.put( 

498 "hello", 

499 b"world", 

500 ), 

501 ), 

502 asyncio.create_task(event.wait()), 

503 ), 

504 timeout=3, 

505 ) 

506 

507 assert event.is_set() 

508 mock.assert_called_once_with("hello") 

509 

510 async def test_get_one_js( 

511 self, 

512 queue: str, 

513 stream: JStream, 

514 ) -> None: 

515 broker = self.get_broker(apply_types=True) 

516 subscriber = broker.subscriber(queue, stream=stream) 

517 

518 async with self.patch_broker(broker) as br: 

519 await br.start() 

520 

521 message = None 

522 

523 async def consume() -> None: 

524 nonlocal message 

525 message = await subscriber.get_one(timeout=5) 

526 

527 async def publish() -> None: 

528 await br.publish("test_message", queue, stream=stream.name) 

529 

530 await asyncio.wait( 

531 ( 

532 asyncio.create_task(consume()), 

533 asyncio.create_task(publish()), 

534 ), 

535 timeout=10, 

536 ) 

537 

538 assert message is not None 

539 assert await message.decode() == "test_message" 

540 

541 async def test_get_one_timeout_js( 

542 self, 

543 queue: str, 

544 stream: JStream, 

545 mock, 

546 ) -> None: 

547 broker = self.get_broker(apply_types=True) 

548 subscriber = broker.subscriber(queue, stream=stream) 

549 

550 async with self.patch_broker(broker) as br: 

551 await br.start() 

552 

553 mock(await subscriber.get_one(timeout=1e-24)) 

554 mock.assert_called_once_with(None) 

555 

556 async def test_get_one_pull( 

557 self, 

558 queue: str, 

559 stream: JStream, 

560 ) -> None: 

561 broker = self.get_broker(apply_types=True) 

562 subscriber = broker.subscriber( 

563 queue, 

564 stream=stream, 

565 pull_sub=PullSub(1), 

566 ) 

567 

568 async with self.patch_broker(broker) as br: 

569 await br.start() 

570 

571 message = None 

572 

573 async def consume() -> None: 

574 nonlocal message 

575 message = await subscriber.get_one(timeout=5) 

576 

577 async def publish() -> None: 

578 await br.publish("test_message", queue) 

579 

580 await asyncio.wait( 

581 ( 

582 asyncio.create_task(consume()), 

583 asyncio.create_task(publish()), 

584 ), 

585 timeout=10, 

586 ) 

587 

588 assert message is not None 

589 assert await message.decode() == "test_message" 

590 

591 async def test_get_one_pull_timeout( 

592 self, 

593 queue: str, 

594 stream: JStream, 

595 mock: MagicMock, 

596 ) -> None: 

597 broker = self.get_broker(apply_types=True) 

598 subscriber = broker.subscriber( 

599 queue, 

600 stream=stream, 

601 pull_sub=PullSub(1), 

602 ) 

603 

604 async with self.patch_broker(broker) as br: 

605 await br.start() 

606 

607 mock(await subscriber.get_one(timeout=1e-24)) 

608 mock.assert_called_once_with(None) 

609 

610 async def test_get_one_batch( 

611 self, 

612 queue: str, 

613 stream: JStream, 

614 ) -> None: 

615 broker = self.get_broker(apply_types=True) 

616 subscriber = broker.subscriber( 

617 queue, 

618 stream=stream, 

619 pull_sub=PullSub(1, batch=True), 

620 ) 

621 

622 async with self.patch_broker(broker) as br: 

623 await br.start() 

624 

625 message = None 

626 

627 async def consume() -> None: 

628 nonlocal message 

629 message = await subscriber.get_one(timeout=5) 

630 

631 async def publish() -> None: 

632 await br.publish("test_message", queue) 

633 

634 await asyncio.wait( 

635 ( 

636 asyncio.create_task(consume()), 

637 asyncio.create_task(publish()), 

638 ), 

639 timeout=10, 

640 ) 

641 

642 assert message is not None 

643 assert await message.decode() == ["test_message"] 

644 

645 async def test_get_one_batch_timeout( 

646 self, 

647 queue: str, 

648 stream: JStream, 

649 mock: MagicMock, 

650 ) -> None: 

651 broker = self.get_broker(apply_types=True) 

652 subscriber = broker.subscriber( 

653 queue, 

654 stream=stream, 

655 pull_sub=PullSub(1, batch=True), 

656 ) 

657 

658 async with self.patch_broker(broker) as br: 

659 await br.start() 

660 

661 mock(await subscriber.get_one(timeout=1e-24)) 

662 mock.assert_called_once_with(None) 

663 

664 async def test_get_one_with_filter( 

665 self, 

666 queue: str, 

667 stream: JStream, 

668 ) -> None: 

669 broker = self.get_broker(apply_types=True) 

670 subscriber = broker.subscriber( 

671 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]), 

672 stream=JStream(queue, subjects=[f"{queue}.*"]), 

673 ) 

674 

675 async with self.patch_broker(broker) as br: 

676 await br.start() 

677 

678 message = None 

679 

680 async def consume() -> None: 

681 nonlocal message 

682 message = await subscriber.get_one(timeout=5) 

683 

684 async def publish() -> None: 

685 await br.publish("test_message", f"{queue}.a") 

686 

687 await asyncio.wait( 

688 ( 

689 asyncio.create_task(publish()), 

690 asyncio.create_task(consume()), 

691 ), 

692 timeout=10, 

693 ) 

694 

695 assert message is not None 

696 assert await message.decode() == "test_message" 

697 

698 async def test_get_one_kv( 

699 self, 

700 queue: str, 

701 stream: JStream, 

702 ) -> None: 

703 broker = self.get_broker(apply_types=True) 

704 subscriber = broker.subscriber(queue, kv_watch=queue + "1") 

705 

706 async with self.patch_broker(broker) as br: 

707 await br.start() 

708 bucket = await br.key_value(queue + "1") 

709 

710 message = None 

711 

712 async def consume() -> None: 

713 nonlocal message 

714 message = await subscriber.get_one(timeout=5) 

715 

716 async def publish() -> None: 

717 await bucket.put(queue, b"test_message") 

718 

719 await asyncio.wait( 

720 ( 

721 asyncio.create_task(consume()), 

722 asyncio.create_task(publish()), 

723 ), 

724 timeout=10, 

725 ) 

726 

727 assert message is not None 

728 assert await message.decode() == b"test_message" 

729 

730 async def test_get_one_kv_timeout( 

731 self, 

732 queue: str, 

733 stream: JStream, 

734 mock: MagicMock, 

735 ) -> None: 

736 broker = self.get_broker(apply_types=True) 

737 subscriber = broker.subscriber(queue, kv_watch=queue + "1") 

738 

739 async with self.patch_broker(broker) as br: 

740 await br.start() 

741 

742 mock(await subscriber.get_one(timeout=1e-24)) 

743 mock.assert_called_once_with(None) 

744 

745 async def test_get_one_os( 

746 self, 

747 queue: str, 

748 stream: JStream, 

749 ) -> None: 

750 broker = self.get_broker(apply_types=True) 

751 subscriber = broker.subscriber(queue, obj_watch=True) 

752 

753 async with self.patch_broker(broker) as br: 

754 await br.start() 

755 bucket = await br.object_storage(queue) 

756 

757 new_object_id = None 

758 

759 async def consume() -> None: 

760 nonlocal new_object_id 

761 new_object_event = await subscriber.get_one(timeout=5) 

762 new_object_id = await new_object_event.decode() 

763 

764 async def publish() -> None: 

765 await bucket.put(queue, b"test_message") 

766 

767 await asyncio.wait( 

768 ( 

769 asyncio.create_task(consume()), 

770 asyncio.create_task(publish()), 

771 ), 

772 timeout=10, 

773 ) 

774 

775 new_object = await bucket.get(new_object_id) 

776 assert new_object.data == b"test_message" 

777 

778 async def test_get_one_os_timeout( 

779 self, 

780 queue: str, 

781 stream: JStream, 

782 mock: MagicMock, 

783 ) -> None: 

784 broker = self.get_broker(apply_types=True) 

785 subscriber = broker.subscriber(queue, obj_watch=True) 

786 

787 async with self.patch_broker(broker) as br: 

788 await br.start() 

789 

790 mock(await subscriber.get_one(timeout=1e-24)) 

791 mock.assert_called_once_with(None) 

792 

793 async def test_iterator_js( 

794 self, 

795 queue: str, 

796 stream: JStream, 

797 ) -> None: 

798 expected_messages = ("test_message_1", "test_message_2") 

799 

800 broker = self.get_broker(apply_types=True) 

801 subscriber = broker.subscriber(queue, stream=stream) 

802 

803 async with self.patch_broker(broker) as br: 

804 await br.start() 

805 

806 async def publish_test_message(): 

807 for msg in expected_messages: 

808 await br.publish(msg, queue) 

809 

810 _ = await asyncio.create_task(publish_test_message()) 

811 

812 index_message = 0 

813 async for msg in subscriber: 813 ↛ exitline 813 didn't jump to the function exit

814 result_message = await msg.decode() 

815 

816 assert result_message == expected_messages[index_message] 

817 

818 index_message += 1 

819 if index_message >= len(expected_messages): 

820 break 

821 

822 async def test_iterator_pull( 

823 self, 

824 queue: str, 

825 stream: JStream, 

826 ) -> None: 

827 expected_messages = ("test_message_1", "test_message_2") 

828 

829 broker = self.get_broker(apply_types=True) 

830 subscriber = broker.subscriber( 

831 queue, 

832 stream=stream, 

833 pull_sub=PullSub(1), 

834 ) 

835 

836 async with self.patch_broker(broker) as br: 

837 await br.start() 

838 

839 async def publish_test_message(): 

840 for msg in expected_messages: 

841 await br.publish(msg, queue) 

842 

843 _ = await asyncio.create_task(publish_test_message()) 

844 

845 index_message = 0 

846 async for msg in subscriber: 846 ↛ exitline 846 didn't jump to the function exit

847 result_message = await msg.decode() 

848 

849 assert result_message == expected_messages[index_message] 

850 

851 index_message += 1 

852 if index_message >= len(expected_messages): 

853 break 

854 

855 async def test_iterator_batch( 

856 self, 

857 queue: str, 

858 stream: JStream, 

859 ) -> None: 

860 expected_messages = ("test_message_1", "test_message_2") 

861 

862 broker = self.get_broker(apply_types=True) 

863 subscriber = broker.subscriber( 

864 queue, 

865 stream=stream, 

866 pull_sub=PullSub(1, batch=True), 

867 ) 

868 

869 async with self.patch_broker(broker) as br: 

870 await br.start() 

871 

872 async def publish_test_message(): 

873 for msg in expected_messages: 

874 await br.publish(msg, queue) 

875 

876 _ = await asyncio.create_task(publish_test_message()) 

877 

878 index_message = 0 

879 async for msg in subscriber: 879 ↛ exitline 879 didn't jump to the function exit

880 result_message = await msg.decode() 

881 

882 assert result_message == [expected_messages[index_message]] 

883 

884 index_message += 1 

885 if index_message >= len(expected_messages): 

886 break 

887 

888 async def test_iterator_with_filter( 

889 self, 

890 queue: str, 

891 ) -> None: 

892 expected_messages = ("test_message_1", "test_message_2") 

893 

894 broker = self.get_broker(apply_types=True) 

895 subscriber = broker.subscriber( 

896 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]), 

897 stream=JStream(queue, subjects=[f"{queue}.*"]), 

898 ) 

899 

900 async with self.patch_broker(broker) as br: 

901 await br.start() 

902 

903 async def publish_test_message(): 

904 for msg in expected_messages: 

905 await br.publish(msg, f"{queue}.a") 

906 

907 _ = await asyncio.create_task(publish_test_message()) 

908 

909 index_message = 0 

910 async for msg in subscriber: 910 ↛ exitline 910 didn't jump to the function exit

911 result_message = await msg.decode() 

912 

913 assert result_message == expected_messages[index_message] 

914 

915 index_message += 1 

916 if index_message >= len(expected_messages): 

917 break 

918 

919 async def test_iterator_kv( 

920 self, 

921 queue: str, 

922 ) -> None: 

923 expected_messages = (b"test_message_1", b"test_message_2") 

924 

925 broker = self.get_broker(apply_types=True) 

926 subscriber = broker.subscriber(queue, kv_watch=queue + "1") 

927 

928 async with self.patch_broker(broker) as br: 

929 await br.start() 

930 bucket = await br.key_value(queue + "1") 

931 

932 async def publish_test_message(): 

933 await bucket.put(queue, expected_messages[0]) 

934 

935 _ = await asyncio.create_task(publish_test_message()) 

936 

937 index_message = 0 

938 async for msg in subscriber: 938 ↛ exitline 938 didn't jump to the function exit

939 result_message = await msg.decode() 

940 

941 assert result_message == expected_messages[index_message] 

942 

943 index_message += 1 

944 if index_message >= len(expected_messages): 

945 break 

946 

947 await bucket.put(queue, expected_messages[index_message]) 

948 

949 async def test_iterator_os( 

950 self, 

951 queue: str, 

952 ) -> None: 

953 expected_messages = (b"test_message_1", b"test_message_2") 

954 

955 broker = self.get_broker(apply_types=True) 

956 subscriber = broker.subscriber(queue, obj_watch=True) 

957 

958 async with self.patch_broker(broker) as br: 

959 await br.start() 

960 bucket = await br.object_storage(queue) 

961 

962 async def publish_test_message(): 

963 await bucket.put(queue, expected_messages[0]) 

964 

965 _ = await asyncio.create_task(publish_test_message()) 

966 

967 index_message = 0 

968 async for new_object_event in subscriber: 968 ↛ exitline 968 didn't jump to the function exit

969 new_object_id = await new_object_event.decode() 

970 new_object = await bucket.get(new_object_id) 

971 

972 assert new_object.data == expected_messages[index_message] 

973 

974 index_message += 1 

975 if index_message >= len(expected_messages): 

976 break 

977 

978 await bucket.put(queue, expected_messages[index_message])