灾难恢复演练
RabbitMQ灾难恢复演练用于验证故障预案有效性,确保故障发生时能快速恢复业务并保证数据完整性。
灾难恢复目标
RPO 与 RTO
| 指标 | 说明 | 目标值 |
|---|---|---|
| RPO(恢复点目标) | 允许丢失的最大数据量 | 消息不丢失或最小化 |
| RTO(恢复时间目标) | 从故障到服务恢复的最大时间 | 分钟级或秒级 |
恢复场景
- 单节点故障:集群中单节点宕机,其他节点自动接管
- 多节点故障:超过半数节点宕机,集群不可用,需手动恢复
- 磁盘满故障:磁盘空间耗尽,队列阻塞,需清理后恢复
- 网络分区故障:节点间网络中断导致集群分裂,需先修复网络,再按分区处理策略(如 pause_minority、autoheal)自动恢复,或手动重启非信任分区中的节点
灾难恢复流程设计
1. 故障检测与告警
Java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Example RabbitMQ health check that raises an alert when the probe fails.
 *
 * <p>The probe opens a connection and a channel, declares a probe queue, reads the queue's
 * message count back with a passive declare, and prints the outcome to the console.
 */
public class RabbitMQHealthChecker {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin123";

    /**
     * Runs a single health probe against the configured broker.
     *
     * <p>An {@link IOException} or {@link TimeoutException} raised anywhere in the probe is
     * reported on stderr and forwarded to {@link #sendAlert(String, String)}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConnectionFactory probeFactory = newProbeFactory();

        try (Connection connection = probeFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Nothing further to check when the connection is not open.
            if (!connection.isOpen()) {
                return;
            }
            System.out.println("[OK] RabbitMQ 连接正常");

            // Declare the probe queue, then read its state back with a passive declare.
            final String probeQueue = "health_check_queue";
            channel.queueDeclare(probeQueue, true, false, false, null);
            com.rabbitmq.client.AMQP.Queue.DeclareOk declareOk =
                    channel.queueDeclarePassive(probeQueue);
            int pendingMessages = declareOk.getMessageCount();
            System.out.println("[OK] 队列 " + probeQueue + " 正常,消息数: " + pendingMessages);
        } catch (IOException | TimeoutException failure) {
            String reason = failure.getMessage();
            System.err.println("[ALERT] RabbitMQ 连接异常: " + reason);
            // Trigger the alert notification.
            sendAlert("RabbitMQ连接失败", reason);
        }
    }

    /**
     * Builds a connection factory from the class constants, with 5000 ms connection and
     * handshake timeouts.
     *
     * @return the configured factory
     */
    private static ConnectionFactory newProbeFactory() {
        ConnectionFactory probeFactory = new ConnectionFactory();
        probeFactory.setHost(HOST);
        probeFactory.setPort(PORT);
        probeFactory.setUsername(USERNAME);
        probeFactory.setPassword(PASSWORD);
        probeFactory.setConnectionTimeout(5000);
        probeFactory.setHandshakeTimeout(5000);
        return probeFactory;
    }

