Coverage for docs / docs_src / redis / stream / claiming_manual_ack.py: 90%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from faststream import AckPolicy, FastStream, Logger 

2from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis 

3 

# Single broker instance shared by the subscriber and the startup publisher.
broker = RedisBroker()

# Application object built around that broker; its startup hook is used below.
app = FastStream(broker)

6 

7 

@broker.subscriber(
    stream=StreamSub(
        "critical-tasks",
        group="task-workers",
        consumer="worker-failover",
        min_idle_time=30000,  # 30 seconds
    ),
    ack_policy=AckPolicy.MANUAL,
)
async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis) -> None:
    """Process a message from the "critical-tasks" stream and ack it manually.

    The subscriber reads as consumer "worker-failover" in consumer group
    "task-workers" with ``ack_policy=AckPolicy.MANUAL``, so the message is
    only acknowledged by the explicit ``msg.ack`` call below.

    Args:
        msg: The stream message being handled; its ``body`` is logged.
        logger: Logger used to report progress and failures.
        redis: Redis client forwarded to ``msg.ack`` to perform the ack.
    """
    try:
        # Process the claimed message
        logger.info(f"Processing: {msg.body}")
        # Explicitly acknowledge after successful processing.
        # The group passed here must be the consumer group declared in
        # StreamSub above ("task-workers"), not the stream name; otherwise
        # the ack targets a group this consumer does not belong to and the
        # message would remain pending.
        await msg.ack(redis=redis, group="task-workers")
    except Exception as e:
        # Don't acknowledge - let it be claimed by another consumer
        logger.error(f"Failed to process: {e}")

26 

27 

@app.after_startup
async def publish_test() -> None:
    """Publish one sample message to the "critical-tasks" stream after startup."""
    await broker.publish(
        "critical-task-1",
        stream="critical-tasks",
    )