全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-24 12 分钟 ✍️ juanwangdev

ZooKeeper Curator框架实战

Curator大幅简化ZooKeeper客户端开发。

Curator框架使用

核心优势

功能说明
简化API更友好的接口设计
自动重连连接断开自动恢复
重试策略内置多种重试策略
Recipes配方分布式锁、选举等实现

创建客户端

Java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
    .connectString("host1:2181,host2:2181")
    .sessionTimeoutMs(5000)
    .connectionTimeoutMs(3000)
    .retryPolicy(retryPolicy)
    .build();
client.start();

基本操作

Java
// 创建节点
client.create()
    .creatingParentsIfNeeded()
    .withMode(CreateMode.EPHEMERAL)
    .forPath("/path", data);

// 读取数据
byte[] data = client.getData().forPath("/path");

// 修改数据
client.setData().forPath("/path", newData);

// 删除节点
client.delete()
    .deletingChildrenIfNeeded()
    .forPath("/path");

// 检查节点存在
client.checkExists().forPath("/path");

重试策略

策略说明
ExponentialBackoffRetry指数退避重试
RetryNTimes固定次数重试
RetryUntilElapsed超时时间内重试

提示:ExponentialBackoffRetry适合大多数场景。

异常处理策略

异常类型

异常说明处理
ConnectionLoss连接中断等待重连后重试
SessionExpired会话过期重建客户端
NodeExists节点已存在检查是否预期
NoNodeException节点不存在先创建或跳过

ConnectionLoss处理

Java
// Curator自动重试
RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
// 在重试策略覆盖范围内会自动处理

SessionExpired处理

Java
// 监听连接状态
client.getConnectionStateListenable().addListener((client, newState) -> {
    if (newState == ConnectionState.LOST) {
        // 会话过期,需重建临时节点和Watcher
        recreateEphemeralNodes();
        reRegisterWatchers();
    }
});

连接状态

状态说明
CONNECTED已连接
SUSPENDED连接断开
RECONNECTED重连成功
LOST会话过期

注意:SessionExpired后临时节点已删除,需重新创建。

Recipes配方使用

分布式锁

Java
InterProcessMutex lock = new InterProcessMutex(client, "/locks/my-lock");
lock.acquire();
try {
    // 业务操作
} finally {
    lock.release();
}

读写锁

Java
InterProcessReadWriteLock rwLock = 
    new InterProcessReadWriteLock(client, "/locks/rw");

rwLock.readLock().acquire();  // 读锁
try {
    // 读操作
} finally {
    rwLock.readLock().release();
}

rwLock.writeLock().acquire(); // 写锁
try {
    // 写操作
} finally {
    rwLock.writeLock().release();
}

Leader选举

Java
LeaderSelectorListener listener = new LeaderSelectorListener() {
    public void takeLeadership(CuratorFramework client) {
        // 成为Leader后执行
    }
};

LeaderSelector selector = new LeaderSelector(client, "/election", listener);
selector.start();  // 参选
selector.close();  // 退出

屏障

Java
DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.waitOnBarrier();  // 等待屏障打开
// 屏障打开后执行
barrier.leaveBarrier();   // 离开屏障

TreeCache持续监听

Java
TreeCache cache = TreeCache.newBuilder(client, "/path").build();
cache.start();
cache.getListenable().addListener((client, event) -> {
    // 持续监听节点变化
    if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
        // 处理更新
    }
});

提示:TreeCache解决Watcher一次性问题,实现持续监听。

要点总结

  • Curator简化API,自动重连,内置重试策略
  • ExponentialBackoffRetry是推荐重试策略
  • ConnectionLoss等待重连,SessionExpired需重建客户端
  • ConnectionState.LOST需重建临时节点和Watcher
  • Recipes提供分布式锁、选举、屏障配方
  • TreeCache实现持续监听,解决Watcher一次性问题

📝 发现内容有误?点击此处直接编辑

← 上一篇 ZooKeeper监控命令与指标
下一篇 → ZooKeeper Epoch与数据同步机制
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库