前置知识
在开始本教程之前,建议您具备以下基础知识:
- Java 基础语法
- 分布式系统基础概念
- Linux 基本操作
什么是 ZooKeeper?
ZooKeeper 是一个开源的分布式协调服务,由 Apache 软件基金会开发和维护。它主要用于:
- 配置管理:集中管理分布式系统的配置信息
- 命名服务:为分布式系统中的资源提供统一的命名
- 分布式锁:实现分布式系统中的互斥和同步
- 集群管理:监控集群中机器的上下线状态
2025/9/17大约 4 分钟
前置知识
在开始本教程之前,建议您具备以下基础知识:
ZooKeeper 是一个开源的分布式协调服务,由 Apache 软件基金会开发和维护。它主要用于:
前置知识
在学习本教程之前,请确保已经:
使用 Curator 框架实现分布式排他锁:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DistributedLockDemo {
private final CuratorFramework client;
public DistributedLockDemo(CuratorFramework client) {
this.client = client;
}
public void exclusiveLockDemo() {
InterProcessMutex lock = new InterProcessMutex(client, "/locks/exclusive");
try {
// 获取锁,设置超时时间
if (lock.acquire(10, TimeUnit.SECONDS)) {
log.info("获取锁成功,执行业务逻辑");
// 模拟业务处理
Thread.sleep(1000);
}
} catch (Exception e) {
log.error("获取锁失败", e);
} finally {
try {
lock.release();
log.info("释放锁成功");
} catch (Exception e) {
log.error("释放锁失败", e);
}
}
}
}
前置知识
在学习本教程之前,请确保已经:
ZooKeeper 集群采用主从架构:
前置知识
在学习本教程之前,请确保已经:
// SessionTracker 核心实现
public interface SessionTracker {
long createSession(int sessionTimeout);
void addSession(long sessionId, int sessionTimeout);
void checkSession(long sessionId, Object owner) throws SessionExpiredException;
void setSessionClosing(long sessionId);
void removeSession(long sessionId);
void shutdown();
}
// 会话状态转换
public enum SessionState {
CREATED, // 已创建
CONNECTING, // 连接中
CONNECTED, // 已连接
CLOSED // 已关闭
}
核心流程:
前置知识
在学习本教程之前,请确保已经:
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConfigCenter {
private final CuratorFramework client;
private final String configPath;
private final Map<String, String> localCache;
public ConfigCenter(CuratorFramework client, String configPath) {
this.client = client;
this.configPath = configPath;
this.localCache = new ConcurrentHashMap<>();
initConfigCenter();
}
private void initConfigCenter() {
try {
// 确保配置根节点存在
if (client.checkExists().forPath(configPath) == null) {
client.create()
.creatingParentsIfNeeded()
.forPath(configPath);
}
// 监听配置变化
PathChildrenCache cache = new PathChildrenCache(client, configPath, true);
cache.start();
cache.getListenable().addListener((client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
updateLocalCache(event.getData().getPath(),
new String(event.getData().getData()));
break;
case CHILD_REMOVED:
removeFromLocalCache(event.getData().getPath());
break;
}
});
} catch (Exception e) {
log.error("初始化配置中心失败", e);
}
}
private void updateLocalCache(String path, String data) {
String key = path.substring(configPath.length() + 1);
localCache.put(key, data);
log.info("配置更新:{} = {}", key, data);
}
private void removeFromLocalCache(String path) {
String key = path.substring(configPath.length() + 1);
localCache.remove(key);
log.info("配置删除:{}", key);
}
}