消息分发策略
RabbitMQ 工作队列支持两种消息分发策略:轮询分发和公平分发,下面梳理两者的区别与实现方式。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
轮询分发(Round-Robin)
默认分发策略,消息按顺序依次投递给每个消费者。
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class RoundRobinConsumerExample {
private static final String QUEUE_NAME = "round_robin_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 不设置 basicQos 限制,使用默认轮询
System.out.println("消费者启动 - 轮询分发模式");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("收到: " + message);
try {
Thread.sleep(1000); // 模拟处理耗时
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
公平分发(Fair Dispatch)
通过 basicQos 设置预取值,确保空闲消费者优先接收消息。
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class FairDispatchConsumerExample {
private static final String QUEUE_NAME = "fair_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 关键设置:每次只预取一条未确认消息
channel.basicQos(1);
System.out.println("消费者启动 - 公平分发模式");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("收到: " + message);
try {
// 模拟不同处理时长的任务
int sleepTime = message.contains("heavy") ? 3000 : 500;
Thread.sleep(sleepTime);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("完成: " + message);
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
两种策略对比
| 特性 | 轮询分发 | 公平分发 |
|---|---|---|
| 配置 | 无需额外配置 | 需设置 channel.basicQos(1) |
| 分发逻辑 | 按顺序依次分配 | 空闲消费者优先接收 |
| 适用场景 | 任务处理耗时相近 | 任务处理耗时差异大 |
| 性能 | 可能出现负载不均衡 | 自动负载均衡 |
| 消息积压 | 慢消费者可能积压 | 减少慢消费者积压 |
basicQos 参数说明
Java
channel.basicQos(int prefetchSize, int prefetchCount, boolean global)
channel.basicQos(int prefetchCount) // 常用简化版本
| 参数 | 说明 |
|---|---|
| prefetchSize | 消息大小限制(字节),0 表示不限制 |
| prefetchCount | 未确认消息数量上限 |
| global | 是否应用于整个通道,true=通道级别,false=消费者级别 |
注意事项
公平分发必须配合手动确认(autoAck=false)使用,否则 basicQos 限制无效。
prefetchCount 设为 1 时实现严格意义上的空闲优先分发,可根据业务调整更大值。
轮询分发在任务耗时均匀时效率更高,公平分发在任务耗时差异大时表现更好。
basicQos 设置仅影响未确认消息的预取数量,不改变消息在队列中的排序。
要点总结
- 轮询分发是默认策略,消息按顺序均匀分配给消费者。
- 公平分发通过 basicQos(1) 实现,空闲消费者优先接收消息。
- 轮询分发适合耗时相近的任务,公平分发适合耗时差异大的任务。
- basicQos 的 prefetchCount 参数控制未确认消息的上限数量。
- 公平分发必须配合手动确认模式使用才能生效。
📝 发现内容有误?点击此处直接编辑