Shovel 插件与消息桥接
Shovel 插件将消息从一个队列持续转发到另一个队列(可在不同集群),实现跨集群消息桥接。
安装与启用
Shovel 为官方内置插件,启用即可使用。
Bash
# 启用 Shovel 插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# 重启服务
systemctl restart rabbitmq-server
# 验证插件状态
rabbitmq-plugins list | grep shovel
rabbitmq_shovel:核心转发引擎
rabbitmq_shovel_management:Management UI 管理界面支持
Shovel 插件会建立持久连接并持续拉取消息,需确保网络稳定。
Shovel 工作原理
Shovel 工作流程:
- 连接到源队列所在 Broker
- 从源队列拉取消息
- 发布到目标队列所在 Broker
- 确认源消息消费完成
Bash
[源集群] [目标集群]
Queue A --Shovel拉取--> Queue B
(确认) (接收)
关键特性:
- 消息确认:目标队列接收后才确认源消息
- 断线重连:网络断开后自动重连并继续
- 消息持久化:支持持久化消息不丢失
静态 Shovel 配置
在配置文件中定义 Shovel,随 Broker 启动自动运行。
Bash
# 编辑 rabbitmq.conf
# Linux: /etc/rabbitmq/rabbitmq.conf
# 定义 Shovel 名称
shovel.my_shovel.uri = amqp://admin:admin@source-host:5672
shovel.my_shovel.source-queue = source_queue
shovel.my_shovel.source-prefetch-count = 100
shovel.my_shovel.destination-uri = amqp://admin:admin@dest-host:5672
shovel.my_shovel.destination-queue = dest_queue
shovel.my_shovel.destination-publish-properties = true
配置参数说明:
| 参数 | 说明 | 示例值 |
|---|---|---|
uri | 源 Broker 连接 | amqp://user:pass@host:5672 |
source-queue | 源队列名称 | source_queue |
source-prefetch-count | 拉取预取数量 | 100 |
destination-uri | 目标 Broker 连接 | amqp://user:pass@host:5672 |
destination-queue | 目标队列名称 | dest_queue |
destination-publish-properties | 保留消息属性 | true |
动态 Shovel 配置
通过 Policy 或 Management API 运行时创建 Shovel。
Bash
# 通过 Policy 创建动态 Shovel
rabbitmqctl set_parameter shovel my_dynamic_shovel \
'{"src-uri":"amqp://source-host:5672","src-queue":"source_queue",\
"dest-uri":"amqp://dest-host:5672","dest-queue":"dest_queue"}'
# 查看 Shovel 列表
rabbitmqctl list_shovels
# 删除 Shovel
rabbitmqctl clear_parameter shovel my_dynamic_shovel
通过 Management API:
Java
curl -u admin:admin -X PUT \
http://localhost:15672/api/parameters/shovel/%2F/my_shovel \
-H "Content-Type: application/json" \
-d '{
"src-uri": "amqp://source-host:5672",
"src-queue": "source_queue",
"dest-uri": "amqp://dest-host:5672",
"dest-queue": "dest_queue"
}'
Java Client 操作 Shovel
通过 HTTP API 管理 Shovel。
text
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.util.Base64;
public class ShovelManager {
private static final String API_URL = "http://localhost:15672/api/parameters/shovel/%2F/";
private static final String AUTH = "Basic " + Base64.getEncoder()
.encodeToString("admin:admin".getBytes());
private final HttpClient client = HttpClient.newHttpClient();
// 创建 Shovel
public void createShovel(String name, String srcUri, String srcQueue,
String destUri, String destQueue) throws Exception {
String body = String.format(
"{\"src-uri\":\"%s\",\"src-queue\":\"%s\"," +
"\"dest-uri\":\"%s\",\"dest-queue\":\"%s\"}",
srcUri, srcQueue, destUri, destQueue
);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(API_URL + name))
.header("Content-Type", "application/json")
.header("Authorization", AUTH)
.PUT(HttpRequest.BodyPublishers.ofString(body))
.build();
HttpResponse<String> response = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 204) {
throw new RuntimeException("创建 Shovel 失败: " + response.body());
}
}
// 删除 Shovel
public void deleteShovel(String name) throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(API_URL + name))
.header("Authorization", AUTH)
.DELETE()
.build();
client.send(request, HttpResponse.BodyHandlers.ofString());
}
}
Shovel 与 Federation 对比
| 特性 | Shovel | Federation |
|---|---|---|
| 拓扑 | 单向推送 | 双向同步 |
| 连接 | 持续连接 | 按需连接 |
| 适用场景 | 数据迁移、异地备份 | 多活集群 |
| 消息确认 | 目标确认后确认源 | 独立确认 |
| 配置复杂度 | 低 | 高 |
单向消息同步选 Shovel,多活双向同步选 Federation。
注意事项
- Shovel 会持续拉取消息,源队列消息被消费后不再可转发
- 网络不稳定时 Shovel 会重连,期间消息可能重复
- 静态 Shovel 随 Broker 启动,动态 Shovel 需手动创建
- Shovel 不适用于高吞吐实时同步场景,建议使用 Federation
要点总结
- Shovel 插件实现跨集群消息单向同步,从源队列拉取到目标队列
- 支持静态配置(随 Broker 启动)和动态配置(运行时创建)
- 消息在目标队列确认后才确认源消息,保证不丢失
- 通过 Management API 或 rabbitmqctl 管理 Shovel 生命周期
- 单向同步选 Shovel,多活双向同步选 Federation
📝 发现内容有误?点击此处直接编辑