全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-12 8 分钟 ✍️ juanwangdev

Redis 消息队列

Redis可以实现轻量级消息队列,适合中小规模场景,相比专业MQ更简单易用。

简单队列实现

List队列

Bash
# 生产者:从右侧推入
RPUSH queue:tasks "task_data"

# 消费者:从左侧弹出
LPOP queue:tasks

# 阻塞弹出(等待消息)
BLPOP queue:tasks 30
# 30秒超时,0表示无限等待

多消费者

Bash
# 多消费者从同一队列拉取
# 每个消费者获取不同消息
# 简单负载均衡

队列长度监控

Bash
# 查看队列长度
LLEN queue:tasks

# 监控告警
if LLEN > 10000:
    alert("队列积压")

可靠队列

RPOPLPUSH实现

Bash
# 消费消息时同时移到备份队列
RPOPLPUSH queue:tasks queue:backup

# 处理完成后删除备份
LREM queue:backup 1 "task_data"

# 处理失败时消息仍在备份队列
# 可重新处理

阻塞版本

Bash
# 阻塞获取并移到备份
BRPOPLPUSH queue:tasks queue:backup 30

消息确认机制

Python
def consume_task():
    # 获取消息并移到备份队列
    task = redis.brpoplpush("queue:tasks", "queue:backup", 30)

    if task:
        try:
            # 处理任务
            process(task)
            # 成功后删除备份
            redis.lrem("queue:backup", 1, task)
        except:
            # 处理失败,消息仍在备份队列
            # 可稍后重试
            pass

发布订阅模式

基本使用

Bash
# 发布消息
PUBLISH channel:news "news_content"

# 订阅频道
SUBSCRIBE channel:news

# 模式订阅
PSUBSCRIBE channel:*

# 取消订阅
UNSUBSCRIBE channel:news

多订阅者

Bash
# 多个订阅者同时订阅
# 所有订阅者都收到消息
# 广播模式

缺点

Bash
- 消息不持久化,订阅者离线丢失
- 无消息确认机制
- 不支持消费组
- 适合实时推送而非可靠队列

Stream消息队列

Stream特点(Redis 5.0+)

Bash
- 消息持久化
- 支持消费组
- 消息确认机制
- 消息ID有序
- 支持阻塞读取

基本操作

Bash
# 添加消息
XADD stream:orders * field1 value1 field2 value2
# 返回: 1700000000-0(消息ID)

# 读取消息
XREAD COUNT 10 STREAMS stream:orders 0
# 从ID=0开始读取10条

# 阻塞读取
XREAD COUNT 10 BLOCK 5000 STREAMS stream:orders $
# 等待新消息,5秒超时

消费组

Bash
# 创建消费组
XGROUP CREATE stream:orders group1 0

# 消费者读取
XREADGROUP GROUP group1 consumer1 COUNT 10 STREAMS stream:orders >

# 消息确认
XACK stream:orders group1 message_id

# 查看消费组信息
XINFO GROUPS stream:orders

未处理消息

Python
# 查看待处理消息
XPENDING stream:orders group1

# 消息详情
XPENDING stream:orders group1 - + 10

# 转移消息给其他消费者
XCLAIM stream:orders group1 consumer2 0 message_id

消息队列对比

特性ListPub/SubStream
持久化支持不支持支持
消息确认手动实现不支持支持
消费组不支持不支持支持
多消费者负载均衡广播消费组
消息ID有序ID
适用场景简单队列实时推送专业队列

延迟队列

ZSet实现

Bash
# 添加延迟任务(执行时间为分数)
ZADD delay:queue 1700001000 "task:1001"

# 获取到期任务
ZRANGEBYSCORE delay:queue 0 {current_time}

# 移除已处理任务
ZREM delay:queue "task:1001"

延迟队列循环

Bash
def delay_queue_worker():
    while True:
        # 获取到期任务
        current_time = int(time.time())
        tasks = redis.zrangebyscore("delay:queue", 0, current_time)

        for task in tasks:
            # 尝试获取任务(防止重复处理)
            if redis.zrem("delay:queue", task):
                # 处理任务
                process(task)

        # 等待下一条任务
        next_task = redis.zrange("delay:queue", 0, 0, withscores=True)
        if next_task:
            next_time = next_task[0][1]
            sleep_time = max(0, next_time - current_time)
            sleep(min(sleep_time, 1))
        else:
            sleep(1)

队列应用场景

1. 异步任务处理

Bash
# 用户注册后异步发送邮件
RPUSH queue:emails "email_data"

# 后台消费者处理
BLPOP queue:emails 0
send_email(data)

2. 订单处理

text
# 订单入队
RPUSH queue:orders "order:1001"

# 订单消费者处理
BRPOPLPUSH queue:orders queue:orders:backup 30
process_order(order)
LREM queue:orders:backup 1 order

3. 日志收集

text
# 日志入队
RPUSH queue:logs "log_entry"

# 日志聚合消费者
BLPOP queue:logs 0
aggregate_log(log)

性能与限制

Redis队列优势

text
- 实现简单
- 无需额外组件
- 低延迟
- 适合中小规模

Redis队列限制

text
- 不支持复杂消息路由
- 不支持事务消息
- 消息堆积影响性能
- 不适合大规模消息系统

规模建议

text
- 日消息量 < 10万:Redis队列足够
- 日消息量 > 100万:考虑专业MQ(RabbitMQ/Kafka)

要点总结

  • List实现简单队列:RPUSH入队,BLPOP阻塞出队
  • RPOPLPUSH实现可靠队列,备份队列保证消息不丢失
  • Pub/Sub适合实时推送,不持久化,订阅者离线丢失
  • Stream(Redis 5.0+)专业消息队列,支持持久化、消费组、消息确认
  • 延迟队列用ZSet实现,执行时间为分数
  • XGROUP/XREADGROUP/XACK实现消费组和消息确认
  • 日消息量小用Redis队列,大量消息用专业MQ
  • 监控队列长度,积压告警
  • 异步任务、订单处理、日志收集适合队列场景

📝 发现内容有误?点击此处直接编辑

← 上一篇 Redis 排行榜与社交功能
下一篇 → Redis 缓存应用
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库