全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 10 分钟 ✍️ juanwangdev

灾难恢复演练

RabbitMQ灾难恢复演练用于验证故障预案有效性,确保故障发生时能快速恢复业务并保证数据完整性。

灾难恢复目标

RPO 与 RTO

| 指标 | 说明 | 目标值 |
|------|------|--------|
| RPO(恢复点目标) | 可容忍的最大数据丢失量,通常以时间窗口衡量 | 消息不丢失或最小化 |
| RTO(恢复时间目标) | 从故障发生到服务恢复的最大允许时间 | 分钟级或秒级 |

恢复场景

  • 单节点故障:集群中单节点宕机,其他节点自动接管
  • 多节点故障:超过半数节点宕机,集群失去多数派(仲裁队列无法选出 leader,pause_minority 模式下剩余节点会暂停服务),需手动恢复
  • 磁盘满故障:磁盘空间耗尽,队列阻塞,需清理后恢复
  • 网络分区故障:节点间网络中断,需修复网络,并依据分区处理策略(如 pause_minority、autoheal)恢复集群;仲裁队列会在多数派一侧重新选举 leader

灾难恢复流程设计

1. 故障检测与告警

Java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ health check with alerting.
 *
 * <p>Connects to the broker, ensures a durable probe queue exists and prints its
 * message count. Any connection or channel failure is printed, forwarded to
 * {@link #sendAlert(String, String)}, and ends the process with exit status 1 so
 * that cron jobs and monitoring agents can detect the failure from the exit code.
 *
 * <p>Host and credentials are read from the {@code RABBITMQ_HOST},
 * {@code RABBITMQ_USERNAME} and {@code RABBITMQ_PASSWORD} environment variables
 * when they are set, so real passwords need not be committed with the code; the
 * previous hard-coded values are kept as fallbacks for a local test broker.
 */
public class RabbitMQHealthChecker {

    private static final String HOST = envOrDefault("RABBITMQ_HOST", "localhost");
    private static final int PORT = 5672;
    private static final String USERNAME = envOrDefault("RABBITMQ_USERNAME", "admin");
    private static final String PASSWORD = envOrDefault("RABBITMQ_PASSWORD", "admin123");

    /** Durable queue used as the probe target. */
    private static final String HEALTH_CHECK_QUEUE = "health_check_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        // Short timeouts: a hanging health check is as unhelpful as none at all.
        factory.setConnectionTimeout(5000);
        factory.setHandshakeTimeout(5000);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            if (!connection.isOpen()) {
                // Defensive: treat a non-open connection as a failure instead of
                // silently reporting nothing.
                throw new IOException("connection is not open");
            }
            System.out.println("[OK] RabbitMQ 连接正常");

            // The declare reply already carries the message count, so a second
            // (passive) declare is not needed.
            com.rabbitmq.client.AMQP.Queue.DeclareOk ok =
                    channel.queueDeclare(HEALTH_CHECK_QUEUE, true, false, false, null);
            System.out.println("[OK] 队列 " + HEALTH_CHECK_QUEUE + " 正常,消息数: " + ok.getMessageCount());
        } catch (IOException | TimeoutException e) {
            String reason = describe(e);
            System.err.println("[ALERT] RabbitMQ 连接异常: " + reason);
            sendAlert("RabbitMQ连接失败", reason);
            // Non-zero exit status so schedulers / monitoring agents see the failure.
            System.exit(1);
        }
    }

    /**
     * Sends an alert notification.
     *
     * <p>Placeholder that only prints to stdout; a real deployment would forward
     * the alert to WeCom, DingTalk, e-mail or a similar alerting system.
     *
     * @param title   short alert title
     * @param message alert details
     */
    private static void sendAlert(String title, String message) {
        System.out.println("发送告警: " + title + " - " + message);
    }

    /**
     * Returns a human-readable reason for {@code e}.
     *
     * @param e the failure to describe
     * @return the exception message, or the exception class name when the message is null
     */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.getClass().getName();
    }

    /**
     * Reads an environment variable with a fallback.
     *
     * @param name     environment variable name
     * @param fallback value used when the variable is unset or blank
     * @return the variable's value, or {@code fallback}
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }
}

2. 故障隔离与切换

网络分区发生时,服务端应依靠分区处理策略(如 pause_minority)隔离少数派节点,防止故障扩散;客户端则在主集群不可用时切换到备用集群,示例如下。

Java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 备用集群切换示例
 */
