手动确认模式
手动确认模式是保障消息可靠消费的核心机制,消费者处理完成后显式通知 RabbitMQ 删除消息。
定义
手动确认(autoAck=false)指消费者收到消息后不会立即从队列删除,而是等待消费者调用 basicAck 显式确认后,RabbitMQ 才将该消息移除。若消费者断开连接未确认,消息将重新入队。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
配置示例
手动确认消费者
Java
import com.rabbitmq.client.*;
import java.io.IOException;
public class ManualAckConsumer {
private static final String QUEUE_NAME = "manual_ack_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// autoAck=false 开启手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
try {
// 模拟业务处理
processMessage(message);
// 业务处理成功,发送确认
// deliveryTag: 消息标识, multiple: false 仅确认当前消息
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println("消息已确认: " + message);
} catch (Exception e) {
System.err.println("处理失败: " + e.getMessage());
// 处理失败,拒绝消息并重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});
System.out.println("手动确认消费者已启动,等待消息...");
Thread.sleep(Long.MAX_VALUE);
}
}
private static void processMessage(String message) {
// 实际业务逻辑
System.out.println("处理消息中...");
}
}
批量确认示例
Java
import com.rabbitmq.client.*;
import java.io.IOException;
public class BatchAckConsumer {
private static final String QUEUE_NAME = "batch_ack_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息: " + message);
processMessage(message);
// multiple=true 确认所有未确认的小于等于当前 deliveryTag 的消息
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
Thread.sleep(Long.MAX_VALUE);
}
}
private static void processMessage(String message) {
System.out.println("处理消息: " + message);
}
}
注意事项
忘记调用
basicAck会导致消息在队列中堆积为 unacked 状态,最终触发 prefetch 限制,消费者无法接收新消息。
basicAck(deliveryTag, multiple=true)会批量确认所有未确认的消息,需谨慎使用,避免误确认。
若消费者连接断开,未确认的消息会自动重新入队(队列非独占情况下)。
手动确认模式会增加一次网络往返,吞吐略低于自动确认,但可靠性更高。
要点总结
- 手动确认:
channel.basicConsume(queue, false, callback)中第二个参数设为false - 确认消息:
channel.basicAck(deliveryTag, false),multiple=false 仅确认当前消息 - 批量确认:
channel.basicAck(deliveryTag, true)确认所有未确认消息 - 必须确保业务处理完成后才调用 ack,否则可能导致消息丢失或重复消费
📝 发现内容有误?点击此处直接编辑