    /**
     * Emits an alert. This example only prints it; a real deployment would hook in an
     * alerting system (WeCom, DingTalk, e-mail, ...).
     *
     * @param title short alert title
     * @param message alert detail text
     */
    private static void sendAlert(String title, String message) {
        String alertLine = "发送告警: " + title + " - " + message;
        System.out.println(alertLine);
    }
}
2. 故障隔离与切换
主集群不可用或发生网络分区时,应立即启用隔离策略,将客户端切换到备用集群,防止故障扩散。
Java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Example of failing over from a primary RabbitMQ cluster to a backup cluster.
 *
 * <p>{@link #getConnection()} tries the primary host first and falls back to the backup host
 * when that attempt fails; {@link #main(String[])} publishes one test message over whichever
 * connection was obtained.
 */
public class ClusterFailoverExample {
    private static final String PRIMARY_HOST = "primary-rmq.example.com";
    private static final String BACKUP_HOST = "backup-rmq.example.com";
    private static final int PORT = 5672;
    private static final String USERNAME = "app_user";
    private static final String PASSWORD = "app_pass";

    /**
     * Opens a connection to the primary cluster, falling back to the backup cluster when the
     * primary attempt fails.
     *
     * <p>If the backup attempt fails as well, the backup failure is thrown with the primary
     * failure attached as a suppressed exception, so neither cause is lost.
     *
     * @return a connection to the primary host, or to the backup host after a failover
     * @throws IOException if the backup connection attempt fails with an I/O error
     * @throws TimeoutException if the backup connection attempt times out
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(PRIMARY_HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setConnectionTimeout(3000);
        factory.setRequestedHeartbeat(10);
        factory.setAutomaticRecoveryEnabled(true);

        try {
            Connection conn = factory.newConnection();
            System.out.println("连接主集群成功: " + PRIMARY_HOST);
            return conn;
        } catch (Exception primaryFailure) {
            // Deliberately broad: any failure on the primary should trigger the failover.
            System.err.println("主集群连接失败,切换到备用集群: " + BACKUP_HOST);
            // Record why the primary was abandoned; otherwise the root cause is invisible.
            System.err.println("主集群连接失败原因: " + primaryFailure);
            try {
                return connectToBackup(factory);
            } catch (IOException | TimeoutException | RuntimeException backupFailure) {
                // Keep the primary failure reachable from the exception the caller sees.
                backupFailure.addSuppressed(primaryFailure);
                throw backupFailure;
            }
        }
    }

    /**
     * Re-points the given factory at the backup host (with a 5000 ms connection timeout) and
     * opens a connection. All other factory settings are reused as-is.
     *
     * @param factory the factory already configured with port and credentials; it is mutated
     * @return a connection to the backup host
     * @throws IOException if the connection attempt fails with an I/O error
     * @throws TimeoutException if the connection attempt times out
     */
    private static Connection connectToBackup(ConnectionFactory factory)
            throws IOException, TimeoutException {
        factory.setHost(BACKUP_HOST);
        factory.setConnectionTimeout(5000);
        Connection backupConn = factory.newConnection();
        System.out.println("连接备用集群成功: " + BACKUP_HOST);
        return backupConn;
    }

    /**
     * Obtains a connection (with failover), declares {@code failover_queue}, and publishes one
     * test message to it through the default exchange.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        try (Connection conn = getConnection();
             Channel channel = conn.createChannel()) {
            channel.queueDeclare("failover_queue", true, false, false, null);
            String message = "Failover test message";
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish("", "failover_queue", null, payload);
            System.out.println("消息发送成功");
        } catch (IOException | TimeoutException e) {
            System.err.println("集群切换失败: " + e.getMessage());
        }
    }
}
3. 数据完整性校验
恢复后必须校验消息完整性,防止数据丢失或重复。
Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Example post-recovery data-integrity check.
 *
 * <p>Consumes {@code integrity_check_queue}, counts the messages that carry both a message id
 * and a {@code seq_num} header, and reports whether the expected number of valid messages
 * arrived within 30 seconds.
 */
public class DataIntegrityChecker {
    private final AtomicInteger receivedCount = new AtomicInteger(0);
    private final AtomicInteger expectedCount = new AtomicInteger(1000);
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Runs the integrity check against the broker on {@code localhost}.
     *
     * @throws IOException if connecting, declaring the queue, or consuming fails
     * @throws TimeoutException if the connection attempt times out
     * @throws InterruptedException if the thread is interrupted while waiting for completion
     */
    public void checkIntegrity() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            String queueName = "integrity_check_queue";
            channel.queueDeclare(queueName, true, false, false, null);

            // Read the queue state back to report how many messages are waiting.
            com.rabbitmq.client.AMQP.Queue.DeclareOk ok = channel.queueDeclarePassive(queueName);
            int messageCount = ok.getMessageCount();
            System.out.println("队列中待消费消息数: " + messageCount);

            // Consume the messages and validate each one.
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String messageId = properties.getMessageId();
                    long sequenceNumber = readSequenceNumber(properties.getHeaders());

                    // A message is valid only if it has both a message id and a sequence number.
                    if (messageId == null || sequenceNumber == -1) {
                        System.err.println("[WARN] 消息缺少必要元数据: " + envelope.getDeliveryTag());
                    } else {
                        receivedCount.incrementAndGet();
                        System.out.println("[OK] 消息校验通过, seq=" + sequenceNumber);
                    }

                    // Ack every delivery, valid or not, so nothing is left unacknowledged.
                    channel.basicAck(envelope.getDeliveryTag(), false);

                    if (receivedCount.get() >= expectedCount.get()) {
                        latch.countDown();
                    }
                }
            };

            // Manual acknowledgement mode: the consumer acks each delivery itself.
            channel.basicConsume(queueName, false, consumer);

            // Wait for the expected number of valid messages, up to 30 seconds.
            boolean completed = latch.await(30, TimeUnit.SECONDS);
            if (completed) {
                System.out.println("[OK] 数据完整性校验完成,接收: " + receivedCount.get() +
                        "/" + expectedCount.get());
            } else {
                System.err.println("[ALERT] 数据完整性校验超时,可能丢失消息");
            }
        }
    }

    /**
     * Extracts the {@code seq_num} header as a {@code long}.
     *
     * <p>Accepts any {@link Number} value rather than casting straight to {@code long}, which
     * would throw when the header is absent or holds a different numeric type.
     *
     * @param headers the message headers; may be {@code null}
     * @return the sequence number, or {@code -1} when the headers are missing, the
     *     {@code seq_num} entry is absent, or its value is not numeric
     */
    private static long readSequenceNumber(java.util.Map<String, Object> headers) {
        if (headers == null) {
            return -1;
        }
        Object raw = headers.get("seq_num");
        return raw instanceof Number ? ((Number) raw).longValue() : -1;
    }

    /**
     * Runs the integrity check once.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the check fails
     */
    public static void main(String[] args) throws Exception {
        new DataIntegrityChecker().checkIntegrity();
    }
}
演练执行方案
演练步骤
- 准备阶段:备份当前集群状态,记录基线指标
- 注入故障:模拟目标故障场景(断网、宕机、磁盘满等)
- 观察恢复:记录自动恢复过程与时间
- 手动介入:如自动恢复失败,执行手动恢复预案
- 数据校验:恢复后校验消息完整性与一致性
- 恢复评估:评估RPO/RTO是否达标,输出演练报告
常见故障恢复命令
Bash
# Show the cluster status
rabbitmqctl cluster_status
# Stop the RabbitMQ application on this node; reset and join_cluster
# require the application to be stopped first
rabbitmqctl stop_app
# Reset the node: removes it from its cluster and wipes its local data
# (use force_reset only when a normal reset is not possible)
rabbitmqctl reset
# Rejoin the cluster
rabbitmqctl join_cluster rabbit@node1
# Start the application
rabbitmqctl start_app
# Show queue mirroring status
# NOTE(review): slave_pids / synchronised_slave_pids relate to classic mirrored queues -
# confirm they are available on the RabbitMQ version in use
rabbitmqctl list_queues name policy slave_pids synchronised_slave_pids
演练必须在低峰期执行,避免影响生产业务;每次演练后必须更新恢复预案。
要点总结
- 明确RPO/RTO目标:根据业务容忍度设定合理的恢复指标
- 定期演练:至少每季度执行一次完整演练,验证预案有效性
- 数据完整性优先:恢复后必须校验消息完整性,防止丢失或重复
- 自动化告警与切换:减少人工介入时间,缩短RTO
- 演练后复盘:输出报告,优化预案,持续改进恢复能力
📝 发现内容有误?点击此处直接编辑