public class ClusterFailoverExample {

    private static final String PRIMARY_HOST = "primary-rmq.example.com";
    private static final String BACKUP_HOST = "backup-rmq.example.com";
    private static final int PORT = 5672;
    private static final String USERNAME = "app_user";
    private static final String PASSWORD = "app_pass";

    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(PRIMARY_HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setConnectionTimeout(3000);
        factory.setRequestedHeartbeat(10);
        factory.setAutomaticRecoveryEnabled(true);

        try {
            Connection conn = factory.newConnection();
            System.out.println("连接主集群成功: " + PRIMARY_HOST);
            return conn;
        } catch (Exception e) {
            System.err.println("主集群连接失败,切换到备用集群: " + BACKUP_HOST);
            return connectToBackup(factory);
        }
    }

    private static Connection connectToBackup(ConnectionFactory factory) throws IOException, TimeoutException {
        factory.setHost(BACKUP_HOST);
        factory.setConnectionTimeout(5000);
        Connection backupConn = factory.newConnection();
        System.out.println("连接备用集群成功: " + BACKUP_HOST);
        return backupConn;
    }

    public static void main(String[] args) {
        try (Connection conn = getConnection();
             Channel channel = conn.createChannel()) {
            
            channel.queueDeclare("failover_queue", true, false, false, null);
            String message = "Failover test message";
            channel.basicPublish("", "failover_queue", null, message.getBytes());
            System.out.println("消息发送成功");
        } catch (IOException | TimeoutException e) {
            System.err.println("集群切换失败: " + e.getMessage());
        }
    }
}

3. 数据完整性校验

恢复后必须校验消息完整性,及时发现消息丢失或重复投递。

Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Post-recovery data integrity check.
 *
 * <p>Consumes {@code integrity_check_queue} and counts messages that carry a
 * {@code messageId} property and a numeric {@code seq_num} header. Each sequence
 * number is counted only once, so redeliveries after a failover cannot inflate
 * the received count and hide lost messages. Duplicates and messages with missing
 * metadata are reported separately. Every delivery is acknowledged.
 */
public class DataIntegrityChecker {

    /** Expected message count used by the no-argument constructor. */
    private static final int DEFAULT_EXPECTED_COUNT = 1000;

