Redis 延迟队列
延迟队列用于处理需要延时执行的任务,如订单超时取消、定时提醒、延迟推送等场景。
延迟队列原理
核心设计
Bash
- 使用ZSet存储任务
- 任务执行时间作为score
- 时间戳越小越优先执行
- 循环检查到期任务并执行
任务数据结构
Bash
# 任务ID作为member,执行时间作为score
ZADD delay:queue 1700001000 "order:1001"
# 任务详细信息单独存储
SET task:order:1001 '{"orderId":1001,"action":"cancel"}'
ZSet实现延迟队列
添加延迟任务
Python
# 计算执行时间戳
execute_time = current_time + delay_seconds
# 添加到延迟队列
ZADD delay:queue execute_time "task_id"
# 存储任务详情
SET task:detail:task_id task_data
添加任务示例
Bash
def add_delay_task(task_id, task_data, delay_seconds):
# 计算执行时间
execute_time = int(time.time()) + delay_seconds
# 添加到延迟队列
redis.zadd("delay:queue", {task_id: execute_time})
# 存储任务详情
redis.set(f"task:detail:{task_id}", json.dumps(task_data))
获取到期任务
Python
# 获取当前时间戳之前的任务
ZRANGEBYSCORE delay:queue 0 {current_time}
# 获取到期任务数量
ZCOUNT delay:queue 0 {current_time}
执行到期任务
Python
def consume_delay_queue():
current_time = int(time.time())
# 获取到期任务
tasks = redis.zrangebyscore("delay:queue", 0, current_time)
for task_id in tasks:
# 尝试获取任务(防止多消费者重复处理)
if redis.zrem("delay:queue", task_id):
# 获取任务详情
task_data = redis.get(f"task:detail:{task_id}")
# 执行任务
try:
execute_task(task_id, task_data)
# 成功后删除任务详情
redis.delete(f"task:detail:{task_id}")
except:
# 失败处理
handle_failure(task_id, task_data)
消费者实现
单消费者循环
Python
def delay_queue_consumer():
while True:
current_time = int(time.time())
# 获取到期任务
tasks = redis.zrangebyscore("delay:queue", 0, current_time, start=0, num=10)
for task_id in tasks:
if redis.zrem("delay:queue", task_id):
process_task(task_id)
# 无任务时等待
if not tasks:
# 获取下一条任务的执行时间
next_task = redis.zrange("delay:queue", 0, 0, withscores=True)
if next_task:
next_time = next_task[0][1]
wait_time = max(0, next_time - current_time)
time.sleep(min(wait_time, 1))
else:
time.sleep(1)
多消费者竞争
Python
def multi_consumer():
while True:
current_time = int(time.time())
# 获取到期任务
tasks = redis.zrangebyscore("delay:queue", 0, current_time)
for task_id in tasks:
# ZREM原子操作保证只有一个消费者获取
if redis.zrem("delay:queue", task_id):
process_task(task_id)
break # 处理一条后重新循环
time.sleep(0.1)
ZREM原子操作保证任务不会被多个消费者重复获取。
任务失败重试
重试机制
Python
def process_task_with_retry(task_id, max_retry=3):
task_data = redis.get(f"task:detail:{task_id}")
retry_count = redis.get(f"task:retry:{task_id}") or 0
try:
execute_task(task_id, task_data)
# 成功,清理数据
redis.delete(f"task:detail:{task_id}")
redis.delete(f"task:retry:{task_id}")
except Exception as e:
retry_count = int(retry_count) + 1
if retry_count < max_retry:
# 重试,延迟时间递增
delay = retry_count * 60 # 第1次60秒,第2次120秒
new_time = int(time.time()) + delay
redis.zadd("delay:queue", {task_id: new_time})
redis.set(f"task:retry:{task_id}", retry_count)
else:
# 超过最大重试次数
handle_task_failure(task_id, task_data)
redis.delete(f"task:detail:{task_id}")
redis.delete(f"task:retry:{task_id}")
死信队列
Python
def handle_task_failure(task_id, task_data):
# 超过重试次数,放入死信队列
redis.lpush("delay:dead_letter", task_id)
redis.set(f"dead:detail:{task_id}", task_data)
# 告警
alert(f"Task {task_id} failed after max retries")
应用场景详解
1. 订单超时取消
设计方案
Python
- 订单创建后30分钟未支付自动取消
- 创建订单时添加延迟任务
- 到期检查订单状态,未支付则取消
实现
Python
def create_order(order_id, user_id):
# 创建订单
db.insert_order(order_id, user_id, status="pending")
# 添加30分钟后取消的延迟任务
task_id = f"order_cancel:{order_id}"
task_data = {"orderId": order_id, "action": "cancel"}
add_delay_task(task_id, task_data, 1800) # 30分钟
def cancel_unpaid_order(order_id):
# 检查订单状态
order = db.query_order(order_id)
if order.status == "pending":
# 未支付,取消订单
db.update_order(order_id, status="cancelled")
# 通知用户
send_notification(user_id, "订单已取消")
else:
# 已支付,忽略
pass
def pay_order(order_id):
# 用户支付
db.update_order(order_id, status="paid")
# 删除延迟任务(已支付不需要取消)
redis.zrem("delay:queue", f"order_cancel:{order_id}")
2. 定时提醒/通知
实现
Python
def schedule_reminder(user_id, message, remind_time):
# remind_time为提醒时间戳
task_id = f"reminder:{uuid()}"
task_data = {"userId": user_id, "message": message}
redis.zadd("delay:queue", {task_id: remind_time})
redis.set(f"task:detail:{task_id}", json.dumps(task_data))
def send_reminder(task_data):
# 发送提醒
send_notification(task_data["userId"], task_data["message"])
3. 延迟推送
实现
Python
def schedule_push(user_id, content, delay_seconds):
task_id = f"push:{uuid()}"
task_data = {"userId": user_id, "content": content}
execute_time = int(time.time()) + delay_seconds
redis.zadd("delay:queue", {task_id: execute_time})
redis.set(f"task:detail:{task_id}", json.dumps(task_data))
4. 定时数据同步
实现
Python
def schedule_sync(source, target, sync_time):
task_id = f"sync:{source}:{target}"
task_data = {"source": source, "target": target}
redis.zadd("delay:queue", {task_id: sync_time})
redis.set(f"task:detail:{task_id}", json.dumps(task_data))
性能优化
批量获取任务
Python
# 批量获取到期任务
tasks = redis.zrangebyscore("delay:queue", 0, current_time, start=0, num=100)
Pipeline优化
Python
# 使用Pipeline批量操作
pipe = redis.pipeline()
for task_id in tasks:
pipe.zrem("delay:queue", task_id)
results = pipe.execute()
# 检查哪些任务成功获取
for i, task_id in enumerate(tasks):
if results[i]:
process_task(task_id)
任务分片
Python
# 多队列分片
for i in range(10):
queue_key = f"delay:queue:{i}"
# 各消费者处理不同队列
监控与告警
队列监控
text
# 队列积压监控
pending_count = redis.zcard("delay:queue")
if pending_count > 10000:
alert("延迟队列积压过多")
# 任务延迟监控
current_time = int(time.time())
overdue_tasks = redis.zrangebyscore("delay:queue", 0, current_time - 60)
if overdue_tasks:
alert(f"有{len(overdue_tasks)}个任务超期未处理")
告警设置
text
# 监控指标
metrics = {
"pending_tasks": redis.zcard("delay:queue"),
"overdue_tasks": len(redis.zrangebyscore("delay:queue", 0, current_time - 60)),
"dead_letter_count": redis.llen("delay:dead_letter")
}
要点总结
- ZSet实现延迟队列,执行时间作为score
- ZRANGEBYSCORE获取到期任务,ZREM原子获取防止重复
- 任务详情单独存储,任务ID作为ZSet的member
- 单消费者循环:获取、处理、等待下一条任务
- 多消费者竞争:ZREM保证只有一个消费者获取任务
- 失败重试:递增延迟时间,超过次数入死信队列
- 订单超时取消:创建订单时添加延迟任务,支付后删除任务
- 定时提醒、延迟推送、定时同步适合延迟队列
- 监控队列积压和超期任务,及时告警
- Pipeline批量获取和删除,提高效率
📝 发现内容有误?点击此处直接编辑