Publisher Confirm 机制
默认情况下,生产者发送消息后无法确认消息是否成功到达交换机。Publisher Confirm 机制提供了消息到达确认的能力,保障消息投递可靠性。
什么是 Publisher Confirm
定义
Publisher Confirm(发布确认)是 RabbitMQ 提供的消息确认机制。生产者开启 Confirm 模式后,每发送一条消息,RabbitMQ 处理完成后会返回一个确认(ack)或否定(nack)响应,告知生产者消息是否成功到达交换机。
三种确认模式
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 普通 Confirm | 同步阻塞,每发一条等待确认 | 低吞吐、高可靠场景 |
| 批量 Confirm | 发一批后统一确认 | 中等吞吐场景 |
| 异步 Confirm | 回调非阻塞,持续发送 | 高吞吐、生产环境推荐 |
开启 Confirm 模式
Java
// Maven 依赖
// <dependency>
// <groupId>com.rabbitmq</groupId>
// <artifactId>amqp-client</artifactId>
// <version>5.20.0</version>
// </dependency>
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class PublisherConfirmExample {
private static final String EXCHANGE_NAME = "confirm_exchange";
private static final String QUEUE_NAME = "confirm_queue";
private static final String ROUTING_KEY = "confirm.key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 开启 Confirm 模式
channel.confirmSelect();
// 发送消息
String message = "Hello Confirm!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null,
message.getBytes(StandardCharsets.UTF_8));
// 等待确认
boolean confirmed = channel.waitForConfirms();
if (confirmed) {
System.out.println("消息已确认到达交换机");
} else {
System.err.println("消息未确认,可能需要重发");
}
}
}
}
三种实现方式
方式一:普通 Confirm(同步)
Java
public class SimpleConfirm {
public static void publish(Channel channel)
throws IOException, InterruptedException {
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
String message = "Message-" + i;
channel.basicPublish("exchange", "route", null,
message.getBytes(StandardCharsets.UTF_8));
// 每条消息阻塞等待确认
if (!channel.waitForConfirms()) {
System.err.println("消息发送失败: " + message);
// 重发逻辑
}
}
}
}
注意:同步 Confirm 性能较差,每条消息需等待确认后才能发送下一条,不推荐高吞吐场景使用。
方式二:批量 Confirm
Java
public class BatchConfirm {
private static final int BATCH_SIZE = 100;
public static void publish(Channel channel)
throws IOException, InterruptedException {
channel.confirmSelect();
for (int i = 0; i < 1000; i++) {
String message = "Message-" + i;
channel.basicPublish("exchange", "route", null,
message.getBytes(StandardCharsets.UTF_8));
// 每 BATCH_SIZE 条消息确认一次
if ((i + 1) % BATCH_SIZE == 0) {
channel.waitForConfirms();
System.out.println("批量确认: " + (i + 1) + " 条消息");
}
}
}
}
注意:批量 Confirm 中若某批消息确认失败,无法定位具体失败的消息,需整批重发。
方式三:异步 Confirm(推荐)
Java
public class AsyncConfirm {
// 维护未确认消息的映射表
private static final ConcurrentNavigableMap<Long, String> outstandingConfirms =
new ConcurrentSkipListMap<>();
public static void publish(Channel channel)
throws IOException, InterruptedException {
channel.confirmSelect();
// 注册异步回调
channel.addConfirmListener(
// 确认成功回调
(deliveryTag, multiple) -> {
if (multiple) {
// 批量确认:清除所有小于等于 deliveryTag 的记录
outstandingConfirms.headMap(deliveryTag, true).clear();
} else {
// 单条确认
outstandingConfirms.remove(deliveryTag);
}
System.out.println("消息已确认, deliveryTag: " + deliveryTag);
},
// 确认失败回调
(deliveryTag, multiple) -> {
if (multiple) {
// 批量 nack:获取所有失败消息
var failed = outstandingConfirms.headMap(deliveryTag, true);
System.err.println("批量消息发送失败: " + failed.values());
failed.clear();
} else {
String failedMsg = outstandingConfirms.get(deliveryTag);
System.err.println("消息发送失败, 需重发: " + failedMsg);
outstandingConfirms.remove(deliveryTag);
}
}
);
// 持续发送消息
for (int i = 0; i < 1000; i++) {
String message = "Message-" + i;
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("exchange", "route", null,
message.getBytes(StandardCharsets.UTF_8));
}
// 等待所有消息确认完成
Thread.sleep(5000);
}
}
注意事项
- Confirm 模式是 Channel 级别的,开启后该 Channel 所有消息都会进入确认流程。
deliveryTag是 Channel 内自增的序列号,用于标识消息顺序,非全局唯一。multiple = true表示批量确认,所有小于等于当前deliveryTag的消息均已确认。- Confirm 仅保证消息到达交换机,不保证消息最终被消费或持久化到队列。
- 异步 Confirm 需自行维护未确认消息集合,建议使用
ConcurrentSkipListMap保证线程安全。- 若 Channel 关闭或连接断开,所有未确认消息会丢失,需在生产者侧实现重发机制。
要点总结
- Publisher Confirm 确保消息成功到达交换机,分为同步、批量、异步三种模式
- 异步 Confirm 性能最高,生产环境推荐使用
addConfirmListener回调方式 deliveryTag是 Channel 内自增序号,multiple=true表示批量确认- Confirm 仅保证到达交换机,不保证队列接收或消费者处理
- 异步模式需维护未确认消息映射表,连接断开时需重发未确认消息
📝 发现内容有误?点击此处直接编辑