Coverage for tests / brokers / redis / test_autoclaim.py: 99%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from contextlib import suppress 

3from unittest.mock import MagicMock, patch 

4 

5import pytest 

6from redis.asyncio import Redis 

7 

8from faststream.exceptions import NackMessage 

9from faststream.redis import StreamSub 

10from tests.brokers.base.consume import BrokerRealConsumeTestcase 

11from tests.tools import spy_decorator 

12 

13from .basic import RedisTestcaseConfig 

14 

15 

16@pytest.mark.connected() 

17@pytest.mark.redis() 

18@pytest.mark.asyncio() 

19class TestAutoClaim(RedisTestcaseConfig, BrokerRealConsumeTestcase): 

20 @pytest.mark.slow() 

21 async def test_consume_stream_with_min_idle_time( 

22 self, 

23 queue: str, 

24 mock: MagicMock, 

25 event: asyncio.Event, 

26 ) -> None: 

27 """Verify that subscribers with min_idle_time use XAUTOCLAIM to reclaim pending messages.""" 

28 consume_broker = self.get_broker(apply_types=True) 

29 

30 @consume_broker.subscriber( 

31 stream=StreamSub( 

32 queue, 

33 group="test_group", 

34 consumer="consumer1", 

35 ), 

36 ) 

37 async def regular(msg: str) -> None: 

38 raise NackMessage 

39 

40 @consume_broker.subscriber( 

41 stream=StreamSub( 

42 queue, 

43 group="test_group", 

44 consumer="consumer1", 

45 min_idle_time=100, # 100ms 

46 ), 

47 ) 

48 async def retry(msg: str) -> None: 

49 mock(msg) 

50 event.set() 

51 

52 async with self.patch_broker(consume_broker) as br: 

53 with ( 

54 patch.object( 

55 Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) 

56 ) as xautoclaim, 

57 patch.object( 

58 Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) 

59 ) as xreadgroup, 

60 ): 

61 await br.start() 

62 

63 # First, publish a message and let it become pending 

64 await br.publish("pending_message", stream=queue) 

65 

66 # The subscriber with XAUTOCLAIM should reclaim it 

67 await asyncio.wait( 

68 (asyncio.create_task(event.wait()),), 

69 timeout=3, 

70 ) 

71 

72 assert event.is_set() 

73 mock.assert_called_once_with("pending_message") 

74 

75 # Verify that XAUTOCLAIM was used, not XREADGROUP 

76 assert xautoclaim.mock.called 

77 assert xreadgroup.mock.called # regular subscriber uses xreadgroup 

78 

79 @pytest.mark.slow() 

80 async def test_get_one_with_min_idle_time( 

81 self, 

82 queue: str, 

83 ) -> None: 

84 """Verify that get_one() method uses XAUTOCLAIM when min_idle_time is configured.""" 

85 broker = self.get_broker(apply_types=True) 

86 

87 async with self.patch_broker(broker) as br: 

88 await br.start() 

89 

90 # First, create a pending message 

91 await br.publish({"data": "pending"}, stream=queue) 

92 with suppress(Exception): 

93 await br._connection.xgroup_create( 

94 queue, "idle_group", id="0", mkstream=True 

95 ) 

96 

97 # Read it but don't ack to make it pending 

98 await br._connection.xreadgroup( 

99 groupname="idle_group", 

100 consumername="temp_consumer", 

101 streams={queue: ">"}, 

102 count=1, 

103 ) 

104 

105 # Wait for it to become idle 

106 await asyncio.sleep(0.1) 

107 

108 # Now use get_one with min_idle_time 

109 subscriber = br.subscriber( 

110 stream=StreamSub( 

111 queue, 

112 group="idle_group", 

113 consumer="claiming_consumer", 

114 min_idle_time=1, 

115 ) 

116 ) 

117 

118 with ( 

119 patch.object( 

120 Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) 

121 ) as xautoclaim, 

122 patch.object( 

123 Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) 

124 ) as xreadgroup, 

125 ): 

126 message = await subscriber.get_one(timeout=3) 

127 

128 assert message is not None 

129 decoded = await message.decode() 

130 assert decoded == {"data": "pending"} 

131 # Should use XAUTOCLAIM, not XREADGROUP 

132 assert xautoclaim.mock.called 

133 assert not xreadgroup.mock.called 

134 

135 @pytest.mark.slow() 

136 async def test_get_one_with_min_idle_time_no_pending( 

137 self, 

138 queue: str, 

139 mock: MagicMock, 

140 ) -> None: 

141 """Verify that get_one() returns None when no pending messages are available for claiming.""" 

