ZooKeeper Curator框架实战
Curator大幅简化ZooKeeper客户端开发。
Curator框架使用
核心优势:
| 功能 | 说明 |
|---|---|
| 简化API | 更友好的接口设计 |
| 自动重连 | 连接断开自动恢复 |
| 重试策略 | 内置多种重试策略 |
| Recipes配方 | 分布式锁、选举等实现 |
创建客户端:
Java
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("host1:2181,host2:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.build();
client.start();
基本操作:
Java
// 创建节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/path", data);
// 读取数据
byte[] data = client.getData().forPath("/path");
// 修改数据
client.setData().forPath("/path", newData);
// 删除节点
client.delete()
.deletingChildrenIfNeeded()
.forPath("/path");
// 检查节点存在
client.checkExists().forPath("/path");
重试策略:
| 策略 | 说明 |
|---|---|
| ExponentialBackoffRetry | 指数退避重试 |
| RetryNTimes | 固定次数重试 |
| RetryUntilElapsed | 超时时间内重试 |
提示:ExponentialBackoffRetry适合大多数场景。
异常处理策略
异常类型:
| 异常 | 说明 | 处理 |
|---|---|---|
| ConnectionLoss | 连接中断 | 等待重连后重试 |
| SessionExpired | 会话过期 | 重建客户端 |
| NodeExists | 节点已存在 | 检查是否预期 |
| NoNodeException | 节点不存在 | 先创建或跳过 |
ConnectionLoss处理:
Java
// Curator自动重试
RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
// 在重试策略覆盖范围内会自动处理
SessionExpired处理:
Java
// 监听连接状态
client.getConnectionStateListenable().addListener((client, newState) -> {
if (newState == ConnectionState.LOST) {
// 会话过期,需重建临时节点和Watcher
recreateEphemeralNodes();
reRegisterWatchers();
}
});
连接状态:
| 状态 | 说明 |
|---|---|
| CONNECTED | 已连接 |
| SUSPENDED | 连接断开 |
| RECONNECTED | 重连成功 |
| LOST | 会话过期 |
注意:SessionExpired后临时节点已删除,需重新创建。
Recipes配方使用
分布式锁:
Java
InterProcessMutex lock = new InterProcessMutex(client, "/locks/my-lock");
lock.acquire();
try {
// 业务操作
} finally {
lock.release();
}
读写锁:
Java
InterProcessReadWriteLock rwLock =
new InterProcessReadWriteLock(client, "/locks/rw");
rwLock.readLock().acquire(); // 读锁
try {
// 读操作
} finally {
rwLock.readLock().release();
}
rwLock.writeLock().acquire(); // 写锁
try {
// 写操作
} finally {
rwLock.writeLock().release();
}
Leader选举:
Java
LeaderSelectorListener listener = new LeaderSelectorListener() {
public void takeLeadership(CuratorFramework client) {
// 成为Leader后执行
}
};
LeaderSelector selector = new LeaderSelector(client, "/election", listener);
selector.start(); // 参选
selector.close(); // 退出
屏障:
Java
DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.waitOnBarrier(); // 等待屏障打开
// 屏障打开后执行
barrier.leaveBarrier(); // 离开屏障
TreeCache持续监听:
Java
TreeCache cache = TreeCache.newBuilder(client, "/path").build();
cache.start();
cache.getListenable().addListener((client, event) -> {
// 持续监听节点变化
if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
// 处理更新
}
});
提示:TreeCache解决Watcher一次性问题,实现持续监听。
要点总结
- Curator简化API,自动重连,内置重试策略
- ExponentialBackoffRetry是推荐重试策略
- ConnectionLoss等待重连,SessionExpired需重建客户端
- ConnectionState.LOST需重建临时节点和Watcher
- Recipes提供分布式锁、选举、屏障配方
- TreeCache实现持续监听,解决Watcher一次性问题
📝 发现内容有误?点击此处直接编辑