RabbitMQ 架构概述
RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)协议的企业级消息中间件,其核心架构由多个组件协同完成消息路由与投递。
整体架构
XML
生产者
│
├─ 发送消息到 ──┐
▼
┌────────┐ 路由规则 ┌──────────┐
│Exchange│ ─────────────> │ Queue │
│ (交换机)│ │ (队列) │
└────────┘ └──────────┘
│
▼
┌──────────┐
│ Consumer │
│ (消费者) │
└──────────┘
核心组件
| 组件 | 说明 |
|---|---|
| Broker | RabbitMQ 服务实例,接收、存储、转发消息 |
| Exchange | 接收生产者消息,根据路由规则转发到对应队列 |
| Queue | 消息的存储容器,消费者从中拉取消息 |
| Binding | Exchange 与 Queue 之间的绑定关系,定义路由规则 |
| RoutingKey | 生产者发送消息时指定的路由标识 |
| Connection | 客户端与 Broker 之间的 TCP 连接 |
| Channel | 连接内的轻量级虚拟通道,复用 TCP 连接 |
AMQP 协议
RabbitMQ 实现 AMQP 0-9-1 协议,核心概念:
- 消息路由:生产者不直接发送消息到队列,而是发送到 Exchange
- Binding 规则:Exchange 根据 Binding 和 RoutingKey 决定消息投递到哪个队列
- Channel 复用:一个 TCP 连接可创建多个 Channel,减少连接开销
Exchange 类型
| 类型 | 路由规则 |
|---|---|
| Direct | RoutingKey 完全匹配 |
| Fanout | 广播到所有绑定队列 |
| Topic | RoutingKey 模式匹配(* 和 # 通配符) |
| Headers | 根据消息头属性匹配 |
Java 示例:Direct Exchange
Maven 依赖:
Java
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者:
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DirectExchangeProducer {
private static final String EXCHANGE = "direct_exchange";
private static final String ROUTING_KEY = "order.create";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Direct Exchange
channel.exchangeDeclare(EXCHANGE, "direct");
// 声明队列
channel.queueDeclare("order.queue", true, false, false, null);
// 绑定 Exchange 与 Queue
channel.queueBind("order.queue", EXCHANGE, ROUTING_KEY);
String message = "New order created";
channel.basicPublish(EXCHANGE, ROUTING_KEY, null, message.getBytes("UTF-8"));
System.out.println("消息已发送到 Exchange: " + EXCHANGE +
", RoutingKey: " + ROUTING_KEY);
}
}
}
消费者:
text
import com.rabbitmq.client.*;
public class DirectExchangeConsumer {
private static final String EXCHANGE = "direct_exchange";
private static final String QUEUE = "order.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE, "direct");
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, "order.create");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {});
System.out.println("消费者已启动...");
}
}
}
注意事项
生产者不直接指定队列,而是发送消息到 Exchange,由 Exchange 根据 Binding 规则路由到对应队列。
Channel 是线程安全的,但应在同一线程内复用,避免跨线程操作。
连接和 Channel 使用完毕后必须关闭,否则会造成资源泄漏。
要点总结
- RabbitMQ 核心架构:生产者 -> Exchange -> Binding -> Queue -> 消费者
- Exchange 负责消息路由,支持 Direct、Fanout、Topic、Headers 四种类型
- Binding 定义 Exchange 到 Queue 的路由规则,RoutingKey 是匹配标识
- Connection 是 TCP 连接,Channel 是轻量级虚拟通道,复用连接资源
- AMQP 协议确保消息路由的标准化与跨平台兼容性
📝 发现内容有误?点击此处直接编辑