提交 bebbbd87 编写于 作者: T terrymanu

refactor merger into ShardingResultSet 12th version, refactor MergeEngine

上级 f32a0065
......@@ -84,7 +84,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
Collection<PreparedStatementUnit> preparedStatementUnits = route();
List<ResultSet> resultSets = new PreparedStatementExecutor(
getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
result = new ShardingResultSet(resultSets, MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()));
result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
} finally {
clearBatch();
}
......
......@@ -104,7 +104,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
ResultSet result;
try {
List<ResultSet> resultSets = generateExecutor(sql).executeQuery();
result = new ShardingResultSet(resultSets, MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()));
result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
} finally {
setCurrentResultSet(null);
}
......@@ -248,7 +248,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
for (Statement each : routedStatements) {
resultSets.add(each.getResultSet());
}
currentResultSet = new ShardingResultSet(resultSets, MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()));
currentResultSet = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) getRouteResult().getSqlStatement()).merge());
return currentResultSet;
}
}
......@@ -23,51 +23,55 @@ import com.dangdang.ddframe.rdb.sharding.merger.core.stream.IteratorStreamResult
import com.dangdang.ddframe.rdb.sharding.merger.core.stream.OrderByStreamResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.merger.util.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
* 分片结果集归并引擎.
*
* @author zhangliang
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class MergeEngine {
private final List<ResultSet> resultSets;
private final SelectStatement selectStatement;
private final Map<String, Integer> columnLabelIndexMap;
public MergeEngine(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
this.resultSets = resultSets;
this.selectStatement = selectStatement;
columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0));
}
/**
* 获取结果集.
* 合并结果集.
*
* @param resultSets 结果集列表
* @param selectStatement SQL语句对象
* @return 结果集包装
* @return 归并完毕后的结果集
* @throws SQLException SQL异常
*/
public static ResultSetMerger getResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
selectStatement.setIndexForItems(ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)));
ResultSetMerger result = !selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()
? buildMemoryResultSet(resultSets, selectStatement) : buildStreamResultSet(resultSets, selectStatement);
return buildDecorateResultSet(result, selectStatement);
public ResultSetMerger merge() throws SQLException {
selectStatement.setIndexForItems(columnLabelIndexMap);
ResultSetMerger result = !selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty() ? buildMemoryResultSet() : buildStreamResultSet();
return buildDecorateResultSet(result);
}
private static ResultSetMerger buildMemoryResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
return new GroupByMemoryResultSetMerger(
ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)), resultSets, selectStatement.getGroupByItems(), selectStatement.getOrderByItems(), selectStatement.getAggregationSelectItems());
private ResultSetMerger buildMemoryResultSet() throws SQLException {
return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement.getGroupByItems(), selectStatement.getOrderByItems(), selectStatement.getAggregationSelectItems());
}
private static ResultSetMerger buildStreamResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
private ResultSetMerger buildStreamResultSet() throws SQLException {
if (selectStatement.getGroupByItems().isEmpty() && selectStatement.getOrderByItems().isEmpty()) {
return new IteratorStreamResultSetMerger(resultSets);
}
return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
}
private static ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger, final SelectStatement selectStatement) throws SQLException {
private ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger) throws SQLException {
ResultSetMerger result = resultSetMerger;
if (null != selectStatement.getLimit()) {
result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册