性能优化与源码分析
2025/8/15大约 4 分钟
Sharding-JDBC 性能优化与源码分析
前置知识
在学习本教程前,请确保您已经:
- 掌握 Sharding-JDBC 的基本使用
- 了解 Java 并发编程
- 熟悉 SQL 优化基础
性能优化
1. 分片策略优化
1.1 分片键选择
// 推荐:使用单一分片键
TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order", "ds${0..1}.t_order${0..1}");
orderTableRuleConfig.setTableShardingStrategyConfig(
new StandardShardingStrategyConfiguration("order_id", new OrderShardingAlgorithm())
);
// 不推荐:使用多分片键,可能影响性能
TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order", "ds${0..1}.t_order${0..1}");
orderTableRuleConfig.setTableShardingStrategyConfig(
new ComplexShardingStrategyConfiguration("order_id,user_id", new ComplexOrderShardingAlgorithm())
);
1.2 分片算法优化
public class OptimizedOrderShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
// 使用位运算代替取模运算
long orderId = shardingValue.getValue();
int shardingCount = availableTargetNames.size();
int shardingKey = (int) (orderId & (shardingCount - 1));
for (String tableName : availableTargetNames) {
if (tableName.endsWith(String.valueOf(shardingKey))) {
return tableName;
}
}
throw new UnsupportedOperationException();
}
}
2. SQL 优化
2.1 索引优化
-- 推荐:使用分片键作为查询条件
SELECT * FROM t_order WHERE order_id = ? AND user_id = ?;
-- 不推荐:不包含分片键的查询
SELECT * FROM t_order WHERE status = ?;
2.2 批量操作优化
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Transactional
public void batchInsert(List<Order> orders) {
// 按分片键分组
Map<Integer, List<Order>> shardingMap = orders.stream()
.collect(Collectors.groupingBy(order ->
Math.toIntExact(order.getOrderId() % 2)));
// 分组批量插入
shardingMap.forEach((sharding, shardingOrders) -> {
orderMapper.batchInsert(shardingOrders);
});
}
}
3. 连接池优化
spring:
shardingsphere:
datasource:
ds0:
type: com.zaxxer.hikari.HikariDataSource
maximum-pool-size: 50
minimum-idle: 10
idle-timeout: 30000
connection-timeout: 3000
max-lifetime: 1800000
ds1:
# ... 类似配置
源码分析
1. 路由引擎
路由引擎核心源码分析
public final class PreparedStatementRoutingEngine {
private final String sql;
private final ShardingRule shardingRule;
private final ShardingMetaData shardingMetaData;
private final DatabaseType databaseType;
private final List<Object> parameters;
public SQLRouteResult route() {
// 1. SQL解析
SQLStatement sqlStatement = parse();
// 2. 生成路由上下文
RouteContext routeContext = new RouteContext(sqlStatement, parameters);
// 3. 进行路由
RouteResult routeResult = route(routeContext);
// 4. 生成SQL重写上下文
SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(routeContext);
// 5. 返回路由结果
return new SQLRouteResult(sqlRewriteContext, routeResult);
}
private RouteResult route(RouteContext routeContext) {
// 根据不同SQL类型选择路由策略
if (routeContext.getSqlStatement() instanceof DMLStatement) {
return routeForDML(routeContext);
}
if (routeContext.getSqlStatement() instanceof DDLStatement) {
return routeForDDL(routeContext);
}
// ... 其他类型处理
}
}
2. 执行引擎
执行引擎工作流程
public final class ShardingExecuteEngine implements ExecuteEngine {
private final ExecutorService executorService;
@Override
public <T> List<T> execute(Collection<? extends StatementExecuteUnit> units,
Collection<? extends Connection> connections,
StatementExecuteCallback<T> callback) throws SQLException {
// 1. 根据单元类型选择执行策略
ExecutionStrategy strategy = ExecutionStrategyFactory.newInstance(units);
// 2. 执行SQL
return strategy.execute(units, callback);
}
private <T> List<T> executeGroup(Collection<StatementExecuteUnit> units,
StatementExecuteCallback<T> callback) throws SQLException {
// 并行执行每个分片
return units.stream()
.map(unit -> CompletableFuture.supplyAsync(() -> {
try {
return callback.execute(unit);
} catch (SQLException ex) {
throw new CompletionException(ex);
}
}, executorService))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
}
3. 结果归并
结果归并处理流程
public final class StreamingMergeEngine implements MergeEngine {
private final ShardingRule shardingRule;
private final List<QueryResult> queryResults;
private final SQLStatement sqlStatement;
private final List<OrderItem> orderItems;
@Override
public MergedResult merge() throws SQLException {
if (1 == queryResults.size()) {
return new IteratorStreamMergedResult(queryResults);
}
Map<String, Integer> labelAndIndexMap = getLabelAndIndexMap(queryResults);
// 根据不同场景选择归并策略
if (isNeedProcessGroupBy()) {
return processGroupBy(labelAndIndexMap);
}
if (isNeedProcessOrderBy()) {
return processOrderBy(labelAndIndexMap);
}
return processDefault(labelAndIndexMap);
}
private MergedResult processOrderBy(Map<String, Integer> labelAndIndexMap) throws SQLException {
List<OrderItem> orderItems = new ArrayList<>(sqlStatement.getOrderByItems());
OrderByStreamMergedResult result = new OrderByStreamMergedResult(queryResults, orderItems, labelAndIndexMap);
return result;
}
}
性能监控
1. 监控指标
@Configuration
public class MetricsConfiguration {
@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
}
@Bean
public ShardingMetricsAspect shardingMetricsAspect(MeterRegistry meterRegistry) {
return new ShardingMetricsAspect(meterRegistry);
}
}
@Aspect
@Component
public class ShardingMetricsAspect {
private final MeterRegistry meterRegistry;
@Around("execution(* com.example.demo.mapper.*Mapper.*(..))")
public Object recordMetrics(ProceedingJoinPoint joinPoint) throws Throwable {
Timer.Sample sample = Timer.start(meterRegistry);
try {
return joinPoint.proceed();
} finally {
sample.stop(Timer.builder("sharding.sql.execution")
.tag("class", joinPoint.getSignature().getDeclaringTypeName())
.tag("method", joinPoint.getSignature().getName())
.register(meterRegistry));
}
}
}
2. 性能分析
spring:
shardingsphere:
props:
sql.show: true
executor.size: 16
max.connections.size.per.query: 2
最佳实践
性能优化建议
分片策略优化
- 选择合适的分片键
- 使用高效的分片算法
- 避免跨分片查询
SQL优化
- 合理使用索引
- 避免全表扫描
- 批量操作优化
连接池优化
- 合理配置连接池参数
- 监控连接池状态
- 及时释放连接
常见问题
1. 如何解决跨分片查询性能问题?
解决方案:
- 优化分片策略,减少跨分片查询
- 使用绑定表,避免笛卡尔积查询
- 合理设置连接池参数
- 使用异步查询优化
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
public List<Order> findOrders(List<Long> orderIds) {
// 按分片分组查询
Map<Integer, List<Long>> shardingMap = orderIds.stream()
.collect(Collectors.groupingBy(orderId ->
Math.toIntExact(orderId % 2)));
// 并行查询各分片
List<CompletableFuture<List<Order>>> futures = shardingMap.entrySet()
.stream()
.map(entry -> CompletableFuture.supplyAsync(() ->
orderMapper.findByIds(entry.getValue())))
.collect(Collectors.toList());
// 合并结果
return futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList());
}
}
2. 如何优化批量操作性能?
优化方案:
- 使用批量接口
- 合理设置批次大小
- 按分片键分组处理
- 使用并行处理
总结
本文深入分析了 Sharding-JDBC 的性能优化和源码实现:
- ✅ 性能优化:分片策略、SQL和连接池优化
- ✅ 源码分析:路由引擎、执行引擎和结果归并
- ✅ 性能监控:监控指标和性能分析
- ✅ 最佳实践:实用的优化建议
下一步学习
- 深入研究分布式事务实现
- 探索更多性能优化技巧
- 实践监控系统搭建
希望这篇文章能帮助您更好地优化 Sharding-JDBC 的性能!如果您有任何问题,欢迎在评论区讨论。