异常处理与恢复
网络抖动、Broker重启等异常会导致连接断开,必须实现自动恢复机制。
监听连接关闭事件
RabbitMQ Java Client 提供 ShutdownListener 接口,用于监听 Connection 和 Channel 的关闭事件。
Java
connection.addShutdownListener(cause -> {
System.out.println("连接关闭,原因:" + cause.getMessage());
System.out.println("关闭信号:" + cause.getSignal());
System.out.println("发起方:" + cause.getInitiator());
});
channel.addShutdownListener(cause -> {
System.out.println("Channel关闭,原因:" + cause.getMessage());
});
ShutdownSignalException 包含三个关键字段:
reason:关闭原因描述initiator:发起方(APPLICATION或SOFT_ERROR)signal:信号类型(connection.close、channel.close等)
自动重连机制
通过 ConnectionFactory 配置自动重连参数:
Java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 启用自动恢复
factory.setAutomaticRecoveryEnabled(true);
// 恢复间隔(毫秒)
factory.setNetworkRecoveryInterval(5000);
// 拓扑恢复(交换机、队列、绑定)
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
自动恢复默认开启,会重连连接并重建拓扑,但不会恢复未确认消息的确认状态。
手动恢复 Channel 状态
自动恢复不恢复 Channel 的 QoS、消费者标签等运行时状态,需手动处理。
Java
Channel channel = connection.createChannel();
// 注册恢复监听
((Recoverable) channel).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
System.out.println("恢复完成,重新配置 Channel");
Channel ch = (Channel) recoverable;
try {
// 重新设置 QoS
ch.basicQos(10);
// 重新声明队列和消费者
ch.queueDeclare("task_queue", true, false, false, null);
ch.basicConsume("task_queue", false, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println("开始恢复");
}
});
// 设置 QoS 和消费者
channel.basicQos(10);
channel.queueDeclare("task_queue", true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
System.out.println("收到消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("task_queue", false, consumer);
恢复后 Channel 对象引用不变,但底层连接已重建,需重新配置所有运行时状态。
异常分类与处理策略
不同异常类型对应不同处理策略:
Java
connection.addShutdownListener(cause -> {
if (cause instanceof ShutdownSignalException) {
ShutdownSignalException sig = (ShutdownSignalException) cause;
// Broker主动关闭(如认证失败、权限不足)
if (sig.isHardError() && sig.getProtocolStackTraceElement() != null) {
System.err.println("Broker关闭连接,需人工介入");
return;
}
// 网络异常(可自动恢复)
if (sig.getCause() instanceof IOException) {
System.out.println("网络异常,等待自动重连");
}
// Channel级别异常(如路由失败)
if (!sig.isHardError()) {
System.out.println("Channel异常,可重建Channel");
}
}
});
常见异常类型:
| 异常类型 | 级别 | 处理策略 |
|---|---|---|
CONNECTION_FORCED | 连接级 | 等待自动重连 |
NOT_FOUND | Channel级 | 重新声明队列/交换机 |
ACCESS_REFUSED | Channel级 | 检查权限配置 |
PRECONDITION_FAILED | Channel级 | 检查队列参数 |
注意事项
- 自动恢复仅恢复拓扑结构,不恢复消费者和 QoS 设置
- 恢复过程中发送的消息可能丢失,建议使用发布确认机制
- 频繁重连会消耗资源,需设置合理的重连间隔和上限
- 恢复监听器中不要执行阻塞操作,会影响恢复流程
要点总结
ShutdownListener用于监听连接和 Channel 关闭事件setAutomaticRecoveryEnabled(true)启用自动重连和拓扑恢复RecoveryListener用于恢复 Channel 运行时状态(QoS、消费者等)- 根据异常类型选择处理策略:网络异常自动恢复,业务异常人工介入
- 自动恢复不保证消息确认状态的恢复
📝 发现内容有误?点击此处直接编辑