Coverage for tests/brokers/redis/test_autoclaim.py: 99%
135 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from contextlib import suppress
3from unittest.mock import MagicMock, patch
5import pytest
6from redis.asyncio import Redis
8from faststream.exceptions import NackMessage
9from faststream.redis import StreamSub
10from tests.brokers.base.consume import BrokerRealConsumeTestcase
11from tests.tools import spy_decorator
13from .basic import RedisTestcaseConfig
@pytest.mark.connected()
@pytest.mark.redis()
@pytest.mark.asyncio()
class TestAutoClaim(RedisTestcaseConfig, BrokerRealConsumeTestcase):
    """Tests for Redis stream subscribers configured with ``min_idle_time``.

    Several tests spy on ``Redis.xautoclaim`` and ``Redis.xreadgroup`` to
    check which command a given consumption path issues.
    """

    @staticmethod
    def _spy_on(command: str):
        """Return a patcher that swaps ``Redis.<command>`` for a spy wrapper.

        Entering the patcher yields the object built by ``spy_decorator``;
        the tests inspect its ``.mock`` attribute to see whether the command
        was invoked.
        """
        return patch.object(Redis, command, spy_decorator(getattr(Redis, command)))

    @staticmethod
    async def _make_pending(
        br,
        queue: str,
        *,
        group: str,
        consumer: str,
        count: int,
    ) -> None:
        """Read up to ``count`` new stream entries for ``group`` without acking.

        The entries are read through the broker's raw connection as
        ``consumer`` and never acknowledged, so they stay pending for the
        group and can later be reclaimed by another consumer.
        """
        # Best-effort group creation: any error is deliberately ignored
        # (presumably to tolerate a group that already exists).
        with suppress(Exception):
            await br._connection.xgroup_create(queue, group, id="0", mkstream=True)

        await br._connection.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={queue: ">"},
            count=count,
        )

    @pytest.mark.slow()
    async def test_consume_stream_with_min_idle_time(
        self,
        queue: str,
        mock: MagicMock,
        event: asyncio.Event,
    ) -> None:
        """Verify that subscribers with min_idle_time use XAUTOCLAIM to reclaim pending messages."""
        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(
                queue,
                group="test_group",
                consumer="consumer1",
            ),
        )
        async def regular(msg: str) -> None:
            # Reject the message so it is left for the reclaiming subscriber.
            raise NackMessage

        @consume_broker.subscriber(
            stream=StreamSub(
                queue,
                group="test_group",
                consumer="consumer1",
                min_idle_time=100,  # 100ms
            ),
        )
        async def retry(msg: str) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            with (
                self._spy_on("xautoclaim") as xautoclaim,
                self._spy_on("xreadgroup") as xreadgroup,
            ):
                await br.start()

                # First, publish a message and let it become pending
                await br.publish("pending_message", stream=queue)

                # The subscriber with XAUTOCLAIM should reclaim it
                await asyncio.wait(
                    (asyncio.create_task(event.wait()),),
                    timeout=3,
                )

        assert event.is_set()
        mock.assert_called_once_with("pending_message")

        # Both commands are expected here: the min_idle_time subscriber
        # reclaims via XAUTOCLAIM while the regular subscriber reads via
        # XREADGROUP.
        assert xautoclaim.mock.called
        assert xreadgroup.mock.called

    @pytest.mark.slow()
    async def test_get_one_with_min_idle_time(
        self,
        queue: str,
    ) -> None:
        """Verify that get_one() method uses XAUTOCLAIM when min_idle_time is configured."""
        broker = self.get_broker(apply_types=True)

        async with self.patch_broker(broker) as br:
            await br.start()

            # First, create a pending message: publish it, then read it
            # through a temporary consumer without acking.
            await br.publish({"data": "pending"}, stream=queue)
            await self._make_pending(
                br,
                queue,
                group="idle_group",
                consumer="temp_consumer",
                count=1,
            )

            # Wait for it to become idle
            await asyncio.sleep(0.1)

            # Now use get_one with min_idle_time
            subscriber = br.subscriber(
                stream=StreamSub(
                    queue,
                    group="idle_group",
                    consumer="claiming_consumer",
                    min_idle_time=1,
                )
            )

            with (
                self._spy_on("xautoclaim") as xautoclaim,
                self._spy_on("xreadgroup") as xreadgroup,
            ):
                message = await subscriber.get_one(timeout=3)

            assert message is not None
            decoded = await message.decode()
            assert decoded == {"data": "pending"}

            # Should use XAUTOCLAIM, not XREADGROUP
            assert xautoclaim.mock.called
            assert not xreadgroup.mock.called

    @pytest.mark.slow()
    async def test_get_one_with_min_idle_time_no_pending(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Verify that get_one() returns None when no pending messages are available for claiming."""
        broker = self.get_broker(apply_types=True)

        subscriber = broker.subscriber(
            stream=StreamSub(
                queue,
                group="empty_group",
                consumer="consumer1",
                min_idle_time=100,
            )
        )

        async with self.patch_broker(broker) as br:
            await br.start()

            # Should return None after timeout
            result = await subscriber.get_one(timeout=0.5)
            mock(result)

        mock.assert_called_once_with(None)

    @pytest.mark.slow()
    async def test_iterator_with_min_idle_time(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Verify that async iterator uses XAUTOCLAIM when min_idle_time is configured."""
        broker = self.get_broker(apply_types=True)

        async with self.patch_broker(broker) as br:
            await br.start()

            # Create pending messages: publish two, then read them without ack
            await br.publish({"data": "msg1"}, stream=queue)
            await br.publish({"data": "msg2"}, stream=queue)
            await self._make_pending(
                br,
                queue,
                group="iter_group",
                consumer="temp",
                count=10,
            )

            await asyncio.sleep(0.1)

            subscriber = br.subscriber(
                stream=StreamSub(
                    queue,
                    group="iter_group",
                    consumer="iter_consumer",
                    min_idle_time=1,
                )
            )

            with (
                self._spy_on("xautoclaim") as xautoclaim,
                self._spy_on("xreadgroup") as xreadgroup,
            ):
                count = 0
                async for msg in subscriber:
                    decoded = await msg.decode()
                    mock(decoded)
                    count += 1
                    # Stop explicitly once both pending messages were seen.
                    if count >= 2:
                        break

            assert count == 2
            mock.assert_any_call({"data": "msg1"})
            mock.assert_any_call({"data": "msg2"})

            # Should use XAUTOCLAIM, not XREADGROUP
            assert xautoclaim.mock.called
            assert not xreadgroup.mock.called

    @pytest.mark.slow()
    async def test_consume_stream_batch_with_min_idle_time(
        self,
        queue: str,
        mock: MagicMock,
        event: asyncio.Event,
    ) -> None:
        """Verify that batch subscribers use XAUTOCLAIM when min_idle_time is configured."""
        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(
                queue,
                group="batch_group",
                consumer="batch_consumer",
                batch=True,
                min_idle_time=1,
            ),
        )
        async def handler(msg: list) -> None:
            mock(msg)
            event.set()

        async with self.patch_broker(consume_broker) as br:
            # Create a pending message first (before starting subscriber):
            # publish it, then read it without ack.
            await br.publish({"data": "batch_msg"}, stream=queue)
            await self._make_pending(
                br,
                queue,
                group="batch_group",
                consumer="temp",
                count=1,
            )

            await asyncio.sleep(0.1)

            # Now start subscriber and track calls
            with (
                self._spy_on("xautoclaim") as xautoclaim,
                self._spy_on("xreadgroup") as xreadgroup,
            ):
                await br.start()

                # Now the subscriber should reclaim it
                await asyncio.wait(
                    (asyncio.create_task(event.wait()),),
                    timeout=3,
                )

        assert event.is_set()

        # In batch mode, should receive list
        assert mock.call_count == 1
        called_with = mock.call_args[0][0]
        assert isinstance(called_with, list)
        assert len(called_with) > 0

        # Should use XAUTOCLAIM, not XREADGROUP
        assert xautoclaim.mock.called
        assert not xreadgroup.mock.called

    @pytest.mark.slow()
    async def test_xautoclaim_with_deleted_messages(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Verify that XAUTOCLAIM handles deleted messages gracefully without errors."""
        consume_broker = self.get_broker(apply_types=True)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            # Create and consume a message without ack to make it pending
            msg_id = await br.publish({"data": "will_delete"}, stream=queue)
            await self._make_pending(
                br,
                queue,
                group="delete_group",
                consumer="temp",
                count=1,
            )

            # Delete the message from stream
            await br._connection.xdel(queue, msg_id)

            await asyncio.sleep(0.1)

            # XAUTOCLAIM should handle deleted messages gracefully
            subscriber = br.subscriber(
                stream=StreamSub(
                    queue,
                    group="delete_group",
                    consumer="delete_consumer",
                    min_idle_time=1,
                )
            )

            # Should timeout gracefully without errors
            result = await subscriber.get_one(timeout=0.5)
            mock(result)

        # Should return None (no valid messages to claim)
        mock.assert_called_once_with(None)

    @pytest.mark.slow()
    async def test_xautoclaim_circular_scanning_with_idle_timeout(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Verify that XAUTOCLAIM performs circular scanning and claims messages as they become idle."""
        consume_broker = self.get_broker(apply_types=True)

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            # Create multiple pending messages
            for i in range(5):
                await br.publish({"data": f"msg{i}"}, stream=queue)

            # Read all messages with consumer1 but don't ack - making them pending
            await self._make_pending(
                br,
                queue,
                group="circular_group",
                consumer="consumer1",
                count=10,
            )

            # Wait for messages to become idle
            await asyncio.sleep(0.1)

            # Create subscriber with XAUTOCLAIM
            subscriber = br.subscriber(
                stream=StreamSub(
                    queue,
                    group="circular_group",
                    consumer="consumer2",
                    min_idle_time=1,
                )
            )

            # First pass: claim all messages one by one
            claimed_messages_first_pass = []
            for _ in range(5):
                msg = await subscriber.get_one(timeout=1)
                # Fail at the first missing message rather than silently
                # skipping it and only noticing in the length check below.
                assert msg is not None
                decoded = await msg.decode()
                claimed_messages_first_pass.append(decoded)
                mock(f"first_pass_{decoded['data']}")

            # Should have claimed all 5 messages in order
            assert len(claimed_messages_first_pass) == 5
            assert claimed_messages_first_pass == [
                {"data": f"msg{i}"} for i in range(5)
            ]

            # After reaching the end, XAUTOCLAIM should restart from "0-0"
            # and scan circularly - messages are still pending since we didn't ACK them
            # Second pass: verify circular behavior by claiming messages again
            msg = await subscriber.get_one(timeout=1)
            assert msg is not None
            decoded = await msg.decode()
            # Should get msg0 again (circular scan restarted)
            assert decoded["data"] == "msg0"
            mock("second_pass_msg0")

        # Verify messages were claimed in both passes
        mock.assert_any_call("first_pass_msg0")
        mock.assert_any_call("second_pass_msg0")