全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-12 10 分钟 ✍️ juanwangdev

Redis Stream是Redis 5.0引入的消息队列数据结构,支持消息持久化、消费组、消息确认机制,是Redis专业消息队列的实现。

Stream原理

设计特点

Bash
Stream特点:
- 消息持久化存储
- 每条消息有唯一ID(时间戳-序号)
- 支持消费组(Consumer Group)
- 支持消息确认(ACK)
- 支持阻塞读取
- 消息有序不可修改

消息ID结构

Bash
消息ID格式:<milliseconds>-<sequence_number>

示例:
1700000000-0  # 时间戳1700000000,序号0
1700000000-1  # 同一时间戳的第2条消息
1700000001-0  # 下一毫秒的消息

特点:
- 时间戳保证大致有序
- 序号保证同毫秒内有序
- 自动生成,用户无需指定

消息存储结构

Bash
Stream内部结构:
- 基于Radix Tree(基数树)
- 高效内存使用
- 支持范围查询
- O(log N)查找效率

消息内容:
- 多个field-value对
- 类似Hash结构
- 每条消息可存储多个字段

Stream核心命令

XADD添加消息

Bash
# 基本语法
XADD key [MAXLEN ~ count] * field value [field value ...]

# 添加消息(自动生成ID)
XADD stream:orders * orderId 1001 status pending
# 返回: "1700000000-0"

# 添加多条消息
XADD stream:logs * level error message "Connection failed" timestamp 1700000000

# 限制Stream长度
XADD stream:orders MAXLEN ~ 1000 * orderId 1002
# 保留约1000条消息,自动删除旧消息

# 手动指定ID
XADD stream:orders 1700000001-0 orderId 1003

XREAD读取消息

Bash
# 基本语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

# 从指定ID开始读取
XREAD COUNT 10 STREAMS stream:orders 0
# 从ID=0开始读取10条

# 从最新消息开始读取
XREAD STREAMS stream:orders $
# $表示只获取新消息

# 阻塞读取(等待新消息)
XREAD COUNT 10 BLOCK 5000 STREAMS stream:orders $
# 等待5秒,有新消息返回

# 读取多个Stream
XREAD STREAMS stream:orders stream:logs 0 0

XRANGE/XREVRANGE范围读取

Bash
# 基本语法
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]

# 获取全部消息
XRANGE stream:orders - +
# - 表示最小ID,+ 表示最大ID

# 获取指定范围
XRANGE stream:orders 1700000000-0 1700000001-0

# 获取指定数量
XRANGE stream:orders - + COUNT 10

# 倒序获取(最新消息)
XREVRANGE stream:orders + - COUNT 10

XLEN获取消息数量

Bash
# 获取Stream长度
XLEN stream:orders
# 返回: 100

XDEL删除消息

Bash
# 删除指定消息
XDEL stream:orders 1700000000-0

# 删除不影响消费组状态
# 消息ID仍被消费组记录

XTRIM裁剪Stream

Bash
# 基本语法
XTRIM key MAXLEN [~] count

# 精确裁剪(保留指定数量)
XTRIM stream:orders MAXLEN 1000

# 近似裁剪(约保留指定数量)
XTRIM stream:orders MAXLEN ~ 1000
# ~ 表示近似,效率更高

消费组命令

XGROUP创建消费组

Bash
# 基本语法
XGROUP CREATE key groupname ID [MKSTREAM]

# 创建消费组(从最新消息开始)
XGROUP CREATE stream:orders group1 $
# $表示从新消息开始消费

# 创建消费组(从头开始消费)
XGROUP CREATE stream:orders group2 0

# 创建Stream和消费组(MKSTREAM)
XGROUP CREATE stream:new group1 0 MKSTREAM

XREADGROUP消费组读取

Bash
# 基本语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

# 消费组读取
XREADGROUP GROUP group1 consumer1 COUNT 10 STREAMS stream:orders >
# > 表示获取未分配的新消息

# 阻塞读取
XREADGROUP GROUP group1 consumer1 BLOCK 5000 STREAMS stream:orders >

# 不需要ACK(自动确认)
XREADGROUP GROUP group1 consumer1 NOACK STREAMS stream:orders >

XACK消息确认

Bash
# 确认消息已处理
XACK stream:orders group1 1700000000-0

# 确认多条消息
XACK stream:orders group1 1700000000-0 1700000000-1

XPENDING查看待处理消息

Bash
# 查看待处理消息摘要
XPENDING stream:orders group1

# 查看待处理消息详情
XPENDING stream:orders group1 - + 10

# 查看指定消费者的待处理消息
XPENDING stream:orders group1 - + 10 consumer1

XCLAIM转移消息

Bash
# 基本语法
XCLAIM key group consumer min-idle-time ID [ID ...]

# 将消息转移给其他消费者
XCLAIM stream:orders group1 consumer2 60000 1700000000-0
# 空闲超过60秒的消息转移给consumer2

XINFO查看信息

Bash
# 查看Stream信息
XINFO STREAM stream:orders

# 查看消费组信息
XINFO GROUPS stream:orders

# 查看消费者信息
XINFO CONSUMERS stream:orders group1

命令速查表