142 broker = self.get_broker(apply_types=True) 

143 

144 subscriber = broker.subscriber( 

145 stream=StreamSub( 

146 queue, 

147 group="empty_group", 

148 consumer="consumer1", 

149 min_idle_time=100, 

150 ) 

151 ) 

152 

153 async with self.patch_broker(broker) as br: 

154 await br.start() 

155 

156 # Should return None after timeout 

157 result = await subscriber.get_one(timeout=0.5) 

158 mock(result) 

159 

160 mock.assert_called_once_with(None) 

161 

162 @pytest.mark.slow() 

163 async def test_iterator_with_min_idle_time( 

164 self, 

165 queue: str, 

166 mock: MagicMock, 

167 ) -> None: 

168 """Verify that async iterator uses XAUTOCLAIM when min_idle_time is configured.""" 

169 broker = self.get_broker(apply_types=True) 

170 

171 async with self.patch_broker(broker) as br: 

172 await br.start() 

173 

174 # Create pending messages 

175 await br.publish({"data": "msg1"}, stream=queue) 

176 await br.publish({"data": "msg2"}, stream=queue) 

177 

178 with suppress(Exception): 

179 await br._connection.xgroup_create( 

180 queue, "iter_group", id="0", mkstream=True 

181 ) 

182 

183 # Read them but don't ack 

184 await br._connection.xreadgroup( 

185 groupname="iter_group", 

186 consumername="temp", 

187 streams={queue: ">"}, 

188 count=10, 

189 ) 

190 

191 await asyncio.sleep(0.1) 

192 

193 subscriber = br.subscriber( 

194 stream=StreamSub( 

195 queue, 

196 group="iter_group", 

197 consumer="iter_consumer", 

198 min_idle_time=1, 

199 ) 

200 ) 

201 

202 with ( 

203 patch.object( 

204 Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) 

205 ) as xautoclaim, 

206 patch.object( 

207 Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) 

208 ) as xreadgroup, 

209 ): 

210 count = 0 

211 async for msg in subscriber: 211 ↛ 218line 211 didn't jump to line 218 because the loop on line 211 didn't complete

212 decoded = await msg.decode() 

213 mock(decoded) 

214 count += 1 

215 if count >= 2: 

216 break 

217 

218 assert count == 2 

219 mock.assert_any_call({"data": "msg1"}) 

220 mock.assert_any_call({"data": "msg2"}) 

221 # Should use XAUTOCLAIM, not XREADGROUP 

222 assert xautoclaim.mock.called 

223 assert not xreadgroup.mock.called 

224 

225 @pytest.mark.slow() 

226 async def test_consume_stream_batch_with_min_idle_time( 

227 self, 

228 queue: str, 

229 mock: MagicMock, 

230 event: asyncio.Event, 

231 ) -> None: 

232 """Verify that batch subscribers use XAUTOCLAIM when min_idle_time is configured.""" 

233 consume_broker = self.get_broker(apply_types=True) 

234 

235 @consume_broker.subscriber( 

236 stream=StreamSub( 

237 queue, 

238 group="batch_group", 

239 consumer="batch_consumer", 

240 batch=True, 

241 min_idle_time=1, 

242 ), 

243 ) 

244 async def handler(msg: list) -> None: 

245 mock(msg) 

246 event.set() 

247 

248 async with self.patch_broker(consume_broker) as br: 

249 # Create a pending message first (before starting subscriber) 

250 await br.publish({"data": "batch_msg"}, stream=queue) 

251 

252 with suppress(Exception): 

253 await br._connection.xgroup_create( 

254 queue, "batch_group", id="0", mkstream=True 

255 ) 

256 

257 # Read but don't ack (before starting subscriber) 

258 await br._connection.xreadgroup( 

259 groupname="batch_group", 

260 consumername="temp", 

261 streams={queue: ">"}, 

262 count=1, 

263 ) 

264 

265 await asyncio.sleep(0.1) 

266 

267 # Now start subscriber and track calls 

268 with ( 

269 patch.object( 

270 Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) 

271 ) as xautoclaim, 

272 patch.object( 

273 Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) 

274 ) as xreadgroup, 

275 ): 

276 await br.start() 

277 

278 # Now the subscriber should reclaim it 

279 await asyncio.wait( 

280 (asyncio.create_task(event.wait()),), 

281 timeout=3, 

282 ) 

283 

284 assert event.is_set() 

285 # In batch mode, should receive list 

286 assert mock.call_count == 1 

287 called_with = mock.call_args[0][0] 

288 assert isinstance(called_with, list) 

