全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

Publisher Confirm 机制

默认情况下,生产者发送消息后无法确认消息是否成功到达交换机。Publisher Confirm 机制提供了消息到达确认的能力,保障消息投递可靠性。

什么是 Publisher Confirm

定义

Publisher Confirm(发布确认)是 RabbitMQ 提供的消息确认机制。生产者开启 Confirm 模式后,每发送一条消息,RabbitMQ 处理完成后会返回一个确认(ack)或否定(nack)响应,告知生产者消息是否成功到达交换机。

三种确认模式

模式特点适用场景
普通 Confirm同步阻塞,每发一条等待确认低吞吐、高可靠场景
批量 Confirm发一批后统一确认中等吞吐场景
异步 Confirm回调非阻塞,持续发送高吞吐、生产环境推荐

开启 Confirm 模式

Java
// Maven 依赖
// <dependency>
//     <groupId>com.rabbitmq</groupId>
//     <artifactId>amqp-client</artifactId>
//     <version>5.20.0</version>
// </dependency>

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class PublisherConfirmExample {
    
    private static final String EXCHANGE_NAME = "confirm_exchange";
    private static final String QUEUE_NAME = "confirm_queue";
    private static final String ROUTING_KEY = "confirm.key";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            
            // 开启 Confirm 模式
            channel.confirmSelect();
            
            // 发送消息
            String message = "Hello Confirm!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null,
                    message.getBytes(StandardCharsets.UTF_8));
            
            // 等待确认
            boolean confirmed = channel.waitForConfirms();
            if (confirmed) {
                System.out.println("消息已确认到达交换机");
            } else {
                System.err.println("消息未确认,可能需要重发");
            }
        }
    }
}

三种实现方式

方式一:普通 Confirm(同步)

Java
public class SimpleConfirm {
    
    public static void publish(Channel channel) 
            throws IOException, InterruptedException {
        channel.confirmSelect();
        
        for (int i = 0; i < 100; i++) {
            String message = "Message-" + i;
            channel.basicPublish("exchange", "route", null,
                    message.getBytes(StandardCharsets.UTF_8));
            
            // 每条消息阻塞等待确认
            if (!channel.waitForConfirms()) {
                System.err.println("消息发送失败: " + message);
                // 重发逻辑
            }
        }
    }
}

注意:同步 Confirm 性能较差,每条消息需等待确认后才能发送下一条,不推荐高吞吐场景使用。

方式二:批量 Confirm

Java
public class BatchConfirm {
    
    private static final int BATCH_SIZE = 100;
    
    public static void publish(Channel channel) 
            throws IOException, InterruptedException {
        channel.confirmSelect();
        
        for (int i = 0; i < 1000; i++) {
            String message = "Message-" + i;
            channel.basicPublish("exchange", "route", null,
                    message.getBytes(StandardCharsets.UTF_8));
            
            // 每 BATCH_SIZE 条消息确认一次
            if ((i + 1) % BATCH_SIZE == 0) {
                channel.waitForConfirms();
                System.out.println("批量确认: " + (i + 1) + " 条消息");
            }
        }
    }
}

注意:批量 Confirm 中若某批消息确认失败,无法定位具体失败的消息,需整批重发。

方式三:异步 Confirm(推荐)

Java
public class AsyncConfirm {
    
    // 维护未确认消息的映射表
    private static final ConcurrentNavigableMap<Long, String> outstandingConfirms = 
            new ConcurrentSkipListMap<>();
    
    public static void publish(Channel channel) 
            throws IOException, InterruptedException {
        channel.confirmSelect();
        
        // 注册异步回调
        channel.addConfirmListener(
            // 确认成功回调
            (deliveryTag, multiple) -> {
                if (multiple) {
                    // 批量确认:清除所有小于等于 deliveryTag 的记录
                    outstandingConfirms.headMap(deliveryTag, true).clear();
                } else {
                    // 单条确认
                    outstandingConfirms.remove(deliveryTag);
                }
                System.out.println("消息已确认, deliveryTag: " + deliveryTag);
            },
            // 确认失败回调
            (deliveryTag, multiple) -> {
                if (multiple) {
                    // 批量 nack:获取所有失败消息
                    var failed = outstandingConfirms.headMap(deliveryTag, true);
                    System.err.println("批量消息发送失败: " + failed.values());
                    failed.clear();
                } else {
                    String failedMsg = outstandingConfirms.get(deliveryTag);
                    System.err.println("消息发送失败, 需重发: " + failedMsg);
                    outstandingConfirms.remove(deliveryTag);
                }
            }
        );
        
        // 持续发送消息
        for (int i = 0; i < 1000; i++) {
            String message = "Message-" + i;
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("exchange", "route", null,
                    message.getBytes(StandardCharsets.UTF_8));
        }
        
        // 等待所有消息确认完成
        Thread.sleep(5000);
    }
}

注意事项

  1. Confirm 模式是 Channel 级别的,开启后该 Channel 所有消息都会进入确认流程。
  2. deliveryTag 是 Channel 内自增的序列号,用于标识消息顺序,非全局唯一。
  3. multiple = true 表示批量确认,所有小于等于当前 deliveryTag 的消息均已确认。
  4. Confirm 仅保证消息到达交换机,不保证消息最终被消费或持久化到队列。
  5. 异步 Confirm 需自行维护未确认消息集合,建议使用 ConcurrentSkipListMap 保证线程安全。
  6. 若 Channel 关闭或连接断开,所有未确认消息会丢失,需在生产者侧实现重发机制。

要点总结

  • Publisher Confirm 确保消息成功到达交换机,分为同步、批量、异步三种模式
  • 异步 Confirm 性能最高,生产环境推荐使用 addConfirmListener 回调方式
  • deliveryTag 是 Channel 内自增序号,multiple=true 表示批量确认
  • Confirm 仅保证到达交换机,不保证队列接收或消费者处理
  • 异步模式需维护未确认消息映射表,连接断开时需重发未确认消息

📝 发现内容有误?点击此处直接编辑

← 上一篇 路由策略设计
下一篇 → Return 消息机制
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库