命令说明示例
XADD添加消息XADD k * f v
XREAD读取消息XREAD STREAMS k 0
XRANGE范围读取XRANGE k - +
XLEN消息数量XLEN k
XDEL删除消息XDEL k id
XTRIM裁剪StreamXTRIM k MAXLEN ~1000
XGROUP创建组XGROUP CREATE k g 0
XREADGROUP组消费XREADGROUP GROUP g c ...
XACK确认消息XACK k g id
XPENDING待处理XPENDING k g
XCLAIM转移消息XCLAIM k g c 60000 id
XINFO查看信息XINFO STREAM k

应用场景

1. 消息队列

Bash
# 生产者发送消息
XADD queue:orders * orderId 1001 userId 1000 amount 99.9

# 消费者消费消息
# 创建消费组
XGROUP CREATE queue:orders processors 0 MKSTREAM

# 消费者1
XREADGROUP GROUP processors consumer1 BLOCK 5000 STREAMS queue:orders >
# 处理消息
process_order(message)
# 确认消息
XACK queue:orders processors message_id

2. 事件流

Bash
# 记录系统事件
XADD stream:events * type login userId 1000 timestamp 1700000000
XADD stream:events * type logout userId 1000 timestamp 1700000001

# 分析消费组消费事件
XREADGROUP GROUP analytics consumer1 STREAMS stream:events >

3. 日志收集

Bash
# 应用日志写入Stream
XADD stream:app:logs * level error message "Database connection failed" stacktrace "..."

# 日志聚合服务消费
XREADGROUP GROUP aggregator consumer1 COUNT 100 STREAMS stream:app:logs >

4. 实时数据流

Python
# 实时数据写入
XADD stream:metrics * cpu 75.5 memory 80.2 disk 45.0

# 监控服务读取
XREAD COUNT 1 BLOCK 1000 STREAMS stream:metrics $

5. 活动追踪

Python
# 用户活动记录
XADD stream:activity:user:1000 * action click page "/product/500" timestamp 1700000000

# 分析最近活动
XRANGE stream:activity:user:1000 - + COUNT 50

消费组详解

消费组原理

Bash
消费组机制:
1. 消息分配给消费者
2. 每条消息只分配给一个消费者
3. 消费者处理后确认(ACK)
4. 未确认的消息可被转移

状态:
- 已分配:消息分配给消费者
- 已确认(ACK):处理完成
- 待处理(PENDING):已分配但未确认

消费者状态

Bash
# 查看消费者信息
XINFO CONSUMERS stream:orders group1
# 返回:
# name: consumer1
# pending: 5(待处理消息数)
# idle: 1000(空闲时间ms)

消息流转

Bash
消息状态流转:
新消息 → 分配给消费者 → 消费者处理 → ACK确认
          ↓
       PENDING列表(未确认)
          ↓
       超时后可XCLAIM转移

消息可靠性

ACK机制

text
def consume_message():
    # 获取消息
    message = redis.xreadgroup(
        group="group1",
        consumer="consumer1",
        streams={"stream:orders": ">"}
    )

    try:
        # 处理消息
        process(message)
        # 确认消息
        redis.xack("stream:orders", "group1", message["id"])
    except Exception:
        # 处理失败,消息留在PENDING列表
        # 可稍后重试或转移给其他消费者
        pass

死信处理

text
def check_pending_messages():
    # 检查待处理消息
    pending = redis.xpending("stream:orders", "group1")

    for msg_id in pending:
        # 检查空闲时间
        if idle_time > 60000:  # 超过60秒
            # 转移给其他消费者或入死信队列
            redis.xclaim("stream:orders", "group1", "consumer2", 60000, msg_id)

性能优化

MAXLEN限制长度

text
# 限制Stream长度避免无限增长
XADD stream:orders MAXLEN ~ 10000 * orderId 1001

# 近似裁剪效率更高
# 精确裁剪开销大

消费组优化

text
# 合理消费者数量
# 建议:消费组消费者数 = 处理能力需要

# 批量消费
XREADGROUP GROUP g c COUNT 100 STREAMS s >

定期清理

text
# 清理已确认的旧消息
XTRIM stream:orders MAXLEN ~ 10000

# 清理空闲消费者
XGROUP DELCONSUMER stream:orders group1 consumer_inactive

与其他消息队列对比

特性Redis StreamList队列Pub/Sub专业MQ
持久化支持支持不支持支持
消费组支持不支持不支持支持
ACK支持手动实现不支持支持
消息ID有序ID多种
复杂度
适用中小规模简单队列实时推送大规模

要点总结

  • Stream是Redis 5.0专业消息队列,支持持久化和消费组
  • XADD添加消息,自动生成有序ID(时间戳-序号)
  • XREAD/XRANGE读取消息,支持阻塞读取
  • XGROUP CREATE创建消费组,消费组实现消息分发
  • XREADGROUP消费组读取,>获取未分配的新消息
  • XACK确认消息,XPENDING查看待处理消息
  • XCLAIM转移超时未确认的消息给其他消费者
  • MAXLEN ~ 限制Stream长度,避免无限增长
  • 应用场景:消息队列、事件流、日志收集、实时数据
  • ACK机制保证消息可靠,处理失败可重试
  • 中小规模用Stream,大规模用专业MQ(RabbitMQ/Kafka)

📝 发现内容有误?点击此处直接编辑

← 上一篇 地理空间
下一篇 → AOF持久化
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库