消息丢失排查
消息丢失是 RabbitMQ 最严重的故障。需按链路分段排查,逐步缩小范围。
排查链路
Java
生产者 → 网络 → Exchange → Queue → 消费者
| | | | |
确认 连接 路由 持久化 ACK
机制 状态 规则 模式 机制
第一步:确认生产者端
检查发布确认
未启用 Confirm 模式时,生产者无法感知消息是否到达 Broker。
Java
import com.rabbitmq.client.*;
public class PublisherCheck {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 开启 Confirm 模式
ch.confirmSelect();
String exchange = "order.exchange";
String routingKey = "order.created";
String message = "{\"orderId\": 1001}";
ch.basicPublish(exchange, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 等待确认
if (ch.waitForConfirms(5000)) {
System.out.println("消息已到达 Broker");
} else {
System.out.println("消息未确认,可能丢失");
}
}
}
}
未收到
basic.ack的消息视为发布失败,可能是网络断开或 Exchange 不存在。
检查 Mandatory 标志
消息路由到 Queue 失败时,mandatory=true 会触发 basic.return 回调。
Bash
ch.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println("消息被退回: " + replyText);
System.out.println("路由键: " + routingKey);
// 记录或重试
});
ch.basicPublish("order.exchange", "invalid_key", true,
MessageProperties.PERSISTENT_TEXT_PLAIN, "test".getBytes());
第二步:检查 Broker 路由
验证 Exchange 与 Queue 绑定
Java
# 查看 Exchange 绑定关系
rabbitmqctl list_bindings exchange_name source_name destination_name routing_key
# 查看 Exchange 类型
rabbitmqctl list_exchanges name type durable auto_delete
# 查看队列绑定
rabbitmqctl list_queues name messages consumers
常见路由错误
| 错误类型 | 表现 | 排查命令 |
|---|---|---|
| Exchange 不存在 | 消息直接丢弃 | rabbitmqctl list_exchanges |
| 路由键不匹配 | 消息未路由到 Queue | rabbitmqctl list_bindings |
| Exchange 类型错误 | Fanout 忽略 routingKey | 检查 Exchange 类型 |
| Binding 丢失 | 非持久化绑定重启消失 | 检查 durable 参数 |
第三步:检查队列存储
持久化配置
Bash
// 队列持久化
boolean durable = true;
ch.queueDeclare("order.queue", durable, false, false, null);
// 消息持久化
ch.basicPublish("", "order.queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
仅队列持久化不够,消息也必须标记为
PERSISTENT,否则重启后消息丢失。
检查消息状态
Java
# 查看队列消息数
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 查看队列详情(含持久化状态)
rabbitmqctl eval 'rabbit_amqqueue:info(rabbit_amqqueue:find({resource, <<"/">>, queue, <<"order.queue">>}), [durable, policy, slave_pids]).'
第四步:检查消费者端
ACK 模式排查
Java
// 手动 ACK 模式
boolean autoAck = false;
ch.basicConsume("order.queue", autoAck, (consumerTag, delivery) -> {
try {
String msg = new String(delivery.getBody());
processMessage(msg);
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,不重新入队
ch.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
// 或重新入队
// ch.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}, consumerTag -> {});
basicNack的requeue=false时消息直接丢弃,需配合死信队列使用。
常见消费者端丢失场景
| 场景 | 原因 | 解决方案 |
|---|---|---|
| 自动 ACK 处理失败 | 消息消费失败但已 ACK | 改用手动 ACK |
| NACK requeue=false | 消息被拒绝且不重新入队 | 设置 requeue=true 或 DLX |
| 消费者崩溃 | 连接断开,消息重新入队 | 检查心跳与重连机制 |
| prefetch=0 | 一次拉取过多消息积压 | 设置合理 prefetch 值 |
完整排查脚本
text
import com.rabbitmq.client.*;
import java.util.concurrent.CountDownLatch;
public class MessageLossDebugger {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection conn = factory.newConnection();
Channel ch = conn.createChannel()) {
// 1. 开启 Confirm
ch.confirmSelect();
// 2. 添加 Return 监听
ch.addReturnListener((code, text, ex, rk, props, body) -> {
System.err.printf("Return: code=%d, text=%s, key=%s%n", code, text, rk);
});
// 3. 发布消息
String message = "debug-message-" + System.currentTimeMillis();
ch.basicPublish("test.exchange", "test.key", true,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 4. 等待确认
boolean confirmed = ch.waitForConfirms(5000);
System.out.println("Confirm: " + confirmed);
// 5. 检查队列
AMQP.Queue.DeclareOk ok = ch.queueDeclarePassive("test.queue");
System.out.println("Queue messages: " + ok.getMessageCount());
// 6. 消费验证
CountDownLatch latch = new CountDownLatch(1);
ch.basicConsume("test.queue", false, (tag, delivery) -> {
System.out.println("Received: " + new String(delivery.getBody()));
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
latch.countDown();
}, tag -> {});
latch.await();
}
}
}
注意事项
- 排查消息丢失必须先确认生产者是否收到 Confirm,这是链路起点
- Exchange 不存在或路由键错误是消息"消失"最常见原因
- 持久化需同时设置队列 durable=true 和消息 deliveryMode=2
- 消费者自动 ACK 模式下,消息在投递到消费者时即被确认,处理失败即丢失
- 集群场景需检查镜像队列同步状态,主节点故障可能导致未同步消息丢失
要点总结
- 消息丢失排查遵循"生产者 → 路由 → 存储 → 消费者"四段式链路
- 生产者端必须启用 Confirm + Return 机制感知发布状态
- 路由阶段重点检查 Exchange 存在性、类型和 Binding 关系
- 存储阶段确认队列和消息双重持久化配置
- 消费者端使用手动 ACK + 死信队列避免处理失败导致丢失
📝 发现内容有误?点击此处直接编辑