消息过滤与拦截
拦截器(Interceptor)在消息发布和消费前后执行自定义逻辑,无需修改业务代码。
拦截器接口
RabbitMQ 提供 PublisherCallbackChannel.Listener 和自定义拦截器接口。
Java
// 发布拦截器
public interface PublishInterceptor {
// 消息发布前
void beforePublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body);
// 消息发布后
void afterPublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body);
}
// 消费拦截器
public interface ConsumeInterceptor {
// 消息消费前
void beforeConsume(Envelope envelope, BasicProperties props, byte[] body);
// 消息消费后
void afterConsume(Envelope envelope, BasicProperties props, byte[] body);
}
RabbitMQ Java Client 未内置拦截器框架,需自行实现或使用 Spring AMQP 等上层封装。
实现发布拦截器
通过包装 basicPublish 实现发布拦截。
Java
public class InterceptedChannel {
private final Channel delegate;
private final List<PublishInterceptor> interceptors = new ArrayList<>();
public InterceptedChannel(Channel delegate) {
this.delegate = delegate;
}
public void addInterceptor(PublishInterceptor interceptor) {
interceptors.add(interceptor);
}
public void basicPublish(String exchange, String routingKey,
BasicProperties props, byte[] body) throws Exception {
// 发布前拦截
for (PublishInterceptor interceptor : interceptors) {
interceptor.beforePublish(delegate, exchange, routingKey, props, body);
}
// 执行实际发布
delegate.basicPublish(exchange, routingKey, props, body);
// 发布后拦截
for (PublishInterceptor interceptor : interceptors) {
interceptor.afterPublish(delegate, exchange, routingKey, props, body);
}
}
}
消息过滤示例
在拦截器中实现消息过滤,丢弃不符合规则的消息。
Java
// 过滤拦截器
PublishInterceptor filterInterceptor = new PublishInterceptor() {
@Override
public void beforePublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
// 过滤空消息
if (body == null || body.length == 0) {
System.out.println("丢弃空消息");
throw new RuntimeException("空消息被拦截");
}
// 过滤超过大小限制的消息(1MB)
if (body.length > 1024 * 1024) {
System.out.println("丢弃超大消息");
throw new RuntimeException("消息超过大小限制");
}
}
@Override
public void afterPublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
// 记录发布日志
System.out.println("消息已发布: " + routingKey + ", 大小: " + body.length + "B");
}
};
// 使用
InterceptedChannel interceptedChannel = new InterceptedChannel(channel);
interceptedChannel.addInterceptor(filterInterceptor);
interceptedChannel.basicPublish("my_exchange", "test_key", null, "Hello".getBytes());
消息格式转换
在拦截器中实现消息格式转换。
Java
PublishInterceptor transformInterceptor = new PublishInterceptor() {
@Override
public void beforePublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
try {
// JSON 转 Protobuf(示例)
String json = new String(body, StandardCharsets.UTF_8);
// 假设转换为 Protobuf 格式
byte[] protobuf = json.getBytes(StandardCharsets.UTF_8); // 简化示例
// 修改消息头标记格式
Map<String, Object> headers = new HashMap<>();
if (props.getHeaders() != null) {
headers.putAll(props.getHeaders());
}
headers.put("content-format", "protobuf");
props = new AMQP.BasicProperties.Builder()
.from(props)
.headers(headers)
.build();
} catch (Exception e) {
System.err.println("格式转换失败: " + e.getMessage());
}
}
@Override
public void afterPublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
// 无需处理
}
};
路由预处理
根据消息内容动态修改路由键。
Java
PublishInterceptor routingInterceptor = new PublishInterceptor() {
@Override
public void beforePublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
try {
// 根据消息内容中的 order_id 路由到不同队列
String content = new String(body, StandardCharsets.UTF_8);
if (content.contains("\"priority\":\"high\"")) {
routingKey = "high_priority";
} else if (content.contains("\"priority\":\"low\"")) {
routingKey = "low_priority";
}
} catch (Exception e) {
// 使用默认路由键
}
}
@Override
public void afterPublish(Channel channel, String exchange, String routingKey,
BasicProperties props, byte[] body) {
// 无需处理
}
};
拦截器中修改路由键需确保目标交换机有对应绑定,否则消息丢失。
完整拦截器示例
Java
public class InterceptorDemo {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare("intercepted_exchange", "topic", true);
channel.queueDeclare("intercepted_queue", true, false, false, null);
channel.queueBind("intercepted_queue", "intercepted_exchange", "#");
// 包装 Channel
InterceptedChannel ic = new InterceptedChannel(channel);
ic.addInterceptor(new PublishInterceptor() {
@Override
public void beforePublish(Channel ch, String ex, String rk,
BasicProperties props, byte[] body) {
if (body == null || body.length == 0) {
throw new RuntimeException("空消息被拦截");
}
System.out.println("发布前: " + rk);
}
@Override
public void afterPublish(Channel ch, String ex, String rk,
BasicProperties props, byte[] body) {
System.out.println("发布后: " + body.length + "B");
}
});
// 发送消息
ic.basicPublish("intercepted_exchange", "test.key", null,
"拦截器测试消息".getBytes());
System.out.println("消息发送完成");
}
}
}
注意事项
- RabbitMQ Java Client 未内置拦截器框架,需自行包装 Channel 实现
- 拦截器中的异常会中断发布流程,需妥善处理
- 拦截器不应执行阻塞操作,否则会影响消息吞吐
- Spring AMQP 提供完善的拦截器支持,企业项目建议使用上层框架
要点总结
- 拦截器在消息发布和消费前后执行自定义逻辑
- 通过包装
Channel的basicPublish实现发布拦截 - 可在拦截器中实现消息过滤、格式转换和路由预处理
- 拦截器中的异常会中断发布流程,需妥善处理
- 原生 Java Client 需自行实现拦截器,Spring AMQP 提供完善支持
📝 发现内容有误?点击此处直接编辑