Coverage for tests / brokers / redis / test_consume.py: 99%

448 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import MagicMock, call, patch 

3 

4import pytest 

5from redis.asyncio import Redis 

6 

7from faststream import AckPolicy 

8from faststream.redis import ( 

9 ListSub, 

10 PubSub, 

11 RedisMessage, 

12 RedisStreamMessage, 

13 StreamSub, 

14) 

15from faststream.redis.exceptions import StreamGroupNotFoundError 

16from tests.brokers.base.consume import BrokerRealConsumeTestcase 

17from tests.tools import spy_decorator 

18 

19from .basic import RedisTestcaseConfig 

20 

21 

@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsume(RedisTestcaseConfig, BrokerRealConsumeTestcase):
    """Channel (Pub/Sub) consumption tests, marked as needing a connected Redis."""

    async def test_consume_native(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A payload sent via the broker's raw connection arrives as bytes."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as running:
            await running.start()

            # Publish through the underlying connection instead of `publish`.
            delivered_to = await running._connection.publish(queue, "hello")
            waiter = asyncio.create_task(received.wait())
            await asyncio.wait([waiter], timeout=3)
            assert delivered_to == 1, delivered_to

        mock.assert_called_once_with(b"hello")

    async def test_pattern_with_path(self, mock: MagicMock) -> None:
        """A `test.{name}` subscription receives a message sent to `test.name`."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber("test.{name}")
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as running:
            await running.start()

            publishing = asyncio.create_task(running.publish("hello", "test.name"))
            waiting = asyncio.create_task(received.wait())
            await asyncio.wait([publishing, waiting], timeout=3)

        mock.assert_called_once_with("hello")

    async def test_pattern_without_path(self, mock: MagicMock) -> None:
        """A `PubSub("test.*", pattern=True)` subscription matches `test.name`."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(PubSub("test.*", pattern=True))
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as running:
            await running.start()

            publishing = asyncio.create_task(running.publish("hello", "test.name"))
            waiting = asyncio.create_task(received.wait())
            await asyncio.wait([publishing, waiting], timeout=3)

        mock.assert_called_once_with("hello")

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_concurrent_consume_channel(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """With `max_workers=2`, exactly two handler calls start within the wait."""
        first_seen = asyncio.Event()
        second_seen = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(channel=PubSub(queue), max_workers=2)
        async def handler(msg):
            mock()
            # The first call flags `first_seen`; any later call flags `second_seen`.
            (second_seen if first_seen.is_set() else first_seen).set()
            await asyncio.sleep(0.1)

        async with self.patch_broker(broker) as running:
            await running.start()

            for i in range(5):
                await running.publish(i, queue)

            first_waiter = asyncio.create_task(first_seen.wait())
            second_waiter = asyncio.create_task(second_seen.wait())
            await asyncio.wait([first_waiter, second_waiter], timeout=3)

            assert first_seen.is_set()
            assert second_seen.is_set()
            assert mock.call_count == 2, mock.call_count

141 

142 

@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsumeList(RedisTestcaseConfig):
    """List consumption tests, marked as needing a connected Redis."""

    async def test_consume_list(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """A message published to a list is delivered to the list subscriber."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(list=queue)
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", list=queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with("hello")

    async def test_consume_list_native(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """A value pushed with the raw connection's `rpush` arrives as bytes."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(list=queue)
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br._connection.rpush(queue, "hello")),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with(b"hello")

    @pytest.mark.slow()
    async def test_consume_list_batch_with_one(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """A batch subscriber receives a single message as a one-item list."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hi", list=queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        assert event.is_set()
        mock.assert_called_once_with(["hi"])

    @pytest.mark.slow()
    async def test_consume_list_batch_headers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Batch message headers mirror the first item's headers and custom keys."""
        event = asyncio.Event()

        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        def subscriber(m, msg: RedisMessage) -> None:
            check = all(
                (
                    msg.headers,
                    msg.headers["correlation_id"]
                    == msg.batch_headers[0]["correlation_id"],
                    msg.headers.get("custom") == "1",
                ),
            )
            mock(check)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            await asyncio.wait(
                (
                    asyncio.create_task(
                        br.publish("", list=queue, headers={"custom": "1"}),
                    ),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        assert event.is_set()
        mock.assert_called_once_with(True)

    @pytest.mark.slow()
    async def test_consume_list_batch(
        self,
        queue: str,
    ) -> None:
        """Messages sent with `publish_batch` are handed over as one batch."""
        consume_broker = self.get_broker(apply_types=True)

        msgs_queue = asyncio.Queue(maxsize=1)

        @consume_broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await br.publish_batch(1, "hi", list=queue)

            result, _ = await asyncio.wait(
                (asyncio.create_task(msgs_queue.get()),),
                timeout=3,
            )

        # Compared as sets, so the order of items inside the batch is ignored.
        assert [{1, "hi"}] == [set(r.result()) for r in result]

    @pytest.mark.slow()
    async def test_consume_list_batch_complex(
        self,
        queue: str,
    ) -> None:
        """A batch is decoded into the pydantic models the handler annotates."""
        consume_broker = self.get_broker(apply_types=True)

        from pydantic import BaseModel

        class Data(BaseModel):
            m: str

            # Hashable so instances can be collected into a set below.
            def __hash__(self):
                return hash(self.m)

        msgs_queue = asyncio.Queue(maxsize=1)

        @consume_broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg: list[Data]) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await br.publish_batch(Data(m="hi"), Data(m="again"), list=queue)

            result, _ = await asyncio.wait(
                (asyncio.create_task(msgs_queue.get()),),
                timeout=3,
            )

        assert [{Data(m="hi"), Data(m="again")}] == [set(r.result()) for r in result]

    @pytest.mark.slow()
    async def test_consume_list_batch_native(
        self,
        queue: str,
    ) -> None:
        """Values pushed with the raw connection's `rpush` arrive as one batch."""
        consume_broker = self.get_broker()

        msgs_queue = asyncio.Queue(maxsize=1)

        @consume_broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await br._connection.rpush(queue, 1, "hi")

            result, _ = await asyncio.wait(
                (asyncio.create_task(msgs_queue.get()),),
                timeout=3,
            )

        assert [{1, "hi"}] == [set(r.result()) for r in result]

    async def test_get_one(
        self,
        queue: str,
    ) -> None:
        """`get_one` returns a message published while it is waiting."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            message = None

            async def consume() -> None:
                nonlocal message
                message = await subscriber.get_one(timeout=5)

            async def publish() -> None:
                await br.publish("test_message", list=queue)

            await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish()),
                ),
                timeout=10,
            )

            assert message is not None
            assert await message.decode() == "test_message"

    async def test_get_one_timeout(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """`get_one` returns `None` when nothing arrives before the timeout."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            mock(await subscriber.get_one(timeout=1e-24))
            mock.assert_called_once_with(None)

    async def test_concurrent_consume_list(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """With `max_workers=2`, exactly two handler calls start within the wait."""
        event = asyncio.Event()
        event2 = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(list=ListSub(queue), max_workers=2)
        async def handler(msg):
            mock()
            if event.is_set():
                event2.set()
            else:
                event.set()
            await asyncio.sleep(0.1)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            for i in range(5):
                await br.publish(i, list=queue)

            await asyncio.wait(
                (
                    asyncio.create_task(event.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=3,
            )

            assert event.is_set()
            assert event2.is_set()
            assert mock.call_count == 2, mock.call_count

    async def test_iterator(
        self,
        queue: str,
    ) -> None:
        """Iterating the subscriber yields the published messages in order."""
        expected_messages = ("test_message_1", "test_message_2")

        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            # Publishing is awaited to completion before iteration starts, so
            # no background task is needed.
            for expected in expected_messages:
                await br.publish(expected, list=queue)

            async def consume() -> list:
                received = []
                async for msg in subscriber:
                    received.append(await msg.decode())
                    if len(received) >= len(expected_messages):
                        break
                return received

            # Bound the iteration: without a timeout a lost message would hang
            # the whole test run instead of failing this test.
            received = await asyncio.wait_for(consume(), timeout=self.timeout)

        assert tuple(received) == expected_messages

468 

469 

@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsumeStream(RedisTestcaseConfig):
    """Stream consumption tests, marked as needing a connected Redis."""

    @pytest.mark.slow()
    async def test_consume_stream(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A message published to a stream is delivered to the stream subscriber."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10))
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with("hello")

    @pytest.mark.slow()
    async def test_consume_stream_with_big_interval(
        self,
        event: asyncio.Event,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """Delivery still happens within the wait when `polling_interval` is huge."""
        consume_broker = self.get_broker()

        @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=100000))
        async def handler(msg):
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with("hello")

    @pytest.mark.slow()
    async def test_consume_stream_native(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """An entry added with the raw connection's `xadd` arrives as a dict."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10))
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(
                        br._connection.xadd(queue, {"message": "hello"}),
                    ),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with({"message": "hello"})

    @pytest.mark.slow()
    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_consume_stream_batch(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A batch subscriber receives a single message as a one-item list."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with(["hello"])

    @pytest.mark.slow()
    async def test_consume_stream_batch_headers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Batch message headers mirror the first item's headers and custom keys."""
        event = asyncio.Event()

        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        def subscriber(m, msg: RedisMessage) -> None:
            check = all(
                (
                    msg.headers,
                    msg.headers["correlation_id"]
                    == msg.batch_headers[0]["correlation_id"],
                    msg.headers.get("custom") == "1",
                ),
            )
            mock(check)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            await asyncio.wait(
                (
                    asyncio.create_task(
                        br.publish("", stream=queue, headers={"custom": "1"}),
                    ),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        assert event.is_set()
        mock.assert_called_once_with(True)

    @pytest.mark.slow()
    async def test_consume_stream_batch_complex(
        self,
        queue: str,
    ) -> None:
        """A batch is decoded into the pydantic models the handler annotates."""
        consume_broker = self.get_broker(apply_types=True)

        from pydantic import BaseModel

        class Data(BaseModel):
            m: str

        msgs_queue = asyncio.Queue(maxsize=1)

        @consume_broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg: list[Data]) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await br.publish(Data(m="hi"), stream=queue)

            result, _ = await asyncio.wait(
                (asyncio.create_task(msgs_queue.get()),),
                timeout=3,
            )

        assert next(iter(result)).result() == [Data(m="hi")]

    @pytest.mark.slow()
    async def test_consume_stream_batch_native(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """An entry added with raw `xadd` reaches a batch handler as a list."""
        event = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(
                        br._connection.xadd(queue, {"message": "hello"}),
                    ),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )

        mock.assert_called_once_with([{"message": "hello"}])

    async def test_consume_group(
        self,
        queue: str,
    ) -> None:
        """A group subscriber created without `last_id` gets `last_id == ">"`."""
        consume_broker = self.get_broker()

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None: ...

        assert next(iter(consume_broker.subscribers)).last_id == ">"

    async def test_consume_group_with_last_id(
        self,
        queue: str,
    ) -> None:
        """An explicit `last_id` on a group subscriber is kept as given."""
        consume_broker = self.get_broker()

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue, last_id="0"),
        )
        async def handler(msg: RedisMessage) -> None: ...

        assert next(iter(consume_broker.subscribers)).last_id == "0"

    async def test_consume_nack(
        self,
        queue: str,
    ) -> None:
        """`Redis.xack` is not called when the handler nacks the message."""
        event = asyncio.Event()

        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None:
            event.set()
            await msg.nack()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            # Spy on `Redis.xack` so calls are recorded but still executed.
            with patch.object(Redis, "xack", spy_decorator(Redis.xack)) as m:
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )

                assert not m.mock.called

        assert event.is_set()

    async def test_consume_ack(
        self,
        queue: str,
    ) -> None:
        """`Redis.xack` is called once after the handler completes normally."""
        event = asyncio.Event()

        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None:
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            with patch.object(Redis, "xack", spy_decorator(Redis.xack)) as m:
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )

                m.mock.assert_called_once()

        assert event.is_set()

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_consume_and_delete_acked(
        self,
        queue: str,
        event: asyncio.Event,
    ) -> None:
        """Deleting a message inside the handler leaves the stream empty."""
        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisStreamMessage) -> None:
            await msg.delete(consume_broker._connection)
            # Signal only after the delete has finished, so the length check
            # below cannot run while the delete is still in flight.
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            with patch.object(Redis, "xdel", spy_decorator(Redis.xdel)) as m:
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )

                m.mock.assert_called_once()

            queue_len = await br._connection.xlen(queue)
            assert queue_len == 0, (
                f"Redis stream must be empty here, found {queue_len} messages"
            )

    async def test_consume_and_delete_nacked(
        self,
        queue: str,
        event: asyncio.Event,
    ) -> None:
        """An uncommitted message under `AckPolicy.MANUAL` can still be deleted."""
        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
            ack_policy=AckPolicy.MANUAL,
        )
        async def handler(msg: RedisStreamMessage) -> None:
            assert not msg.committed
            await msg.delete(consume_broker._connection)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            with patch.object(Redis, "xdel", spy_decorator(Redis.xdel)) as m:
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )

                m.mock.assert_called_once()

            queue_len = await br._connection.xlen(queue)
            assert queue_len == 0, (
                f"Redis stream must be empty here, found {queue_len} messages"
            )

    async def test_get_one(
        self,
        queue: str,
    ) -> None:
        """`get_one` returns a message published while it is waiting."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            message = None

            async def consume() -> None:
                nonlocal message
                message = await subscriber.get_one(timeout=3)

            async def publish() -> None:
                # Short delay so `consume` is already waiting when we publish.
                await asyncio.sleep(0.1)
                await br.publish("test_message", stream=queue)

            await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish()),
                ),
                timeout=10,
            )

            assert message is not None
            assert await message.decode() == "test_message"

    async def test_get_one_timeout(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """`get_one` returns `None` when nothing arrives before the timeout."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            mock(await subscriber.get_one(timeout=1e-24))
            mock.assert_called_once_with(None)

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_concurrent_consume_stream(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """With `max_workers=2`, exactly two handler calls start within the wait."""
        event = asyncio.Event()
        event2 = asyncio.Event()

        consume_broker = self.get_broker()

        @consume_broker.subscriber(stream=StreamSub(queue), max_workers=2)
        async def handler(msg: RedisStreamMessage) -> None:
            mock()
            if event.is_set():
                event2.set()
            else:
                event.set()
            await asyncio.sleep(0.1)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            for i in range(5):
                await br.publish(i, stream=queue)

            await asyncio.wait(
                (
                    asyncio.create_task(event.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=3,
            )

            # Same checks as the channel and list variants of this test, so a
            # failure shows which handler call never started.
            assert event.is_set()
            assert event2.is_set()
            assert mock.call_count == 2, mock.call_count

    async def test_iterator(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Iterating the subscriber yields the published messages in order."""
        expected_messages = ("test_message_1", "test_message_2")

        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def publish_test_message() -> None:
                # Short delay so `consume` is already iterating when we publish.
                await asyncio.sleep(0.1)
                for msg in expected_messages:
                    await br.publish(msg, stream=queue)

            async def consume() -> None:
                index_message = 0
                async for msg in subscriber:
                    result_message = await msg.decode()

                    mock(result_message)

                    index_message += 1
                    if index_message >= len(expected_messages):
                        break

            await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish_test_message()),
                ),
                timeout=self.timeout,
            )

            calls = [call(msg) for msg in expected_messages]
            mock.assert_has_calls(calls=calls)

    @pytest.mark.slow()
    async def test_consume_stream_group_deleted(
        self,
        queue: str,
        mock: MagicMock,
        event: asyncio.Event,
    ) -> None:
        """Subscriber stops when the consumer group is deleted (NOGROUP)."""
        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="test_group", consumer="consumer1"),
        )
        async def handler(msg: RedisMessage) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            # Publish a message so the subscriber reads and starts consuming
            await br.publish("hello", stream=queue)
            await asyncio.wait_for(event.wait(), timeout=3)
            assert mock.call_count >= 1

            # Delete the stream — this removes the consumer group too
            await br._connection.delete(queue)

            # Give the subscriber time to try reading and hit NOGROUP
            await asyncio.sleep(0.5)

            # Publish another message — subscriber should NOT receive it
            # because it already stopped due to NOGROUP
            event.clear()
            await br.publish("world", stream=queue)
            with pytest.raises(asyncio.TimeoutError):
                await asyncio.wait_for(event.wait(), timeout=1)

            # The subscriber task should have finished with StreamGroupNotFoundError
            tasks = br.subscribers[0].tasks
            assert all(t.done() for t in tasks)
            # Every task is done, so `exception()` can only raise for cancelled
            # tasks; skipping those avoids swallowing errors with try/except.
            errors = [t.exception() for t in tasks if not t.cancelled()]
            assert any(
                isinstance(exc, StreamGroupNotFoundError) for exc in errors
            ), "Expected at least one task to raise StreamGroupNotFoundError"