延迟消息插件
延迟消息插件提供原生延迟交换机类型,消息在指定时间后才投递到队列,简化延迟场景实现。
安装插件
插件需与 RabbitMQ 版本匹配,安装后重启服务生效。
Bash
# 下载插件(以 3.10.x 为例)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.10.2.ez /usr/lib/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启 RabbitMQ
systemctl restart rabbitmq-server
# 验证插件状态
rabbitmq-plugins list | grep delayed
插件版本必须与 RabbitMQ 主版本一致,否则无法加载。
延迟交换机类型
延迟交换机类型为 x-delayed-message,支持以下路由策略:
| 路由类型 | x-delayed-type 值 | 说明 |
|---|---|---|
| Direct | direct | 精确匹配路由键 |
| Fanout | fanout | 广播到所有绑定队列 |
| Topic | topic | 通配符匹配路由键 |
| Headers | headers | 基于消息头匹配 |
Java Client 声明延迟交换机
通过 x-delayed-type 参数指定底层路由策略。
Java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明延迟交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底层使用 direct 路由
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
// 声明队列并绑定
channel.queueDeclare("delayed_queue", true, false, false, null);
channel.queueBind("delayed_queue", "delayed_exchange", "delayed_key");
x-delayed-message是交换机类型,x-delayed-type指定底层路由策略。
发送延迟消息
通过消息头 x-delay 指定延迟时间(毫秒)。
Java
// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 5000)) // 延迟 5 秒
.contentType("text/plain")
.build();
channel.basicPublish(
"delayed_exchange", // 交换机
"delayed_key", // 路由键
props, // 消息属性(含延迟时间)
"延迟消息内容".getBytes()
);
System.out.println("延迟消息已发送,5秒后投递");
消息流转过程:
- 消息到达延迟交换机
- 交换机读取
x-delay值,暂存消息 - 延迟时间到达后,消息路由到目标队列
动态延迟时间
每条消息可设置不同的延迟时间。
Java
// 不同延迟时间
int[] delays = {1000, 3000, 5000, 10000};
for (int delay : delays) {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", delay))
.build();
channel.basicPublish(
"delayed_exchange",
"delayed_key",
props,
("延迟 " + delay + "ms").getBytes()
);
}
x-delay值为负数或 0 时,消息立即投递,不延迟。
延迟消息与死信队列对比
传统方案使用 TTL + DLX 实现延迟,对比如下:
| 特性 | 延迟插件 | TTL + DLX |
|---|---|---|
| 实现复杂度 | 低 | 高(需多个队列+DLX) |
| 动态延迟 | 支持 | 不支持(队列TTL固定) |
| 消息排序 | 按延迟时间排序 | 按过期时间排序 |
| 插件依赖 | 需安装插件 | 原生支持 |
延迟插件适合动态延迟场景,TTL+DLX 适合固定延迟且无需插件的场景。
注意事项
- 延迟消息存储在内存中,Broker 重启后未到期消息丢失
- 延迟时间最大值受
delayed_message.max_delay配置限制- 延迟交换机不支持
auto-delete,否则消息无法暂存- 大量延迟消息会占用内存,建议设置合理的延迟上限
要点总结
- 延迟插件提供
x-delayed-message交换机类型,需安装后启用 - 声明交换机时通过
x-delayed-type指定底层路由策略 - 发送消息时通过
x-delay消息头设置延迟时间(毫秒) - 每条消息可设置不同延迟时间,支持动态延迟
- 延迟消息暂存内存,Broker 重启丢失,不适合持久化场景
📝 发现内容有误?点击此处直接编辑