289 assert len(called_with) > 0 

290 # Should use XAUTOCLAIM, not XREADGROUP 

291 assert xautoclaim.mock.called 

292 assert not xreadgroup.mock.called 

293 

294 @pytest.mark.slow() 

295 async def test_xautoclaim_with_deleted_messages( 

296 self, 

297 queue: str, 

298 mock: MagicMock, 

299 ) -> None: 

300 """Verify that XAUTOCLAIM handles deleted messages gracefully without errors.""" 

301 consume_broker = self.get_broker(apply_types=True) 

302 

303 async with self.patch_broker(consume_broker) as br: 

304 await br.start() 

305 

306 # Create and consume a message without ack 

307 msg_id = await br.publish({"data": "will_delete"}, stream=queue) 

308 

309 with suppress(Exception): 

310 await br._connection.xgroup_create( 

311 queue, "delete_group", id="0", mkstream=True 

312 ) 

313 

314 # Read to make it pending 

315 await br._connection.xreadgroup( 

316 groupname="delete_group", 

317 consumername="temp", 

318 streams={queue: ">"}, 

319 count=1, 

320 ) 

321 

322 # Delete the message from stream 

323 await br._connection.xdel(queue, msg_id) 

324 

325 await asyncio.sleep(0.1) 

326 

327 # XAUTOCLAIM should handle deleted messages gracefully 

328 subscriber = br.subscriber( 

329 stream=StreamSub( 

330 queue, 

331 group="delete_group", 

332 consumer="delete_consumer", 

333 min_idle_time=1, 

334 ) 

335 ) 

336 

337 # Should timeout gracefully without errors 

338 result = await subscriber.get_one(timeout=0.5) 

339 mock(result) 

340 

341 # Should return None (no valid messages to claim) 

342 mock.assert_called_once_with(None) 

343 

344 @pytest.mark.slow() 

345 async def test_xautoclaim_circular_scanning_with_idle_timeout( 

346 self, 

347 queue: str, 

348 mock: MagicMock, 

349 ) -> None: 

350 """Verify that XAUTOCLAIM performs circular scanning and claims messages as they become idle.""" 

351 consume_broker = self.get_broker(apply_types=True) 

352 

353 async with self.patch_broker(consume_broker) as br: 

354 await br.start() 

355 

356 # Create multiple pending messages 

357 msg_ids = [] 

358 for i in range(5): 

359 msg_id = await br.publish({"data": f"msg{i}"}, stream=queue) 

360 msg_ids.append(msg_id) 

361 

362 with suppress(Exception): 

363 await br._connection.xgroup_create( 

364 queue, "circular_group", id="0", mkstream=True 

365 ) 

366 

367 # Read all messages with consumer1 but don't ack - making them pending 

368 await br._connection.xreadgroup( 

369 groupname="circular_group", 

370 consumername="consumer1", 

371 streams={queue: ">"}, 

372 count=10, 

373 ) 

374 

375 # Wait for messages to become idle 

376 await asyncio.sleep(0.1) 

377 

378 # Create subscriber with XAUTOCLAIM 

379 subscriber = br.subscriber( 

380 stream=StreamSub( 

381 queue, 

382 group="circular_group", 

383 consumer="consumer2", 

384 min_idle_time=1, 

385 ) 

386 ) 

387 

388 # First pass: claim all messages one by one 

389 claimed_messages_first_pass = [] 

390 for _ in range(5): 

391 msg = await subscriber.get_one(timeout=1) 

392 if msg: 392 ↛ 390line 392 didn't jump to line 390 because the condition on line 392 was always true

393 decoded = await msg.decode() 

394 claimed_messages_first_pass.append(decoded) 

395 mock(f"first_pass_{decoded['data']}") 

396 

397 # Should have claimed all 5 messages in order 

398 assert len(claimed_messages_first_pass) == 5 

399 assert claimed_messages_first_pass == [{"data": f"msg{i}"} for i in range(5)] 

400 

401 # After reaching the end, XAUTOCLAIM should restart from "0-0" 

402 # and scan circularly - messages are still pending since we didn't ACK them 

403 # Second pass: verify circular behavior by claiming messages again 

404 msg = await subscriber.get_one(timeout=1) 

405 assert msg is not None 

406 decoded = await msg.decode() 

407 # Should get msg0 again (circular scan restarted) 

408 assert decoded["data"] == "msg0" 

409 mock("second_pass_msg0") 

410 

411 # Verify messages were claimed in both passes 

412 mock.assert_any_call("first_pass_msg0") 

413 mock.assert_any_call("second_pass_msg0")