ZooKeeper 核心功能详解
2025/8/15大约 4 分钟
ZooKeeper 核心功能详解
前置知识
在学习本教程之前,请确保已经:
- 了解 ZooKeeper 的基本概念
- 掌握 Java 并发编程基础
- 熟悉分布式系统理论
分布式锁实现
1. 排他锁
使用 Curator 框架实现分布式排他锁:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DistributedLockDemo {
private final CuratorFramework client;
public DistributedLockDemo(CuratorFramework client) {
this.client = client;
}
public void exclusiveLockDemo() {
InterProcessMutex lock = new InterProcessMutex(client, "/locks/exclusive");
try {
// 获取锁,设置超时时间
if (lock.acquire(10, TimeUnit.SECONDS)) {
log.info("获取锁成功,执行业务逻辑");
// 模拟业务处理
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("获取锁失败", e);
} finally {
try {
lock.release();
log.info("释放锁成功");
} catch (Exception e) {
log.error("释放锁失败", e);
}
}
}
}
2. 读写锁
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
public class ReadWriteLockDemo {
private final CuratorFramework client;
public ReadWriteLockDemo(CuratorFramework client) {
this.client = client;
}
public void readWriteLockDemo() {
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks/readwrite");
// 读锁示例
try {
if (lock.readLock().acquire(10, TimeUnit.SECONDS)) {
log.info("获取读锁成功");
// 读取共享资源
}
} catch (Exception e) {
log.error("获取读锁失败", e);
} finally {
try {
lock.readLock().release();
} catch (Exception e) {
log.error("释放读锁失败", e);
}
}
// 写锁示例
try {
if (lock.writeLock().acquire(10, TimeUnit.SECONDS)) {
log.info("获取写锁成功");
// 修改共享资源
}
} catch (Exception e) {
log.error("获取写锁失败", e);
} finally {
try {
lock.writeLock().release();
} catch (Exception e) {
log.error("释放写锁失败", e);
}
}
}
}
配置管理
1. 动态配置
import org.apache.curator.framework.recipes.cache.NodeCache;
import lombok.Data;
@Slf4j
public class ConfigurationManager {
private final CuratorFramework client;
private final String configPath;
private NodeCache nodeCache;
public ConfigurationManager(CuratorFramework client, String configPath) {
this.client = client;
this.configPath = configPath;
}
public void initConfig() throws Exception {
// 确保配置节点存在
if (client.checkExists().forPath(configPath) == null) {
client.create()
.creatingParentsIfNeeded()
.forPath(configPath, "默认配置".getBytes());
}
// 监听配置变化
nodeCache = new NodeCache(client, configPath);
nodeCache.start();
nodeCache.getListenable().addListener(() -> {
String newConfig = new String(nodeCache.getCurrentData().getData());
log.info("配置已更新:{}", newConfig);
// 更新应用配置
updateApplicationConfig(newConfig);
});
}
private void updateApplicationConfig(String config) {
// 实现配置更新逻辑
}
}
2. 配置版本管理
import org.apache.zookeeper.data.Stat;
public class VersionedConfig {
private final CuratorFramework client;
public VersionedConfig(CuratorFramework client) {
this.client = client;
}
public void updateConfig(String path, String data) throws Exception {
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
try {
client.setData()
.withVersion(stat.getVersion())
.forPath(path, data.getBytes());
log.info("配置更新成功,新版本:{}", stat.getVersion() + 1);
} catch (Exception e) {
log.error("配置更新失败,版本冲突", e);
}
}
}
服务发现
1. 服务注册
import lombok.Data;
@Data
public class ServiceInstance {
private String serviceName;
private String host;
private int port;
private Map<String, String> metadata;
}
public class ServiceRegistry {
private final CuratorFramework client;
private final String basePath;
public ServiceRegistry(CuratorFramework client, String basePath) {
this.client = client;
this.basePath = basePath;
}
public void registerService(ServiceInstance instance) throws Exception {
String path = String.format("%s/%s/%s:%d",
basePath,
instance.getServiceName(),
instance.getHost(),
instance.getPort());
// 创建临时节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(path, JsonUtils.toJson(instance).getBytes());
log.info("服务注册成功:{}", path);
}
}
2. 服务发现
public class ServiceDiscovery {
private final CuratorFramework client;
private final String basePath;
private final Map<String, PathChildrenCache> caches = new ConcurrentHashMap<>();
public ServiceDiscovery(CuratorFramework client, String basePath) {
this.client = client;
this.basePath = basePath;
}
public List<ServiceInstance> getServices(String serviceName) throws Exception {
String servicePath = basePath + "/" + serviceName;
List<ServiceInstance> instances = new ArrayList<>();
for (String child : client.getChildren().forPath(servicePath)) {
byte[] bytes = client.getData().forPath(servicePath + "/" + child);
instances.add(JsonUtils.fromJson(new String(bytes), ServiceInstance.class));
}
return instances;
}
public void watchService(String serviceName) throws Exception {
String servicePath = basePath + "/" + serviceName;
PathChildrenCache cache = new PathChildrenCache(client, servicePath, true);
cache.getListenable().addListener((client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
log.info("服务上线:{}", event.getData().getPath());
break;
case CHILD_REMOVED:
log.info("服务下线:{}", event.getData().getPath());
break;
default:
break;
}
});
cache.start();
caches.put(serviceName, cache);
}
}
最佳实践
性能注意事项
- 批量操作:使用事务批量处理多个操作
- 缓存管理:合理使用本地缓存,避免频繁查询
- 监听器优化:避免注册过多监听器
- 连接复用:使用连接池管理客户端连接
可靠性建议
- 异常重试:实现合理的重试机制
- 会话管理:及时清理无用的会话
- 数据备份:定期备份重要数据
- 监控告警:实现健康检查和异常告警
常见问题
1. 如何处理节点数据过大?
对于大数据量场景,建议:
- 只存储关键数据
- 使用外部存储系统
- 实现数据压缩
// 数据压缩示例
public byte[] compress(String data) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
gzip.write(data.getBytes());
}
return out.toByteArray();
}
2. 如何实现服务的负载均衡?
使用一致性哈希或随机选择算法:
public class LoadBalancer {
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
return null;
}
// 随机选择
int index = ThreadLocalRandom.current().nextInt(instances.size());
return instances.get(index);
}
}
总结
本教程详细介绍了 ZooKeeper 的核心功能:
- ✅ 分布式锁:排他锁和读写锁的实现
- ✅ 配置管理:动态配置和版本控制
- ✅ 服务发现:服务注册与发现机制
- ✅ 最佳实践:性能优化和可靠性建议
- ✅ 常见问题:数据存储和负载均衡
下一步学习
- 探索 ZooKeeper 的高级特性
- 学习分布式事务的实现
- 了解 ZooKeeper 的运维管理