端到端可靠性
消息从生产者到消费者的完整链路中,任何环节异常都可能导致消息丢失。组合多种可靠性机制可实现消息端到端不丢失。
消息丢失环节分析
| 环节 | 可能丢失原因 | 防护机制 |
|---|---|---|
| 生产端 → 交换机 | 网络异常、交换机不存在 | Publisher Confirm |
| 交换机 → 队列 | 路由失败、无匹配队列 | mandatory + Return 机制 |
| 队列存储 | RabbitMQ 重启、节点故障 | 消息持久化 |
| 队列 → 消费者 | 消费异常、未确认断开 | 手动确认 + 重试 |
完整方案实现
生产端:Confirm + 持久化
Java
// Maven 依赖
// <dependency>
// <groupId>com.rabbitmq</groupId>
// <artifactId>amqp-client</artifactId>
// <version>5.20.0</version>
// </dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class EndToEndReliability {
private static final String EXCHANGE_NAME = "reliable_exchange";
private static final String QUEUE_NAME = "reliable_queue";
private static final String ROUTING_KEY = "reliable.key";
private static final String DLX_EXCHANGE = "dlx_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// ========== 1. 声明持久化交换机和队列 ==========
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // durable=true
channel.queueDeclare(QUEUE_NAME, true, false, false, null); // durable=true
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// ========== 2. 开启 Confirm 模式 ==========
channel.confirmSelect();
// 维护未确认消息
ConcurrentNavigableMap<Long, String> outstandingConfirms =
new ConcurrentSkipListMap<>();
// 注册异步 Confirm 回调
channel.addConfirmListener(
(deliveryTag, multiple) -> {
if (multiple) {
outstandingConfirms.headMap(deliveryTag, true).clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("[Confirm] 消息已确认, tag: " + deliveryTag);
},
(deliveryTag, multiple) -> {
if (multiple) {
var failed = outstandingConfirms.headMap(deliveryTag, true);
System.err.println("[Confirm] 批量消息未确认: " + failed.values());
failed.clear();
} else {
String msg = outstandingConfirms.get(deliveryTag);
System.err.println("[Confirm] 消息未确认,需重发: " + msg);
outstandingConfirms.remove(deliveryTag);
}
}
);
// 注册 Return 回调
channel.addReturnListener((replyCode, replyText, exchange,
routingKey, properties, body) -> {
System.err.println("[Return] 消息未路由: " + routingKey);
});
// ========== 3. 发送持久化消息 ==========
String message = "Reliable Message";
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(messageId)
.deliveryMode(2) // 2 = 持久化, 1 = 非持久化
.build();
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, props,
message.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(nextSeqNo, message);
System.out.println("消息已发送: " + message + ", messageId: " + messageId);
// 等待确认
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
消费端:手动确认 + 失败重试 + 死信
Java
public class ReliableConsumer {
private static final int MAX_RETRY = 3;
public static void consume() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明死信交换机
channel.exchangeDeclare("dlx_exchange", "direct", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx.route");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
String messageId = delivery.getProperties().getMessageId();
try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
// 检查是否重复消费
if (isDuplicate(messageId)) {
System.out.println("重复消息,跳过: " + messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 处理消息
processMessage(message);
// 处理成功,确认消息
markProcessed(messageId);
channel.basicAck(deliveryTag, false);
System.out.println("消息已确认: " + messageId);
} catch (Exception e) {
int retryCount = getRetryCount(delivery);
if (retryCount < MAX_RETRY) {
// 重试:更新重试次数后重新入队
retryCount++;
republishWithRetry(channel, delivery, retryCount);
channel.basicAck(deliveryTag, false);
} else {
// 超过最大重试次数,拒绝消息进入死信
channel.basicNack(deliveryTag, false, false);
System.err.println("消息进入死信队列: " + messageId);
}
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private static int getRetryCount(Delivery delivery) {
Object count = delivery.getProperties().getHeaders().get("x-retry-count");
return count != null ? (Integer) count : 0;
}
private static void republishWithRetry(Channel channel, Delivery delivery,
int retryCount) throws IOException {
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.messageId(delivery.getProperties().getMessageId())
.deliveryMode(2)
.headers(java.util.Map.of("x-retry-count", retryCount))
.build();
channel.basicPublish("reliable_exchange", "reliable.key", props, delivery.getBody());
}
private static boolean isDuplicate(String messageId) {
// 查询去重存储
return false;
}
private static void markProcessed(String messageId) {
// 标记已处理
}
private static void processMessage(String message) {
// 业务处理
}
}
队列持久化配置
声明持久化队列与死信
Java
public class DurableQueueSetup {
public static void setup(Channel channel) throws IOException {
// 声明死信交换机
channel.exchangeDeclare("dlx_exchange", "direct", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx.route");
// 声明主队列,绑定死信交换机
java.util.Map<String, Object> args = new java.util.HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx.route"); // 死信路由键
args.put("x-message-ttl", 60000); // 消息 TTL(可选)
channel.queueDeclare("main_queue", true, false, false, args);
channel.queueBind("main_queue", "reliable_exchange", "reliable.key");
}
}
注意事项
- 消息持久化(deliveryMode=2)需配合交换机和队列的 durable=true,仅消息持久化无效。
- Publisher Confirm 异步模式下需维护未确认消息集合,连接断开时需重发。
- 手动确认后若消费者未确认即断开,消息会自动重新入队。
- 死信队列需提前声明并绑定,消息被 nack(requeue=false) 或 TTL 过期后自动转入。
- 端到端可靠性 = 发布确认 + 消息持久化 + 手动确认 + 死信队列,缺一不可。
- 持久化会降低吞吐量,高吞吐场景可权衡可靠性与性能,选择部分关键消息持久化。
要点总结
- 端到端可靠性需覆盖生产端 → 交换机 → 队列 → 消费端全链路
- 生产端使用 Publisher Confirm 确保消息到达交换机
- 消息设置 deliveryMode=2 实现持久化,防止 RabbitMQ 重启丢失
- 消费端使用手动确认 + 重试机制,失败消息转入死信队列
- 交换机、队列、消息均需设置 durable 属性才能完整持久化
📝 发现内容有误?点击此处直接编辑