预取数量配置
basicQos 用于设置 prefetch count,限制消费者未确认消息的最大数量,防止单个消费者被大量消息压垮。
核心概念
- prefetch count:消费者未 ACK 的消息上限
- 达到上限后,RabbitMQ 不再投递新消息给该消费者
- 消费者 ACK 后,释放配额,允许接收新消息
- 实现负载均衡,避免快速消费者堆积消息、慢消费者空闲
Java 示例:设置预取数量
Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class PrefetchCountExample {
private static final String QUEUE = "prefetch.queue";
private static final String EXCHANGE = "prefetch.exchange";
private static final int PREFETCH_COUNT = 10;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE, com.rabbitmq.client.BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, "prefetch.key");
// 设置预取数量:同一时刻最多 10 条未确认消息
channel.basicQos(PREFETCH_COUNT);
// 关闭自动确认,启用手动确认
channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println("收到消息: " + message);
try {
// 模拟业务处理
processMessage(message);
// 业务处理成功,确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 业务处理失败,拒绝消息,不重新入队
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
}
});
System.out.println("消费者已启动,prefetch count: " + PREFETCH_COUNT);
}
}
private static void processMessage(String message) {
// 模拟业务处理耗时
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
QoS 级别说明
Java
// 全局 QoS(作用于整个 Channel)
channel.basicQos(10);
// 非全局 QoS(作用于单个消费者)
channel.basicQos(0, 10, false);
// global=true:作用于整个 Channel 所有消费者
channel.basicQos(0, 10, true);
| 参数 | 说明 |
|---|---|
| prefetchSize | 预取消息总大小(字节),0 表示不限制 |
| prefetchCount | 预取消息数量 |
| global | true=Channel级别,false=Consumer级别 |
生产环境建议
prefetchCount设置为消费者线程数的 1~2 倍,避免过小导致吞吐不足,或过大导致内存压力。
prefetch count 与性能的关系
| prefetch count 值 | 效果 |
|---|---|
| 0 | 不限制,RabbitMQ 全量推送(默认行为) |
| 1 | 严格串行处理,吞吐量最低 |
| 10-50 | 推荐范围,平衡吞吐与负载均衡 |
| 100+ | 高吞吐场景,注意消费者内存压力 |
注意事项
prefetch count必须配合手动确认(autoAck=false)使用,否则无效- 设置为 0 表示不限制预取数量
- Channel 级别和 Consumer 级别的 QoS 可以共存
prefetchSize通常设置为 0(按数量限制),不推荐按字节限制
要点总结
basicQos(prefetchCount)限制消费者未 ACK 消息的上限- 配合手动确认使用,实现负载均衡和流量控制
- 推荐值:消费者线程数的 1~2 倍
global=true作用于 Channel 级别,global=false作用于 Consumer 级别- prefetch count=0 表示不限制,慎用(可能导致消费者内存溢出)
📝 发现内容有误?点击此处直接编辑