全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

手动确认模式

手动确认模式是保障消息可靠消费的核心机制,消费者处理完成后显式通知 RabbitMQ 删除消息。

定义

手动确认(autoAck=false)指消费者收到消息后不会立即从队列删除,而是等待消费者调用 basicAck 显式确认后,RabbitMQ 才将该消息移除。若消费者断开连接未确认,消息将重新入队。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

配置示例

手动确认消费者

Java
import com.rabbitmq.client.*;

import java.io.IOException;

public class ManualAckConsumer {
    private static final String QUEUE_NAME = "manual_ack_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // autoAck=false 开启手动确认
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("收到消息: " + message);

                    try {
                        // 模拟业务处理
                        processMessage(message);

                        // 业务处理成功,发送确认
                        // deliveryTag: 消息标识, multiple: false 仅确认当前消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        System.out.println("消息已确认: " + message);
                    } catch (Exception e) {
                        System.err.println("处理失败: " + e.getMessage());
                        // 处理失败,拒绝消息并重新入队
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }
            });

            System.out.println("手动确认消费者已启动,等待消息...");
            Thread.sleep(Long.MAX_VALUE);
        }
    }

    private static void processMessage(String message) {
        // 实际业务逻辑
        System.out.println("处理消息中...");
    }
}

批量确认示例

Java
import com.rabbitmq.client.*;

import java.io.IOException;

public class BatchAckConsumer {
    private static final String QUEUE_NAME = "batch_ack_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("收到消息: " + message);

                    processMessage(message);

                    // multiple=true 确认所有未确认的小于等于当前 deliveryTag 的消息
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            });

            Thread.sleep(Long.MAX_VALUE);
        }
    }

    private static void processMessage(String message) {
        System.out.println("处理消息: " + message);
    }
}

注意事项

忘记调用 basicAck 会导致消息在队列中堆积为 unacked 状态,最终触发 prefetch 限制,消费者无法接收新消息。

basicAck(deliveryTag, multiple=true) 会批量确认所有未确认的消息,需谨慎使用,避免误确认。

若消费者连接断开,未确认的消息会自动重新入队(队列非独占情况下)。

手动确认模式会增加一次网络往返,吞吐略低于自动确认,但可靠性更高。

要点总结

  • 手动确认:channel.basicConsume(queue, false, callback) 中第二个参数设为 false
  • 确认消息:channel.basicAck(deliveryTag, false),multiple=false 仅确认当前消息
  • 批量确认:channel.basicAck(deliveryTag, true) 确认所有未确认消息
  • 必须确保业务处理完成后才调用 ack,否则可能导致消息丢失或重复消费

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息分发策略
下一篇 → 消息拒绝与重新入队
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库