Coverage for tests / brokers / redis / test_consume.py: 99%
448 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import MagicMock, call, patch
4import pytest
5from redis.asyncio import Redis
7from faststream import AckPolicy
8from faststream.redis import (
9 ListSub,
10 PubSub,
11 RedisMessage,
12 RedisStreamMessage,
13 StreamSub,
14)
15from faststream.redis.exceptions import StreamGroupNotFoundError
16from tests.brokers.base.consume import BrokerRealConsumeTestcase
17from tests.tools import spy_decorator
19from .basic import RedisTestcaseConfig
@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsume(RedisTestcaseConfig, BrokerRealConsumeTestcase):
    """Pub/Sub channel consumption tests executed against a real Redis broker."""

    async def test_consume_native(self, mock: MagicMock, queue: str) -> None:
        """A payload sent through the raw Redis client is handled as bytes."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            # Publish via the underlying connection, bypassing the broker API.
            # Redis PUBLISH reports how many subscribers got the message.
            receivers = await br._connection.publish(queue, "hello")
            waiter = asyncio.create_task(received.wait())
            await asyncio.wait((waiter,), timeout=3)
            assert receivers == 1, receivers

        mock.assert_called_once_with(b"hello")

    async def test_pattern_with_path(self, mock: MagicMock) -> None:
        """A ``test.{name}`` subscriber receives a message sent to ``test.name``."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber("test.{name}")
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br.publish("hello", "test.name")),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with("hello")

    async def test_pattern_without_path(self, mock: MagicMock) -> None:
        """A ``PubSub("test.*", pattern=True)`` subscriber matches ``test.name``."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(PubSub("test.*", pattern=True))
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br.publish("hello", "test.name")),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with("hello")

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_concurrent_consume_channel(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Five messages are published to a ``max_workers=2`` channel subscriber.

        As soon as two handler calls have started, exactly two calls must have
        been recorded on the mock.
        """
        first_started = asyncio.Event()
        second_started = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(channel=PubSub(queue), max_workers=2)
        async def handler(msg):
            mock()
            # The first call flips ``first_started``; later calls flip the other.
            if not first_started.is_set():
                first_started.set()
            else:
                second_started.set()
            # Hold the worker so the remaining messages are not handled yet.
            await asyncio.sleep(0.1)

        async with self.patch_broker(broker) as br:
            await br.start()

            for payload in range(5):
                await br.publish(payload, queue)

            waiters = (
                asyncio.create_task(first_started.wait()),
                asyncio.create_task(second_started.wait()),
            )
            await asyncio.wait(waiters, timeout=3)

            assert first_started.is_set()
            assert second_started.is_set()
            assert mock.call_count == 2, mock.call_count
