Coverage for docs / docs_src / redis / stream / claiming_manual_ack.py: 90%
10 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from faststream import AckPolicy, FastStream, Logger
2from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis
# Broker used by the subscriber and the startup publisher in this example.
broker = RedisBroker()

# Application object wrapping the broker.
app = FastStream(broker)
@broker.subscriber(
    stream=StreamSub(
        "critical-tasks",
        group="task-workers",
        consumer="worker-failover",
        min_idle_time=30000,  # 30 seconds
    ),
    ack_policy=AckPolicy.MANUAL,
)
async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis):
    """Process a message from the "critical-tasks" stream and ack it manually.

    The subscriber uses ``AckPolicy.MANUAL``, so the message is acknowledged
    only by the explicit ``msg.ack(...)`` call below.

    Args:
        msg: The stream message being handled.
        logger: Logger used to report processing and failures.
        redis: Redis client passed to ``msg.ack`` to perform the
            acknowledgement.
    """
    try:
        # Process the claimed message
        logger.info(f"Processing: {msg.body}")
        # Explicitly acknowledge after successful processing.
        # The ack must name the consumer group declared in StreamSub above
        # ("task-workers"), not the stream name; acknowledging against any
        # other group leaves the entry pending for this group.
        await msg.ack(redis=redis, group="task-workers")
    except Exception as e:
        # Don't acknowledge - let it be claimed by another consumer
        logger.error(f"Failed to process: {e}")
@app.after_startup
async def publish_test() -> None:
    """Publish one sample task to the "critical-tasks" stream."""
    await broker.publish(
        "critical-task-1",
        stream="critical-tasks",
    )