消息队列应用场景
消息队列主要应用于三大核心场景:系统解耦、流量削峰、异步处理。
场景一:系统解耦
问题:订单系统直接调用库存、物流、通知系统,任一子系统故障影响整体流程。
方案:订单系统发送消息到队列,各子系统独立消费,彼此无直接依赖。
Java
订单系统 --> [消息队列] --> 库存系统
--> 物流系统
--> 通知系统
代码示例:
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DecoupledProducer {
private static final String EXCHANGE = "order.exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE, "fanout");
String message = "{\"orderId\": 1001, \"action\": \"create\"}";
channel.basicPublish(EXCHANGE, "", null, message.getBytes("UTF-8"));
System.out.println("订单消息已发送,各子系统自行消费");
}
}
}
解耦的核心价值:新增或下线子系统无需修改生产者代码,只需调整队列绑定关系。
场景二:流量削峰
问题:秒杀活动时瞬间请求量超出数据库处理能力,导致系统崩溃。
方案:请求先入消息队列,消费者按数据库承受能力匀速消费。
Java
用户请求 --> [消息队列] --> 消费者(匀速处理) --> 数据库
(堆积缓冲) (按自身速率消费)
代码示例:
text
import com.rabbitmq.client.*;
import java.util.concurrent.CountDownLatch;
public class PeakShavingConsumer {
private static final String QUEUE = "order_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 限制并发消费者数量,实现匀速消费
int consumerCount = 5;
CountDownLatch latch = new CountDownLatch(consumerCount);
for (int i = 0; i < consumerCount; i++) {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(1); // 每次只处理一条,未确认不推送
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), "UTF-8");
// 模拟数据库写入
Thread.sleep(200);
System.out.println("处理订单: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {});
latch.countDown();
}
latch.await();
}
}
basicQos(prefetchCount = 1) 确保消费者不被消息淹没,按自身能力处理。
场景三:异步处理
问题:用户注册后需发送邮件、短信、初始化数据等耗时操作,阻塞主流程。
方案:注册完成后发送消息,耗时操作异步执行,快速返回用户。
text
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class AsyncRegisterProducer {
private static final String QUEUE = "user.register";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE, true, false, false, null);
String message = "{\"userId\": 1024, \"email\": \"user@example.com\"}";
channel.basicPublish("", QUEUE, null, message.getBytes("UTF-8"));
// 主流程立即返回,后续操作由消费者异步处理
System.out.println("用户注册完成,欢迎使用!");
}
}
}
异步处理将串行操作转为并行,主链路耗时从 T1+T2+T3 缩减为 T1。
要点总结
- 解耦:生产者与消费者无直接依赖,系统扩展性大幅提升
- 削峰:消息队列作为缓冲层,消费者按自身能力匀速处理
- 异步处理:耗时操作移出主链路,提升用户响应速度
- 三大场景共同点:将同步调用转为异步传递,降低系统耦合度与响应延迟
📝 发现内容有误?点击此处直接编辑