@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsumeList(RedisTestcaseConfig):
    """Redis list subscriber tests: single, batch, ``get_one`` and iteration."""

    async def test_consume_list(self, queue: str, mock: MagicMock) -> None:
        """A message published to a list through the broker reaches the handler."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(list=queue)
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br.publish("hello", list=queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with("hello")

    async def test_consume_list_native(self, queue: str, mock: MagicMock) -> None:
        """A value pushed with the raw client's ``rpush`` is handled as bytes."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(list=queue)
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br._connection.rpush(queue, "hello")),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with(b"hello")

    @pytest.mark.slow()
    async def test_consume_list_batch_with_one(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """A batch subscriber delivers a single message as a one-item list."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()
            jobs = (
                asyncio.create_task(br.publish("hi", list=queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        assert received.is_set()
        mock.assert_called_once_with(["hi"])

    @pytest.mark.slow()
    async def test_consume_list_batch_headers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Batch messages expose headers and matching per-item ``batch_headers``."""
        received = asyncio.Event()
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        def subscriber(m, msg: RedisMessage) -> None:
            # The tuple is built eagerly, so a missing key raises instead of
            # producing ``False``.
            conditions = (
                msg.headers,
                msg.headers["correlation_id"]
                == msg.batch_headers[0]["correlation_id"],
                msg.headers.get("custom") == "1",
            )
            mock(all(conditions))
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()
            jobs = (
                asyncio.create_task(
                    br.publish("", list=queue, headers={"custom": "1"}),
                ),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        assert received.is_set()
        mock.assert_called_once_with(True)

    @pytest.mark.slow()
    async def test_consume_list_batch(self, queue: str) -> None:
        """``publish_batch`` items arrive together in one handler call."""
        broker = self.get_broker(apply_types=True)
        msgs_queue = asyncio.Queue(maxsize=1)

        @broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(broker) as br:
            await br.start()

            await br.publish_batch(1, "hi", list=queue)

            getter = asyncio.create_task(msgs_queue.get())
            done, _ = await asyncio.wait((getter,), timeout=3)

        # Compared as sets, so the order of items inside the batch is ignored.
        assert [{1, "hi"}] == [set(task.result()) for task in done]

    @pytest.mark.slow()
    async def test_consume_list_batch_complex(self, queue: str) -> None:
        """Batched pydantic models are decoded into ``list[Data]``."""
        broker = self.get_broker(apply_types=True)

        from pydantic import BaseModel

        class Data(BaseModel):
            m: str

            # Hashable so received items can be compared as a set.
            def __hash__(self):
                return hash(self.m)

        msgs_queue = asyncio.Queue(maxsize=1)

        @broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg: list[Data]) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(broker) as br:
            await br.start()

            await br.publish_batch(Data(m="hi"), Data(m="again"), list=queue)

            getter = asyncio.create_task(msgs_queue.get())
            done, _ = await asyncio.wait((getter,), timeout=3)

        expected = [{Data(m="hi"), Data(m="again")}]
        assert expected == [set(task.result()) for task in done]

    @pytest.mark.slow()
    async def test_consume_list_batch_native(self, queue: str) -> None:
        """Values pushed with raw ``rpush`` are delivered as one decoded batch."""
        broker = self.get_broker()
        msgs_queue = asyncio.Queue(maxsize=1)

        @broker.subscriber(
            list=ListSub(queue, batch=True, polling_interval=0.01),
        )
        async def handler(msg) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(broker) as br:
            await br.start()

            await br._connection.rpush(queue, 1, "hi")

            getter = asyncio.create_task(msgs_queue.get())
            done, _ = await asyncio.wait((getter,), timeout=3)

        assert [{1, "hi"}] == [set(task.result()) for task in done]

    async def test_get_one(self, queue: str) -> None:
        """``get_one`` returns the message published while it is waiting."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            message = None

            async def consume() -> None:
                nonlocal message
                message = await subscriber.get_one(timeout=5)

            async def publish() -> None:
                await br.publish("test_message", list=queue)

            jobs = (
                asyncio.create_task(consume()),
                asyncio.create_task(publish()),
            )
            await asyncio.wait(jobs, timeout=10)

            assert message is not None
            assert await message.decode() == "test_message"

    async def test_get_one_timeout(self, queue: str, mock: MagicMock) -> None:
        """``get_one`` yields ``None`` when nothing arrives before the timeout."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            mock(await subscriber.get_one(timeout=1e-24))
            mock.assert_called_once_with(None)

    async def test_concurrent_consume_list(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Five messages are published to a ``max_workers=2`` list subscriber.

        As soon as two handler calls have started, exactly two calls must have
        been recorded on the mock.
        """
        first_started = asyncio.Event()
        second_started = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(list=ListSub(queue), max_workers=2)
        async def handler(msg):
            mock()
            # The first call flips ``first_started``; later calls flip the other.
            if not first_started.is_set():
                first_started.set()
            else:
                second_started.set()
            # Hold the worker so the remaining messages are not handled yet.
            await asyncio.sleep(0.1)

        async with self.patch_broker(broker) as br:
            await br.start()

            for payload in range(5):
                await br.publish(payload, list=queue)

            waiters = (
                asyncio.create_task(first_started.wait()),
                asyncio.create_task(second_started.wait()),
            )
            await asyncio.wait(waiters, timeout=3)

            assert first_started.is_set()
            assert second_started.is_set()
            assert mock.call_count == 2, mock.call_count

    async def test_iterator(self, queue: str) -> None:
        """Iterating the subscriber yields the published messages in order.

        Consumption is bounded by ``self.timeout`` so a lost message fails the
        test with a timeout error instead of blocking the run forever.
        """
        expected_messages = ("test_message_1", "test_message_2")

        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(list=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            # Everything is published before iteration starts.
            for payload in expected_messages:
                await br.publish(payload, list=queue)

            async def consume() -> int:
                consumed = 0
                async for msg in subscriber:
                    assert await msg.decode() == expected_messages[consumed]

                    consumed += 1
                    if consumed >= len(expected_messages):
                        break
                return consumed

            consumed = await asyncio.wait_for(consume(), timeout=self.timeout)
            assert consumed == len(expected_messages)
@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestConsumeStream(RedisTestcaseConfig):
    """Redis stream subscriber tests: single, batch, groups, ack and iteration."""

    @pytest.mark.slow()
    async def test_consume_stream(self, mock: MagicMock, queue: str) -> None:
        """A message published to a stream through the broker is handled."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(stream=StreamSub(queue, polling_interval=10))
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br.publish("hello", stream=queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with("hello")

    @pytest.mark.slow()
    async def test_consume_stream_with_big_interval(
        self,
        event: asyncio.Event,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A very large ``polling_interval`` still delivers within the wait."""
        broker = self.get_broker()

        @broker.subscriber(stream=StreamSub(queue, polling_interval=100000))
        async def handler(msg):
            mock(msg)
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()
            jobs = (
                asyncio.create_task(br.publish("hello", stream=queue)),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with("hello")

    @pytest.mark.slow()
    async def test_consume_stream_native(self, mock: MagicMock, queue: str) -> None:
        """An entry added with raw ``xadd`` is handled as its field mapping."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(stream=StreamSub(queue, polling_interval=10))
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(
                    br._connection.xadd(queue, {"message": "hello"}),
                ),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with({"message": "hello"})

    @pytest.mark.slow()
    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_consume_stream_batch(self, mock: MagicMock, queue: str) -> None:
        """A batch stream subscriber delivers one message as a one-item list."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(br.publish("hello", stream=queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with(["hello"])

    @pytest.mark.slow()
    async def test_consume_stream_batch_headers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Batch messages expose headers and matching per-item ``batch_headers``."""
        received = asyncio.Event()
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        def subscriber(m, msg: RedisMessage) -> None:
            # The tuple is built eagerly, so a missing key raises instead of
            # producing ``False``.
            conditions = (
                msg.headers,
                msg.headers["correlation_id"]
                == msg.batch_headers[0]["correlation_id"],
                msg.headers.get("custom") == "1",
            )
            mock(all(conditions))
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()
            jobs = (
                asyncio.create_task(
                    br.publish("", stream=queue, headers={"custom": "1"}),
                ),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        assert received.is_set()
        mock.assert_called_once_with(True)

    @pytest.mark.slow()
    async def test_consume_stream_batch_complex(self, queue: str) -> None:
        """A batched pydantic model is decoded into ``list[Data]``."""
        broker = self.get_broker(apply_types=True)

        from pydantic import BaseModel

        class Data(BaseModel):
            m: str

        msgs_queue = asyncio.Queue(maxsize=1)

        @broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg: list[Data]) -> None:
            await msgs_queue.put(msg)

        async with self.patch_broker(broker) as br:
            await br.start()

            await br.publish(Data(m="hi"), stream=queue)

            getter = asyncio.create_task(msgs_queue.get())
            done, _ = await asyncio.wait((getter,), timeout=3)

        assert next(iter(done)).result() == [Data(m="hi")]

    @pytest.mark.slow()
    async def test_consume_stream_batch_native(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A raw ``xadd`` entry reaches a batch subscriber as a one-item list."""
        received = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(
            stream=StreamSub(queue, polling_interval=10, batch=True),
        )
        async def handler(msg) -> None:
            mock(msg)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            jobs = (
                asyncio.create_task(
                    br._connection.xadd(queue, {"message": "hello"}),
                ),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(jobs, timeout=3)

        mock.assert_called_once_with([{"message": "hello"}])

    async def test_consume_group(self, queue: str) -> None:
        """A group subscriber without an explicit ``last_id`` uses ``">"``."""
        broker = self.get_broker()

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None: ...

        assert next(iter(broker.subscribers)).last_id == ">"

    async def test_consume_group_with_last_id(self, queue: str) -> None:
        """An explicit ``last_id`` on a group subscriber is kept as given."""
        broker = self.get_broker()

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue, last_id="0"),
        )
        async def handler(msg: RedisMessage) -> None: ...

        assert next(iter(broker.subscribers)).last_id == "0"

    async def test_consume_nack(self, queue: str) -> None:
        """``xack`` has not been called once a nack-ing handler has started."""
        received = asyncio.Event()
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None:
            received.set()
            await msg.nack()

        async with self.patch_broker(broker) as br:
            await br.start()

            # Spy on the client's ``xack`` while the message is processed.
            with patch.object(Redis, "xack", spy_decorator(Redis.xack)) as m:
                jobs = (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(received.wait()),
                )
                await asyncio.wait(jobs, timeout=3)

                assert not m.mock.called

        assert received.is_set()

    async def test_consume_ack(self, queue: str) -> None:
        """A handler that returns normally leads to exactly one ``xack`` call."""
        received = asyncio.Event()
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None:
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            # Spy on the client's ``xack`` while the message is processed.
            with patch.object(Redis, "xack", spy_decorator(Redis.xack)) as m:
                jobs = (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(received.wait()),
                )
                await asyncio.wait(jobs, timeout=3)

                m.mock.assert_called_once()

        assert received.is_set()

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_consume_and_delete_acked(
        self,
        queue: str,
        event: asyncio.Event,
    ) -> None:
        """Deleting a message inside the handler leaves the stream empty."""
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisStreamMessage) -> None:
            # Signal only after the delete has been awaited; setting the event
            # first would let the checks below race the in-flight delete.
            await msg.delete(broker._connection)
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            with patch.object(Redis, "xdel", spy_decorator(Redis.xdel)) as m:
                jobs = (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(event.wait()),
                )
                await asyncio.wait(jobs, timeout=3)

                m.mock.assert_called_once()

            queue_len = await br._connection.xlen(queue)
            assert queue_len == 0, (
                f"Redis stream must be empty here, found {queue_len} messages"
            )

    async def test_consume_and_delete_nacked(
        self,
        queue: str,
        event: asyncio.Event,
    ) -> None:
        """With manual acks, deleting an uncommitted message empties the stream."""
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
            ack_policy=AckPolicy.MANUAL,
        )
        async def handler(msg: RedisStreamMessage) -> None:
            assert not msg.committed
            await msg.delete(broker._connection)
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            with patch.object(Redis, "xdel", spy_decorator(Redis.xdel)) as m:
                jobs = (
                    asyncio.create_task(br.publish("hello", stream=queue)),
                    asyncio.create_task(event.wait()),
                )
                await asyncio.wait(jobs, timeout=3)

                m.mock.assert_called_once()

            queue_len = await br._connection.xlen(queue)
            assert queue_len == 0, (
                f"Redis stream must be empty here, found {queue_len} messages"
            )

    async def test_get_one(self, queue: str) -> None:
        """``get_one`` returns a message published shortly after it starts."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            message = None

            async def consume() -> None:
                nonlocal message
                message = await subscriber.get_one(timeout=3)

            async def publish() -> None:
                # Delay publishing so the consumer task is started first.
                await asyncio.sleep(0.1)
                await br.publish("test_message", stream=queue)

            jobs = (
                asyncio.create_task(consume()),
                asyncio.create_task(publish()),
            )
            await asyncio.wait(jobs, timeout=10)

            assert message is not None
            assert await message.decode() == "test_message"

    async def test_get_one_timeout(self, queue: str, mock: MagicMock) -> None:
        """``get_one`` yields ``None`` when nothing arrives before the timeout."""
        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            mock(await subscriber.get_one(timeout=1e-24))
            mock.assert_called_once_with(None)

    @pytest.mark.flaky(reruns=3, reruns_delay=1)
    async def test_concurrent_consume_stream(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Five messages are published to a ``max_workers=2`` stream subscriber.

        As soon as two handler calls have started, exactly two calls must have
        been recorded on the mock.
        """
        first_started = asyncio.Event()
        second_started = asyncio.Event()
        broker = self.get_broker()

        @broker.subscriber(stream=StreamSub(queue), max_workers=2)
        async def handler(msg: RedisStreamMessage) -> None:
            mock()
            # The first call flips ``first_started``; later calls flip the other.
            if not first_started.is_set():
                first_started.set()
            else:
                second_started.set()
            # Hold the worker so the remaining messages are not handled yet.
            await asyncio.sleep(0.1)

        async with self.patch_broker(broker) as br:
            await br.start()

            for payload in range(5):
                await br.publish(payload, stream=queue)

            waiters = (
                asyncio.create_task(first_started.wait()),
                asyncio.create_task(second_started.wait()),
            )
            await asyncio.wait(waiters, timeout=3)

            # Same checks as the channel and list concurrency tests: a timed-out
            # wait is reported as an unset event, not only as a wrong count.
            assert first_started.is_set()
            assert second_started.is_set()
            assert mock.call_count == 2, mock.call_count

    async def test_iterator(self, queue: str, mock: MagicMock) -> None:
        """Iterating the subscriber yields the published messages in order."""
        expected_messages = ("test_message_1", "test_message_2")

        broker = self.get_broker(apply_types=True)
        subscriber = broker.subscriber(stream=queue)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def publish_test_message() -> None:
                # Delay publishing so the consumer task is started first.
                await asyncio.sleep(0.1)
                for payload in expected_messages:
                    await br.publish(payload, stream=queue)

            async def consume() -> None:
                consumed = 0
                async for msg in subscriber:
                    mock(await msg.decode())

                    consumed += 1
                    if consumed >= len(expected_messages):
                        break

            jobs = (
                asyncio.create_task(consume()),
                asyncio.create_task(publish_test_message()),
            )
            await asyncio.wait(jobs, timeout=self.timeout)

            calls = [call(payload) for payload in expected_messages]
            mock.assert_has_calls(calls=calls)

    @pytest.mark.slow()
    async def test_consume_stream_group_deleted(
        self,
        queue: str,
        mock: MagicMock,
        event: asyncio.Event,
    ) -> None:
        """Subscriber stops when the consumer group is deleted (NOGROUP)."""
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(
            stream=StreamSub(queue, group="test_group", consumer="consumer1"),
        )
        async def handler(msg: RedisMessage) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            # Publish a message so the subscriber reads and starts consuming
            await br.publish("hello", stream=queue)
            await asyncio.wait_for(event.wait(), timeout=3)
            assert mock.call_count >= 1

            # Delete the stream — this removes the consumer group too
            await br._connection.delete(queue)

            # Give the subscriber time to try reading and hit NOGROUP
            await asyncio.sleep(0.5)

            # Publish another message — subscriber should NOT receive it
            # because it already stopped due to NOGROUP
            event.clear()
            await br.publish("world", stream=queue)
            with pytest.raises(asyncio.TimeoutError):
                await asyncio.wait_for(event.wait(), timeout=1)

            # The subscriber task should have finished with StreamGroupNotFoundError
            tasks = br.subscribers[0].tasks
            assert all(t.done() for t in tasks)
            # Every task is done here, so ``exception()`` can only raise for a
            # cancelled task; those are skipped rather than caught.
            assert any(
                isinstance(t.exception(), StreamGroupNotFoundError)
                for t in tasks
                if not t.cancelled()
            ), "Expected at least one task to raise StreamGroupNotFoundError"