任务处理耗时模拟
通过模拟不同耗时的任务,可以直观验证 RabbitMQ 工作队列的分发效果,下面梳理实现方法。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者 - 发送不同耗时任务
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class TaskProducerExample {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送不同耗时的任务(数字表示模拟处理秒数)
String[] tasks = {"task:1", "task:5", "task:2", "task:8", "task:3", "task:6"};
for (String task : tasks) {
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
task.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送: " + task);
}
System.out.println("所有任务已发送至队列");
}
}
}
消费者 - 模拟耗时处理
Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class TaskConsumerExample {
private static final String QUEUE_NAME = "task_queue";
private static final String consumerName;
static {
// 通过命令行参数或环境变量区分不同消费者实例
consumerName = System.getProperty("consumer.name", "C1");
}
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 使用公平分发策略
channel.basicQos(1);
System.out.println("[" + consumerName + "] 等待任务...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
int sleepSeconds = parseSleepTime(message);
System.out.println("[" + consumerName + "] 收到: " + message);
System.out.println("[" + consumerName + "] 开始处理,预计耗时: " + sleepSeconds + "秒");
try {
doWork(sleepSeconds);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("[" + consumerName + "] 完成: " + message);
} catch (Exception e) {
System.err.println("[" + consumerName + "] 失败: " + message);
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
private static int parseSleepTime(String message) {
try {
String[] parts = message.split(":");
return Integer.parseInt(parts[1]);
} catch (Exception e) {
return 1; // 默认1秒
}
}
private static void doWork(int seconds) throws InterruptedException {
for (int i = 0; i < seconds; i++) {
Thread.sleep(1000);
System.out.print(".");
}
System.out.println();
}
}
运行验证方式
Bash
# 终端1:启动消费者1
java -Dconsumer.name=C1 -cp .:amqp-client-5.20.0.jar TaskConsumerExample
# 终端2:启动消费者2
java -Dconsumer.name=C2 -cp .:amqp-client-5.20.0.jar TaskConsumerExample
# 终端3:启动生产者发送任务
java -cp .:amqp-client-5.20.0.jar TaskProducerExample
观察结果
轮询分发模式下
text
C1 收到: task:1 (耗时1秒)
C2 收到: task:5 (耗时5秒)
C1 收到: task:2 (耗时2秒)
C2 收到: task:8 (耗时8秒) <- C2 被长任务阻塞
C1 收到: task:3 (耗时3秒)
C2 收到: task:6 (耗时6秒) <- C2 继续积压
公平分发模式下
text
C1 收到: task:1 (耗时1秒,快速完成)
C2 收到: task:5 (耗时5秒)
C1 收到: task:2 (耗时2秒,快速完成)
C1 收到: task:3 (耗时3秒,空闲继续) <- C1 多处理任务
C2 收到: task:8 (耗时8秒完成后才接收)
C1 收到: task:6 (耗时6秒) <- C1 承担更多工作
注意事项
验证公平分发效果时需确保 channel.basicQos(1) 已设置,否则退化为轮询分发。
消费者需通过不同进程启动(而非多线程),否则无法体现多消费者分发效果。
任务消息中的耗时数字仅用于模拟,实际场景中可替换为真实业务处理逻辑。
长任务场景建议配合消息超时和重试机制,避免任务无限积压。
要点总结
- 任务耗时模拟通过 Thread.sleep 实现,用于验证不同分发策略效果。
- 轮询分发下消息均匀分配,可能出现慢消费者积压问题。
- 公平分发下空闲消费者优先接收,自动实现负载均衡。
- 验证时需启动多个独立消费者进程,观察任务分配差异。
- 公平分发需配合 basicQos(1) 和手动确认机制才能生效。
📝 发现内容有误?点击此处直接编辑