    private final AtomicInteger receivedCount = new AtomicInteger(0);
    private final AtomicInteger duplicateCount = new AtomicInteger(0);
    private final AtomicInteger expectedCount;
    // Written from the consumer thread, read after the latch; a concurrent set keeps that safe.
    private final java.util.Set<Long> seenSequences = java.util.concurrent.ConcurrentHashMap.newKeySet();
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Creates a checker that expects {@value #DEFAULT_EXPECTED_COUNT} distinct messages. */
    public DataIntegrityChecker() {
        this(DEFAULT_EXPECTED_COUNT);
    }

    /**
     * Creates a checker expecting the given number of distinct messages.
     *
     * @param expectedCount number of distinct sequence numbers to wait for; must be positive
     * @throws IllegalArgumentException if {@code expectedCount <= 0}
     */
    public DataIntegrityChecker(int expectedCount) {
        if (expectedCount <= 0) {
            throw new IllegalArgumentException("expectedCount must be positive: " + expectedCount);
        }
        this.expectedCount = new AtomicInteger(expectedCount);
    }

    /**
     * Consumes the integrity-check queue for up to 30 seconds and prints the result.
     *
     * @throws IOException          on connection or channel errors
     * @throws TimeoutException     if the connection cannot be established in time
     * @throws InterruptedException if interrupted while waiting for messages
     */
    public void checkIntegrity() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            String queueName = "integrity_check_queue";
            channel.queueDeclare(queueName, true, false, false, null);

            // Snapshot of how many messages are waiting before consumption starts.
            com.rabbitmq.client.AMQP.Queue.DeclareOk ok = channel.queueDeclarePassive(queueName);
            int messageCount = ok.getMessageCount();
            System.out.println("队列中待消费消息数: " + messageCount);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String messageId = properties.getMessageId();
                    long sequenceNumber = readSequenceNumber(properties.getHeaders());

                    if (messageId == null || sequenceNumber == -1) {
                        System.err.println("[WARN] 消息缺少必要元数据: " + envelope.getDeliveryTag());
                    } else if (!seenSequences.add(sequenceNumber)) {
                        // Same sequence number seen before: a duplicate, not a new message.
                        duplicateCount.incrementAndGet();
                        System.err.println("[WARN] 重复消息, seq=" + sequenceNumber);
                    } else {
                        receivedCount.incrementAndGet();
                        System.out.println("[OK] 消息校验通过, seq=" + sequenceNumber);
                    }

                    channel.basicAck(envelope.getDeliveryTag(), false);

                    if (receivedCount.get() >= expectedCount.get()) {
                        latch.countDown();
                    }
                }
            };

            channel.basicConsume(queueName, false, consumer);

            boolean completed = latch.await(30, TimeUnit.SECONDS);
            if (completed) {
                System.out.println("[OK] 数据完整性校验完成,接收: " + receivedCount.get() +
                    "/" + expectedCount.get());
            } else {
                System.err.println("[ALERT] 数据完整性校验超时,可能丢失消息,接收: " + receivedCount.get() +
                    "/" + expectedCount.get());
            }
            if (duplicateCount.get() > 0) {
                System.err.println("[WARN] 检测到重复消息数: " + duplicateCount.get());
            }
        }
    }

    /**
     * Extracts the {@code seq_num} header as a long.
     *
     * <p>Header values arrive boxed and a publisher may have used {@code Integer}
     * or {@code Long}, so any {@link Number} is accepted. A direct {@code (long)}
     * cast would throw on a missing or {@code Integer} header inside the consumer
     * callback.
     *
     * @param headers message headers, may be null
     * @return the sequence number, or -1 when headers are absent or the header is missing/non-numeric
     */
    private static long readSequenceNumber(java.util.Map<String, Object> headers) {
        if (headers == null) {
            return -1;
        }
        Object raw = headers.get("seq_num");
        return raw instanceof Number ? ((Number) raw).longValue() : -1;
    }

    public static void main(String[] args) throws Exception {
        new DataIntegrityChecker().checkIntegrity();
    }
}

演练执行方案

演练步骤

  1. 准备阶段:备份当前集群状态,记录基线指标
  2. 注入故障:模拟目标故障场景(断网、宕机、磁盘满等)
  3. 观察恢复:记录自动恢复过程与时间
  4. 手动介入:如自动恢复失败,执行手动恢复预案
  5. 数据校验:恢复后校验消息完整性与一致性
  6. 恢复评估:评估RPO/RTO是否达标,输出演练报告

常见故障恢复命令

Bash
# Show cluster membership, running nodes and any detected partitions
rabbitmqctl cluster_status

# Stop the RabbitMQ application on this node (the Erlang VM keeps running).
# Both reset and join_cluster refuse to run while the application is up.
rabbitmqctl stop_app

# Reset the node: leaves its cluster and DELETES the node's local data,
# including persistent messages stored on it. Make sure replicas/backups exist first.
# (rabbitmqctl force_reset is the unconditional variant - last resort only.)
rabbitmqctl reset

# Rejoin the cluster through an existing member
rabbitmqctl join_cluster rabbit@node1

# Start the application again
rabbitmqctl start_app

# Classic mirrored queues: mirror and synchronised-mirror status
# (classic queue mirroring is deprecated and was removed in RabbitMQ 4.0)
rabbitmqctl list_queues name policy slave_pids synchronised_slave_pids

# Quorum queues: replica membership and leader/follower state
# (replace my_quorum_queue with the queue to inspect)
rabbitmq-queues quorum_status my_quorum_queue

演练必须在低峰期执行,避免影响生产业务;每次演练后必须更新恢复预案。

要点总结

  • 明确RPO/RTO目标:根据业务容忍度设定合理的恢复指标
  • 定期演练:至少每季度执行一次完整演练,验证预案有效性
  • 数据完整性优先:恢复后必须校验消息完整性,防止丢失或重复
  • 自动化告警与切换:减少人工介入时间,缩短RTO
  • 演练后复盘:输出报告,优化预案,持续改进恢复能力

📝 发现内容有误?点击此处直接编辑

← 上一篇 数据备份策略
下一篇 → 跨机房容灾部署
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库