实战应用与最佳实践
2025/8/15大约 4 分钟
ZooKeeper 实战应用与最佳实践
前置知识
在学习本教程之前,请确保已经:
- 掌握 ZooKeeper 的基本使用
- 了解分布式系统设计模式
- 具备实际项目开发经验
分布式配置中心实现
1. 配置中心设计
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConfigCenter {
private final CuratorFramework client;
private final String configPath;
private final Map<String, String> localCache;
public ConfigCenter(CuratorFramework client, String configPath) {
this.client = client;
this.configPath = configPath;
this.localCache = new ConcurrentHashMap<>();
initConfigCenter();
}
private void initConfigCenter() {
try {
// 确保配置根节点存在
if (client.checkExists().forPath(configPath) == null) {
client.create()
.creatingParentsIfNeeded()
.forPath(configPath);
}
// 监听配置变化
PathChildrenCache cache = new PathChildrenCache(client, configPath, true);
cache.start();
cache.getListenable().addListener((client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
updateLocalCache(event.getData().getPath(),
new String(event.getData().getData()));
break;
case CHILD_REMOVED:
removeFromLocalCache(event.getData().getPath());
break;
}
});
} catch (Exception e) {
log.error("初始化配置中心失败", e);
}
}
private void updateLocalCache(String path, String data) {
String key = path.substring(configPath.length() + 1);
localCache.put(key, data);
log.info("配置更新:{} = {}", key, data);
}
private void removeFromLocalCache(String path) {
String key = path.substring(configPath.length() + 1);
localCache.remove(key);
log.info("配置删除:{}", key);
}
}
2. 配置热更新
import org.springframework.context.ApplicationEventPublisher;
@Slf4j
public class ConfigurationManager {
private final ConfigCenter configCenter;
private final ApplicationEventPublisher eventPublisher;
public void updateConfig(String key, String value) {
try {
// 更新ZooKeeper中的配置
String path = configCenter.getConfigPath() + "/" + key;
client.setData().forPath(path, value.getBytes());
// 发布配置更新事件
eventPublisher.publishEvent(new ConfigChangeEvent(key, value));
} catch (Exception e) {
log.error("更新配置失败:{}", key, e);
throw new RuntimeException("配置更新失败", e);
}
}
}
@Slf4j
@Component
public class ConfigChangeListener {
@EventListener
public void handleConfigChange(ConfigChangeEvent event) {
log.info("接收到配置变更:{} = {}", event.getKey(), event.getValue());
// 处理配置变更
refreshConfig(event);
}
}
分布式任务调度
1. 任务分发器
import lombok.Data;
@Data
public class Task {
private String id;
private String name;
private String cron;
private Map<String, Object> params;
}
@Slf4j
public class TaskDispatcher {
private final CuratorFramework client;
private final String taskPath;
public void dispatchTask(Task task) {
try {
// 创建任务节点
String path = taskPath + "/" + task.getId();
byte[] data = JsonUtils.toJson(task).getBytes();
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath(path, data);
log.info("任务分发成功:{}", task.getName());
} catch (Exception e) {
log.error("任务分发失败:{}", task.getName(), e);
throw new RuntimeException("任务分发失败", e);
}
}
}
2. 任务执行器
@Slf4j
public class TaskExecutor {
private final CuratorFramework client;
private final String taskPath;
private final ExecutorService executorService;
public void start() {
// 监听任务节点
PathChildrenCache cache = new PathChildrenCache(client, taskPath, true);
cache.start();
cache.getListenable().addListener((client, event) -> {
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
// 获取任务信息
Task task = JsonUtils.fromJson(
new String(event.getData().getData()), Task.class);
// 提交任务执行
executorService.submit(() -> executeTask(task));
}
});
}
private void executeTask(Task task) {
try {
log.info("开始执行任务:{}", task.getName());
// 执行任务逻辑
// 更新任务状态
updateTaskStatus(task.getId(), "COMPLETED");
} catch (Exception e) {
log.error("任务执行失败:{}", task.getName(), e);
updateTaskStatus(task.getId(), "FAILED");
}
}
}
分布式限流实现
1. 令牌桶限流器
@Slf4j
public class DistributedRateLimiter {
private final CuratorFramework client;
private final String limitPath;
private final int rate;
private final int capacity;
public boolean tryAcquire() {
try {
InterProcessMutex lock = new InterProcessMutex(client, limitPath + "/lock");
try {
if (lock.acquire(100, TimeUnit.MILLISECONDS)) {
// 获取当前令牌数
byte[] data = client.getData().forPath(limitPath);
TokenBucket bucket = JsonUtils.fromJson(
new String(data), TokenBucket.class);
// 检查和更新令牌
if (bucket.getTokens() > 0) {
bucket.setTokens(bucket.getTokens() - 1);
client.setData().forPath(limitPath,
JsonUtils.toJson(bucket).getBytes());
return true;
}
return false;
}
} finally {
lock.release();
}
} catch (Exception e) {
log.error("获取令牌失败", e);
return false;
}
return false;
}
}
2. 分布式计数器
@Slf4j
public class DistributedCounter {
private final CuratorFramework client;
private final String counterPath;
private final DistributedAtomicLong counter;
public DistributedCounter(CuratorFramework client, String counterPath) {
this.client = client;
this.counterPath = counterPath;
this.counter = new DistributedAtomicLong(client, counterPath,
new RetryNTimes(3, 1000));
}
public long increment() throws Exception {
AtomicValue<Long> value = counter.increment();
if (value.succeeded()) {
return value.postValue();
}
throw new RuntimeException("增加计数失败");
}
public long decrement() throws Exception {
AtomicValue<Long> value = counter.decrement();
if (value.succeeded()) {
return value.postValue();
}
throw new RuntimeException("减少计数失败");
}
}
最佳实践总结
1. 异常处理
异常处理建议
- 重试机制:实现合理的重试策略
- 降级处理:准备服务降级方案
- 异常恢复:实现故障自动恢复
- 日志记录:详细记录异常信息
@Slf4j
public class ZkExceptionHandler {
private final CuratorFramework client;
public <T> T executeWithRetry(ZkOperation<T> operation) {
int retryCount = 0;
while (retryCount < 3) {
try {
return operation.execute();
} catch (Exception e) {
log.warn("操作失败,重试第{}次", retryCount + 1, e);
retryCount++;
if (retryCount == 3) {
throw new RuntimeException("操作最终失败", e);
}
try {
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
throw new RuntimeException("重试次数超限");
}
}
2. 性能优化
性能优化建议
- 本地缓存:合理使用本地缓存
- 批量操作:使用事务批量处理
- 连接复用:使用连接池管理连接
- 数据压缩:压缩节点数据
@Slf4j
public class ZkOptimizer {
private final CuratorFramework client;
public void batchOperation(List<Operation> operations) {
CuratorTransaction transaction = client.inTransaction();
try {
for (Operation op : operations) {
switch (op.getType()) {
case CREATE:
transaction.create().forPath(op.getPath(), op.getData());
break;
case UPDATE:
transaction.setData().forPath(op.getPath(), op.getData());
break;
case DELETE:
transaction.delete().forPath(op.getPath());
break;
}
}
transaction.commit();
} catch (Exception e) {
log.error("批量操作失败", e);
throw new RuntimeException("批量操作失败", e);
}
}
}
总结
本教程详细介绍了 ZooKeeper 的实战应用和最佳实践:
- ✅ 配置中心:动态配置管理实现
- ✅ 任务调度:分布式任务处理
- ✅ 限流实现:分布式限流方案
- ✅ 异常处理:容错和重试机制
- ✅ 性能优化:实践优化建议
下一步学习
- 探索更多实战场景
- 深入研究性能优化
- 实践大规模应用方案