消费者基础接收
消费者负责订阅队列并处理消息,下面梳理基础接收流程与核心 API。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
基础接收流程
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.io.IOException;
public class BasicConsumerExample {
private static final String QUEUE_NAME = "demo_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 创建消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("收到消息: " + message);
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 开始消费
boolean autoAck = false; // 关闭自动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
System.out.println("等待接收消息...");
}
}
核心 API 说明
queueDeclare 队列声明
Java
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
| 参数 | 说明 |
|---|---|
| queue | 队列名称 |
| durable | 是否持久化,true 表示服务器重启后保留 |
| exclusive | 是否排他,true 表示仅当前连接可见 |
| autoDelete | 是否自动删除,true 表示无消费者时删除 |
| arguments | 队列扩展参数,通常为 null |
basicConsume 消费订阅
Java
channel.basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
| 参数 | 说明 |
|---|---|
| queue | 队列名称 |
| autoAck | 是否自动确认,false 表示需手动 ack |
| deliverCallback | 消息到达时的回调函数 |
| cancelCallback | 消费者被取消时的回调函数 |
消息确认机制
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class AckConsumerExample {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("处理消息: " + message);
try {
// 模拟业务处理
processMessage(message);
// 处理成功,确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
private static void processMessage(String message) {
// 业务逻辑
}
}
注意事项
autoAck 设为 false 时必须手动调用 basicAck 或 basicNack,否则消息会变为 unacked 状态且不会被重新投递。
basicAck 的第二个参数 multiple 为 true 时会确认当前 deliveryTag 之前所有未确认消息。
basicNack 第三个参数 requeue 为 true 时消息会重新入队,为 false 时消息被丢弃或进入死信队列。
消费者回调函数中的异常不会自动 nack 消息,需显式调用 basicNack 处理失败场景。
要点总结
- 消费者接收消息需经历:创建连接 -> 创建信道 -> 声明队列 -> 注册回调 -> 开始消费。
- basicConsume 用于订阅队列,autoAck 决定是否需要手动确认消息。
- 手动确认模式下必须调用 basicAck(成功)或 basicNack(失败)处理消息。
- basicNack 的 requeue 参数控制失败消息是否重新入队。
- 推荐使用手动确认模式保障消息可靠性,避免消息丢失。
📝 发现内容有误?点击此处直接编辑