Redis 消息队列
Redis可以实现轻量级消息队列,适合中小规模场景,相比专业MQ更简单易用。
简单队列实现
List队列
Bash
# 生产者:从右侧推入
RPUSH queue:tasks "task_data"
# 消费者:从左侧弹出
LPOP queue:tasks
# 阻塞弹出(等待消息)
BLPOP queue:tasks 30
# 30秒超时,0表示无限等待
多消费者
Bash
# 多消费者从同一队列拉取
# 每个消费者获取不同消息
# 简单负载均衡
队列长度监控
Bash
# 查看队列长度
LLEN queue:tasks
# 监控告警
if LLEN > 10000:
alert("队列积压")
可靠队列
RPOPLPUSH实现
Bash
# 消费消息时同时移到备份队列
RPOPLPUSH queue:tasks queue:backup
# 处理完成后删除备份
LREM queue:backup 1 "task_data"
# 处理失败时消息仍在备份队列
# 可重新处理
阻塞版本
Bash
# 阻塞获取并移到备份
BRPOPLPUSH queue:tasks queue:backup 30
消息确认机制
Python
def consume_task():
# 获取消息并移到备份队列
task = redis.brpoplpush("queue:tasks", "queue:backup", 30)
if task:
try:
# 处理任务
process(task)
# 成功后删除备份
redis.lrem("queue:backup", 1, task)
except:
# 处理失败,消息仍在备份队列
# 可稍后重试
pass
发布订阅模式
基本使用
Bash
# 发布消息
PUBLISH channel:news "news_content"
# 订阅频道
SUBSCRIBE channel:news
# 模式订阅
PSUBSCRIBE channel:*
# 取消订阅
UNSUBSCRIBE channel:news
多订阅者
Bash
# 多个订阅者同时订阅
# 所有订阅者都收到消息
# 广播模式
缺点
Bash
- 消息不持久化,订阅者离线丢失
- 无消息确认机制
- 不支持消费组
- 适合实时推送而非可靠队列
Stream消息队列
Stream特点(Redis 5.0+)
Bash
- 消息持久化
- 支持消费组
- 消息确认机制
- 消息ID有序
- 支持阻塞读取
基本操作
Bash
# 添加消息
XADD stream:orders * field1 value1 field2 value2
# 返回: 1700000000-0(消息ID)
# 读取消息
XREAD COUNT 10 STREAMS stream:orders 0
# 从ID=0开始读取10条
# 阻塞读取
XREAD COUNT 10 BLOCK 5000 STREAMS stream:orders $
# 等待新消息,5秒超时
消费组
Bash
# 创建消费组
XGROUP CREATE stream:orders group1 0
# 消费者读取
XREADGROUP GROUP group1 consumer1 COUNT 10 STREAMS stream:orders >
# 消息确认
XACK stream:orders group1 message_id
# 查看消费组信息
XINFO GROUPS stream:orders
未处理消息
Python
# 查看待处理消息
XPENDING stream:orders group1
# 消息详情
XPENDING stream:orders group1 - + 10
# 转移消息给其他消费者
XCLAIM stream:orders group1 consumer2 0 message_id
消息队列对比
| 特性 | List | Pub/Sub | Stream |
|---|---|---|---|
| 持久化 | 支持 | 不支持 | 支持 |
| 消息确认 | 手动实现 | 不支持 | 支持 |
| 消费组 | 不支持 | 不支持 | 支持 |
| 多消费者 | 负载均衡 | 广播 | 消费组 |
| 消息ID | 无 | 无 | 有序ID |
| 适用场景 | 简单队列 | 实时推送 | 专业队列 |
延迟队列
ZSet实现
Bash
# 添加延迟任务(执行时间为分数)
ZADD delay:queue 1700001000 "task:1001"
# 获取到期任务
ZRANGEBYSCORE delay:queue 0 {current_time}
# 移除已处理任务
ZREM delay:queue "task:1001"
延迟队列循环
Bash
def delay_queue_worker():
while True:
# 获取到期任务
current_time = int(time.time())
tasks = redis.zrangebyscore("delay:queue", 0, current_time)
for task in tasks:
# 尝试获取任务(防止重复处理)
if redis.zrem("delay:queue", task):
# 处理任务
process(task)
# 等待下一条任务
next_task = redis.zrange("delay:queue", 0, 0, withscores=True)
if next_task:
next_time = next_task[0][1]
sleep_time = max(0, next_time - current_time)
sleep(min(sleep_time, 1))
else:
sleep(1)
队列应用场景
1. 异步任务处理
Bash
# 用户注册后异步发送邮件
RPUSH queue:emails "email_data"
# 后台消费者处理
BLPOP queue:emails 0
send_email(data)
2. 订单处理
text
# 订单入队
RPUSH queue:orders "order:1001"
# 订单消费者处理
BRPOPLPUSH queue:orders queue:orders:backup 30
process_order(order)
LREM queue:orders:backup 1 order
3. 日志收集
text
# 日志入队
RPUSH queue:logs "log_entry"
# 日志聚合消费者
BLPOP queue:logs 0
aggregate_log(log)
性能与限制
Redis队列优势
text
- 实现简单
- 无需额外组件
- 低延迟
- 适合中小规模
Redis队列限制
text
- 不支持复杂消息路由
- 不支持事务消息
- 消息堆积影响性能
- 不适合大规模消息系统
规模建议
text
- 日消息量 < 10万:Redis队列足够
- 日消息量 > 100万:考虑专业MQ(RabbitMQ/Kafka)
要点总结
- List实现简单队列:RPUSH入队,BLPOP阻塞出队
- RPOPLPUSH实现可靠队列,备份队列保证消息不丢失
- Pub/Sub适合实时推送,不持久化,订阅者离线丢失
- Stream(Redis 5.0+)专业消息队列,支持持久化、消费组、消息确认
- 延迟队列用ZSet实现,执行时间为分数
- XGROUP/XREADGROUP/XACK实现消费组和消息确认
- 日消息量小用Redis队列,大量消息用专业MQ
- 监控队列长度,积压告警
- 异步任务、订单处理、日志收集适合队列场景
📝 发现内容有误?点击此处直接编辑