Coverage for tests / brokers / nats / test_consume.py: 99%
424 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import MagicMock, patch
4import pytest
5from nats.aio.msg import Msg
7from faststream import AckPolicy
8from faststream.exceptions import AckMessage
9from faststream.nats import ConsumerConfig, JStream, PubAck, PullSub
10from faststream.nats.annotations import NatsMessage
11from faststream.nats.message import NatsMessage as StreamMessage
12from tests.brokers.base.consume import BrokerRealConsumeTestcase
13from tests.tools import spy_decorator
15from .basic import NatsTestcaseConfig
18@pytest.mark.connected()
19@pytest.mark.nats()
20class TestConsume(NatsTestcaseConfig, BrokerRealConsumeTestcase):
21 async def test_concurrent_subscriber(
22 self,
23 queue: str,
24 mock: MagicMock,
25 ) -> None:
26 event = asyncio.Event()
27 event2 = asyncio.Event()
29 broker = self.get_broker()
31 args, kwargs = self.get_subscriber_params(queue, max_workers=2)
33 @broker.subscriber(*args, **kwargs)
34 async def handler(msg):
35 mock()
37 if event.is_set():
38 event2.set()
39 else:
40 event.set()
42 await asyncio.sleep(1.0)
44 async with self.patch_broker(broker) as br:
45 await br.start()
47 for i in range(5):
48 await br.publish(i, queue)
50 await asyncio.wait(
51 (
52 asyncio.create_task(event.wait()),
53 asyncio.create_task(event2.wait()),
54 ),
55 timeout=3,
56 )
58 assert event.is_set()
59 assert event2.is_set()
60 assert mock.call_count == 2, mock.call_count
62 async def test_consume_js(
63 self,
64 queue: str,
65 stream: JStream,
66 ) -> None:
67 event = asyncio.Event()
69 consume_broker = self.get_broker()
71 args, kwargs = self.get_subscriber_params(queue, stream=stream)
73 @consume_broker.subscriber(*args, **kwargs)
74 def subscriber(m) -> None:
75 event.set()
77 async with self.patch_broker(consume_broker) as br:
78 await br.start()
80 result = await br.publish("hello", queue, stream=stream.name)
82 await asyncio.wait(
83 (asyncio.create_task(event.wait()),),
84 timeout=3,
85 )
87 assert isinstance(result, PubAck), result
88 assert event.is_set()
90 async def test_consume_with_filter(
91 self,
92 queue: str,
93 mock: MagicMock,
94 ) -> None:
95 event = asyncio.Event()
97 consume_broker = self.get_broker()
99 @consume_broker.subscriber(
100 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]),
101 stream=JStream(queue, subjects=[f"{queue}.*"]),
102 )
103 def subscriber(m) -> None:
104 mock(m)
105 event.set()
107 async with self.patch_broker(consume_broker) as br:
108 await br.start()
109 await asyncio.wait(
110 (
111 asyncio.create_task(br.publish(2, f"{queue}.a")),
112 asyncio.create_task(event.wait()),
113 ),
114 timeout=3,
115 )
117 assert event.is_set()
118 mock.assert_called_once_with(2)
120 async def test_consume_pull(
121 self,
122 queue: str,
123 stream: JStream,
124 mock,
125 ) -> None:
126 event = asyncio.Event()
128 consume_broker = self.get_broker()
130 @consume_broker.subscriber(
131 queue,
132 stream=stream,
133 pull_sub=PullSub(1),
134 )
135 def subscriber(m) -> None:
136 mock(m)
137 event.set()
139 async with self.patch_broker(consume_broker) as br:
140 await br.start()
142 await asyncio.wait(
143 (
144 asyncio.create_task(br.publish("hello", queue)),
145 asyncio.create_task(event.wait()),
146 ),
147 timeout=3,
148 )
150 assert event.is_set()
151 mock.assert_called_once_with("hello")
153 async def test_consume_batch(
154 self,
155 queue: str,
156 stream: JStream,
157 mock,
158 ) -> None:
159 event = asyncio.Event()
161 consume_broker = self.get_broker()
163 @consume_broker.subscriber(
164 queue,
165 stream=stream,
166 pull_sub=PullSub(1, batch=True),
167 )
168 def subscriber(m) -> None:
169 mock(m)
170 event.set()
172 async with self.patch_broker(consume_broker) as br:
173 await br.start()
175 await asyncio.wait(
176 (
177 asyncio.create_task(br.publish(b"hello", queue)),
178 asyncio.create_task(event.wait()),
179 ),
180 timeout=3,
181 )
183 assert event.is_set()
184 mock.assert_called_once_with([b"hello"])
186 async def test_core_consume_no_ack(
187 self,
188 queue: str,
189 mock: MagicMock,
190 ) -> None:
191 event = asyncio.Event()
193 consume_broker = self.get_broker(apply_types=True)
195 args, kwargs = self.get_subscriber_params(
196 queue,
197 ack_policy=AckPolicy.MANUAL,
198 )
200 @consume_broker.subscriber(*args, **kwargs)
201 async def handler(msg: NatsMessage) -> None:
202 mock(msg.raw_message._ackd)
203 event.set()
205 async with self.patch_broker(consume_broker) as br:
206 await br.start()
208 # Check, that Core Subscriber doesn't call Acknowledgement automatically
209 with patch.object(
210 StreamMessage,
211 "ack",
212 spy_decorator(StreamMessage.ack),
213 ) as m:
214 await asyncio.wait(
215 (
216 asyncio.create_task(br.publish("hello", queue)),
217 asyncio.create_task(event.wait()),
218 ),
219 timeout=3,
220 )
221 assert not m.mock.called
223 assert event.is_set()
224 mock.assert_called_once_with(True) # True was set by parser
226 async def test_consume_ack(
227 self,
228 queue: str,
229 stream: JStream,
230 ) -> None:
231 event = asyncio.Event()
233 consume_broker = self.get_broker(apply_types=True)
235 @consume_broker.subscriber(queue, stream=stream)
236 async def handler(msg: NatsMessage) -> None:
237 event.set()
239 async with self.patch_broker(consume_broker) as br:
240 await br.start()
242 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m:
243 await asyncio.wait(
244 (
245 asyncio.create_task(br.publish("hello", queue)),
246 asyncio.create_task(event.wait()),
247 ),
248 timeout=3,
249 )
250 m.mock.assert_called_once()
252 assert event.is_set()
254 async def test_consume_ack_manual(
255 self,
256 queue: str,
257 stream: JStream,
258 ) -> None:
259 event = asyncio.Event()
261 consume_broker = self.get_broker(apply_types=True)
263 @consume_broker.subscriber(queue, stream=stream)
264 async def handler(msg: NatsMessage) -> None:
265 await msg.ack()
266 event.set()
268 async with self.patch_broker(consume_broker) as br:
269 await br.start()
271 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m:
272 await asyncio.wait(
273 (
274 asyncio.create_task(br.publish("hello", queue)),
275 asyncio.create_task(event.wait()),
276 ),
277 timeout=3,
278 )
279 m.mock.assert_called_once()
281 assert event.is_set()
283 async def test_consume_ack_sync_manual(
284 self,
285 queue: str,
286 event: asyncio.Event,
287 stream: JStream,
288 ):
289 consume_broker = self.get_broker(apply_types=True)
291 @consume_broker.subscriber(queue, stream=stream)
292 async def handler(msg: NatsMessage):
293 await msg.ack_sync()
294 event.set()
296 async with self.patch_broker(consume_broker) as br:
297 await br.start()
299 with patch.object(Msg, "ack_sync", spy_decorator(Msg.ack_sync)) as m:
300 await asyncio.wait(
301 (
302 asyncio.create_task(br.publish("hello", queue)),
303 asyncio.create_task(event.wait()),
304 ),
305 timeout=3,
306 )
307 m.mock.assert_called_once()
309 assert event.is_set()
311 async def test_consume_ack_raise(
312 self,
313 queue: str,
314 stream: JStream,
315 ) -> None:
316 event = asyncio.Event()
318 consume_broker = self.get_broker(apply_types=True)
320 @consume_broker.subscriber(queue, stream=stream)
321 async def handler(msg: NatsMessage):
322 event.set()
323 raise AckMessage
325 async with self.patch_broker(consume_broker) as br:
326 await br.start()
328 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m:
329 await asyncio.wait(
330 (
331 asyncio.create_task(br.publish("hello", queue)),
332 asyncio.create_task(event.wait()),
333 ),
334 timeout=3,
335 )
336 m.mock.assert_called_once()
338 assert event.is_set()
340 async def test_nack(
341 self,
342 queue: str,
343 stream: JStream,
344 ) -> None:
345 event = asyncio.Event()
347 consume_broker = self.get_broker(apply_types=True)
349 @consume_broker.subscriber(queue, stream=stream)
350 async def handler(msg: NatsMessage) -> None:
351 await msg.nack()
352 event.set()
354 async with self.patch_broker(consume_broker) as br:
355 await br.start()
357 with patch.object(Msg, "nak", spy_decorator(Msg.nak)) as m:
358 await asyncio.wait(
359 (
360 asyncio.create_task(br.publish("hello", queue)),
361 asyncio.create_task(event.wait()),
362 ),
363 timeout=3,
364 )
365 m.mock.assert_called_once()
367 assert event.is_set()
369 async def test_consume_no_ack(
370 self,
371 queue: str,
372 stream: str,
373 ) -> None:
374 event = asyncio.Event()
376 consume_broker = self.get_broker(apply_types=True)
378 @consume_broker.subscriber(
379 queue,
380 stream=stream,
381 ack_policy=AckPolicy.MANUAL,
382 )
383 async def handler(msg: NatsMessage) -> None:
384 event.set()
386 async with self.patch_broker(consume_broker) as br:
387 await br.start()
389 with patch.object(Msg, "ack", spy_decorator(Msg.ack)) as m:
390 await asyncio.wait(
391 (
392 asyncio.create_task(br.publish("hello", queue)),
393 asyncio.create_task(event.wait()),
394 ),
395 timeout=3,
396 )
397 m.mock.assert_not_called()
399 assert event.is_set()
401 async def test_consume_batch_headers(
402 self,
403 queue: str,
404 stream: JStream,
405 mock,
406 ) -> None:
407 event = asyncio.Event()
409 consume_broker = self.get_broker(apply_types=True)
411 @consume_broker.subscriber(
412 queue,
413 stream=stream,
414 pull_sub=PullSub(1, batch=True),
415 )
416 def subscriber(m, msg: NatsMessage) -> None:
417 check = all(
418 (
419 msg.headers,
420 [msg.headers] == msg.batch_headers,
421 msg.headers.get("custom") == "1",
422 ),
423 )
424 mock(check)
425 event.set()
427 async with self.patch_broker(consume_broker) as br:
428 await br.start()
429 await asyncio.wait(
430 (
431 asyncio.create_task(br.publish("", queue, headers={"custom": "1"})),
432 asyncio.create_task(event.wait()),
433 ),
434 timeout=3,
435 )
437 assert event.is_set()
438 mock.assert_called_once_with(True)
440 @pytest.mark.asyncio()
441 async def test_consume_kv(
442 self,
443 queue: str,
444 mock,
445 ) -> None:
446 event = asyncio.Event()
448 consume_broker = self.get_broker(apply_types=True)
450 @consume_broker.subscriber(queue, kv_watch=queue + "1")
451 async def handler(m) -> None:
452 mock(m)
453 event.set()
455 async with self.patch_broker(consume_broker) as br:
456 await br.start()
457 bucket = await br.key_value(queue + "1")
459 await asyncio.wait(
460 (
461 asyncio.create_task(
462 bucket.put(
463 queue,
464 b"world",
465 ),
466 ),
467 asyncio.create_task(event.wait()),
468 ),
469 timeout=3,
470 )
472 assert event.is_set()
473 mock.assert_called_with(b"world")
475 @pytest.mark.asyncio()
476 async def test_consume_os(
477 self,
478 queue: str,
479 mock,
480 ) -> None:
481 event = asyncio.Event()
483 consume_broker = self.get_broker(apply_types=True)
485 @consume_broker.subscriber(queue, obj_watch=True)
486 async def handler(filename: str) -> None:
487 event.set()
488 mock(filename)
490 async with self.patch_broker(consume_broker) as br:
491 await br.start()
492 bucket = await br.object_storage(queue)
494 await asyncio.wait(
495 (
496 asyncio.create_task(
497 bucket.put(
498 "hello",
499 b"world",
500 ),
501 ),
502 asyncio.create_task(event.wait()),
503 ),
504 timeout=3,
505 )
507 assert event.is_set()
508 mock.assert_called_once_with("hello")
510 async def test_get_one_js(
511 self,
512 queue: str,
513 stream: JStream,
514 ) -> None:
515 broker = self.get_broker(apply_types=True)
516 subscriber = broker.subscriber(queue, stream=stream)
518 async with self.patch_broker(broker) as br:
519 await br.start()
521 message = None
523 async def consume() -> None:
524 nonlocal message
525 message = await subscriber.get_one(timeout=5)
527 async def publish() -> None:
528 await br.publish("test_message", queue, stream=stream.name)
530 await asyncio.wait(
531 (
532 asyncio.create_task(consume()),
533 asyncio.create_task(publish()),
534 ),
535 timeout=10,
536 )
538 assert message is not None
539 assert await message.decode() == "test_message"
541 async def test_get_one_timeout_js(
542 self,
543 queue: str,
544 stream: JStream,
545 mock,
546 ) -> None:
547 broker = self.get_broker(apply_types=True)
548 subscriber = broker.subscriber(queue, stream=stream)
550 async with self.patch_broker(broker) as br:
551 await br.start()
553 mock(await subscriber.get_one(timeout=1e-24))
554 mock.assert_called_once_with(None)
556 async def test_get_one_pull(
557 self,
558 queue: str,
559 stream: JStream,
560 ) -> None:
561 broker = self.get_broker(apply_types=True)
562 subscriber = broker.subscriber(
563 queue,
564 stream=stream,
565 pull_sub=PullSub(1),
566 )
568 async with self.patch_broker(broker) as br:
569 await br.start()
571 message = None
573 async def consume() -> None:
574 nonlocal message
575 message = await subscriber.get_one(timeout=5)
577 async def publish() -> None:
578 await br.publish("test_message", queue)
580 await asyncio.wait(
581 (
582 asyncio.create_task(consume()),
583 asyncio.create_task(publish()),
584 ),
585 timeout=10,
586 )
588 assert message is not None
589 assert await message.decode() == "test_message"
591 async def test_get_one_pull_timeout(
592 self,
593 queue: str,
594 stream: JStream,
595 mock: MagicMock,
596 ) -> None:
597 broker = self.get_broker(apply_types=True)
598 subscriber = broker.subscriber(
599 queue,
600 stream=stream,
601 pull_sub=PullSub(1),
602 )
604 async with self.patch_broker(broker) as br:
605 await br.start()
607 mock(await subscriber.get_one(timeout=1e-24))
608 mock.assert_called_once_with(None)
610 async def test_get_one_batch(
611 self,
612 queue: str,
613 stream: JStream,
614 ) -> None:
615 broker = self.get_broker(apply_types=True)
616 subscriber = broker.subscriber(
617 queue,
618 stream=stream,
619 pull_sub=PullSub(1, batch=True),
620 )
622 async with self.patch_broker(broker) as br:
623 await br.start()
625 message = None
627 async def consume() -> None:
628 nonlocal message
629 message = await subscriber.get_one(timeout=5)
631 async def publish() -> None:
632 await br.publish("test_message", queue)
634 await asyncio.wait(
635 (
636 asyncio.create_task(consume()),
637 asyncio.create_task(publish()),
638 ),
639 timeout=10,
640 )
642 assert message is not None
643 assert await message.decode() == ["test_message"]
645 async def test_get_one_batch_timeout(
646 self,
647 queue: str,
648 stream: JStream,
649 mock: MagicMock,
650 ) -> None:
651 broker = self.get_broker(apply_types=True)
652 subscriber = broker.subscriber(
653 queue,
654 stream=stream,
655 pull_sub=PullSub(1, batch=True),
656 )
658 async with self.patch_broker(broker) as br:
659 await br.start()
661 mock(await subscriber.get_one(timeout=1e-24))
662 mock.assert_called_once_with(None)
664 async def test_get_one_with_filter(
665 self,
666 queue: str,
667 stream: JStream,
668 ) -> None:
669 broker = self.get_broker(apply_types=True)
670 subscriber = broker.subscriber(
671 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]),
672 stream=JStream(queue, subjects=[f"{queue}.*"]),
673 )
675 async with self.patch_broker(broker) as br:
676 await br.start()
678 message = None
680 async def consume() -> None:
681 nonlocal message
682 message = await subscriber.get_one(timeout=5)
684 async def publish() -> None:
685 await br.publish("test_message", f"{queue}.a")
687 await asyncio.wait(
688 (
689 asyncio.create_task(publish()),
690 asyncio.create_task(consume()),
691 ),
692 timeout=10,
693 )
695 assert message is not None
696 assert await message.decode() == "test_message"
698 async def test_get_one_kv(
699 self,
700 queue: str,
701 stream: JStream,
702 ) -> None:
703 broker = self.get_broker(apply_types=True)
704 subscriber = broker.subscriber(queue, kv_watch=queue + "1")
706 async with self.patch_broker(broker) as br:
707 await br.start()
708 bucket = await br.key_value(queue + "1")
710 message = None
712 async def consume() -> None:
713 nonlocal message
714 message = await subscriber.get_one(timeout=5)
716 async def publish() -> None:
717 await bucket.put(queue, b"test_message")
719 await asyncio.wait(
720 (
721 asyncio.create_task(consume()),
722 asyncio.create_task(publish()),
723 ),
724 timeout=10,
725 )
727 assert message is not None
728 assert await message.decode() == b"test_message"
730 async def test_get_one_kv_timeout(
731 self,
732 queue: str,
733 stream: JStream,
734 mock: MagicMock,
735 ) -> None:
736 broker = self.get_broker(apply_types=True)
737 subscriber = broker.subscriber(queue, kv_watch=queue + "1")
739 async with self.patch_broker(broker) as br:
740 await br.start()
742 mock(await subscriber.get_one(timeout=1e-24))
743 mock.assert_called_once_with(None)
745 async def test_get_one_os(
746 self,
747 queue: str,
748 stream: JStream,
749 ) -> None:
750 broker = self.get_broker(apply_types=True)
751 subscriber = broker.subscriber(queue, obj_watch=True)
753 async with self.patch_broker(broker) as br:
754 await br.start()
755 bucket = await br.object_storage(queue)
757 new_object_id = None
759 async def consume() -> None:
760 nonlocal new_object_id
761 new_object_event = await subscriber.get_one(timeout=5)
762 new_object_id = await new_object_event.decode()
764 async def publish() -> None:
765 await bucket.put(queue, b"test_message")
767 await asyncio.wait(
768 (
769 asyncio.create_task(consume()),
770 asyncio.create_task(publish()),
771 ),
772 timeout=10,
773 )
775 new_object = await bucket.get(new_object_id)
776 assert new_object.data == b"test_message"
778 async def test_get_one_os_timeout(
779 self,
780 queue: str,
781 stream: JStream,
782 mock: MagicMock,
783 ) -> None:
784 broker = self.get_broker(apply_types=True)
785 subscriber = broker.subscriber(queue, obj_watch=True)
787 async with self.patch_broker(broker) as br:
788 await br.start()
790 mock(await subscriber.get_one(timeout=1e-24))
791 mock.assert_called_once_with(None)
793 async def test_iterator_js(
794 self,
795 queue: str,
796 stream: JStream,
797 ) -> None:
798 expected_messages = ("test_message_1", "test_message_2")
800 broker = self.get_broker(apply_types=True)
801 subscriber = broker.subscriber(queue, stream=stream)
803 async with self.patch_broker(broker) as br:
804 await br.start()
806 async def publish_test_message():
807 for msg in expected_messages:
808 await br.publish(msg, queue)
810 _ = await asyncio.create_task(publish_test_message())
812 index_message = 0
813 async for msg in subscriber: 813 ↛ exitline 813 didn't jump to the function exit
814 result_message = await msg.decode()
816 assert result_message == expected_messages[index_message]
818 index_message += 1
819 if index_message >= len(expected_messages):
820 break
822 async def test_iterator_pull(
823 self,
824 queue: str,
825 stream: JStream,
826 ) -> None:
827 expected_messages = ("test_message_1", "test_message_2")
829 broker = self.get_broker(apply_types=True)
830 subscriber = broker.subscriber(
831 queue,
832 stream=stream,
833 pull_sub=PullSub(1),
834 )
836 async with self.patch_broker(broker) as br:
837 await br.start()
839 async def publish_test_message():
840 for msg in expected_messages:
841 await br.publish(msg, queue)
843 _ = await asyncio.create_task(publish_test_message())
845 index_message = 0
846 async for msg in subscriber: 846 ↛ exitline 846 didn't jump to the function exit
847 result_message = await msg.decode()
849 assert result_message == expected_messages[index_message]
851 index_message += 1
852 if index_message >= len(expected_messages):
853 break
855 async def test_iterator_batch(
856 self,
857 queue: str,
858 stream: JStream,
859 ) -> None:
860 expected_messages = ("test_message_1", "test_message_2")
862 broker = self.get_broker(apply_types=True)
863 subscriber = broker.subscriber(
864 queue,
865 stream=stream,
866 pull_sub=PullSub(1, batch=True),
867 )
869 async with self.patch_broker(broker) as br:
870 await br.start()
872 async def publish_test_message():
873 for msg in expected_messages:
874 await br.publish(msg, queue)
876 _ = await asyncio.create_task(publish_test_message())
878 index_message = 0
879 async for msg in subscriber: 879 ↛ exitline 879 didn't jump to the function exit
880 result_message = await msg.decode()
882 assert result_message == [expected_messages[index_message]]
884 index_message += 1
885 if index_message >= len(expected_messages):
886 break
888 async def test_iterator_with_filter(
889 self,
890 queue: str,
891 ) -> None:
892 expected_messages = ("test_message_1", "test_message_2")
894 broker = self.get_broker(apply_types=True)
895 subscriber = broker.subscriber(
896 config=ConsumerConfig(filter_subjects=[f"{queue}.a"]),
897 stream=JStream(queue, subjects=[f"{queue}.*"]),
898 )
900 async with self.patch_broker(broker) as br:
901 await br.start()
903 async def publish_test_message():
904 for msg in expected_messages:
905 await br.publish(msg, f"{queue}.a")
907 _ = await asyncio.create_task(publish_test_message())
909 index_message = 0
910 async for msg in subscriber: 910 ↛ exitline 910 didn't jump to the function exit
911 result_message = await msg.decode()
913 assert result_message == expected_messages[index_message]
915 index_message += 1
916 if index_message >= len(expected_messages):
917 break
919 async def test_iterator_kv(
920 self,
921 queue: str,
922 ) -> None:
923 expected_messages = (b"test_message_1", b"test_message_2")
925 broker = self.get_broker(apply_types=True)
926 subscriber = broker.subscriber(queue, kv_watch=queue + "1")
928 async with self.patch_broker(broker) as br:
929 await br.start()
930 bucket = await br.key_value(queue + "1")
932 async def publish_test_message():
933 await bucket.put(queue, expected_messages[0])
935 _ = await asyncio.create_task(publish_test_message())
937 index_message = 0
938 async for msg in subscriber: 938 ↛ exitline 938 didn't jump to the function exit
939 result_message = await msg.decode()
941 assert result_message == expected_messages[index_message]
943 index_message += 1
944 if index_message >= len(expected_messages):
945 break
947 await bucket.put(queue, expected_messages[index_message])
949 async def test_iterator_os(
950 self,
951 queue: str,
952 ) -> None:
953 expected_messages = (b"test_message_1", b"test_message_2")
955 broker = self.get_broker(apply_types=True)
956 subscriber = broker.subscriber(queue, obj_watch=True)
958 async with self.patch_broker(broker) as br:
959 await br.start()
960 bucket = await br.object_storage(queue)
962 async def publish_test_message():
963 await bucket.put(queue, expected_messages[0])
965 _ = await asyncio.create_task(publish_test_message())
967 index_message = 0
968 async for new_object_event in subscriber: 968 ↛ exitline 968 didn't jump to the function exit
969 new_object_id = await new_object_event.decode()
970 new_object = await bucket.get(new_object_id)
972 assert new_object.data == expected_messages[index_message]
974 index_message += 1
975 if index_message >= len(expected_messages):
976 break
978 await bucket.put(queue, expected_messages[index_message])