提交 04712f8c 编写于 作者: T terrymanu

refactor merger into ShardingResultSet 6th version, merge only for select

上级 d365e254
......@@ -27,6 +27,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.core.resultset.ShardingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.core.ResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.merger.core.MergeEngine;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement;
import com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine;
import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit;
import com.google.common.base.Optional;
......@@ -85,7 +86,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd
// TODO refactor
List<ResultSet> resultSets = new PreparedStatementExecutor(
getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement());
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement());
if (mergeResultSet.isPresent()) {
result = new ShardingResultSet(resultSets, mergeResultSet.get());
} else {
......
......@@ -27,6 +27,7 @@ import com.dangdang.ddframe.rdb.sharding.merger.core.ResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.merger.core.MergeEngine;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.insert.InsertStatement;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement;
import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.routing.SQLRouteResult;
import com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine;
......@@ -105,7 +106,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
try {
// TODO refactor
List<ResultSet> resultSets = generateExecutor(sql).executeQuery();
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement());
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement());
if (mergeResultSet.isPresent()) {
result = new ShardingResultSet(resultSets, mergeResultSet.get());
} else {
......@@ -255,7 +256,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
resultSets.add(each.getResultSet());
}
// TODO refactor
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement());
Optional<ResultSetMerger> mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement());
if (mergeResultSet.isPresent()) {
currentResultSet = new ShardingResultSet(resultSets, mergeResultSet.get());
} else {
......
......@@ -25,7 +25,7 @@ import com.dangdang.ddframe.rdb.sharding.merger.core.stream.OrderByStreamResultS
import com.dangdang.ddframe.rdb.sharding.merger.util.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
......@@ -51,20 +51,20 @@ public final class MergeEngine {
* 获取结果集.
*
* @param resultSets 结果集列表
* @param sqlStatement SQL语句对象
* @param selectStatement SQL语句对象
* @return 结果集包装
* @throws SQLException SQL异常
*/
public static Optional<ResultSetMerger> getResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
return buildResultSet(resultSets, sqlStatement);
public static Optional<ResultSetMerger> getResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
return buildResultSet(resultSets, selectStatement);
}
private static Optional<ResultSetMerger> buildResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
private static Optional<ResultSetMerger> buildResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
// TODO MOVE TO sqlStatement
Map<String, Integer> columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0));
setIndexForAggregationItem(sqlStatement, columnLabelIndexMap);
setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByItems());
setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByItems());
setIndexForAggregationItem(selectStatement, columnLabelIndexMap);
setIndexForOrderItem(columnLabelIndexMap, selectStatement.getOrderByItems());
setIndexForOrderItem(columnLabelIndexMap, selectStatement.getGroupByItems());
List<ResultSet> filteredResults = new ArrayList<>(resultSets.size());
......@@ -76,33 +76,33 @@ public final class MergeEngine {
if (filteredResults.isEmpty()) {
return Optional.absent();
}
ResultSetMerger result = !sqlStatement.getGroupByItems().isEmpty() || !sqlStatement.getAggregationSelectItems().isEmpty()
? buildMemoryResultSet(filteredResults, sqlStatement) : buildStreamResultSet(filteredResults, sqlStatement);
return Optional.of(buildDecorateResultSet(result, sqlStatement));
ResultSetMerger result = !selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()
? buildMemoryResultSet(filteredResults, selectStatement) : buildStreamResultSet(filteredResults, selectStatement);
return Optional.of(buildDecorateResultSet(result, selectStatement));
}
private static ResultSetMerger buildMemoryResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
private static ResultSetMerger buildMemoryResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
return new GroupByMemoryResultSetMerger(
ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)), resultSets, sqlStatement.getGroupByItems(), sqlStatement.getOrderByItems(), sqlStatement.getAggregationSelectItems());
ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)), resultSets, selectStatement.getGroupByItems(), selectStatement.getOrderByItems(), selectStatement.getAggregationSelectItems());
}
private static ResultSetMerger buildStreamResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
if (sqlStatement.getGroupByItems().isEmpty() && sqlStatement.getOrderByItems().isEmpty()) {
private static ResultSetMerger buildStreamResultSet(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
if (selectStatement.getGroupByItems().isEmpty() && selectStatement.getOrderByItems().isEmpty()) {
return new IteratorStreamResultSetMerger(resultSets);
}
return new OrderByStreamResultSetMerger(resultSets, sqlStatement.getOrderByItems());
return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
}
private static ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger, final SQLStatement sqlStatement) throws SQLException {
private static ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger, final SelectStatement selectStatement) throws SQLException {
ResultSetMerger result = resultSetMerger;
if (null != sqlStatement.getLimit()) {
result = new LimitDecoratorResultSetMerger(result, sqlStatement.getLimit());
if (null != selectStatement.getLimit()) {
result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
}
return result;
}
private static void setIndexForAggregationItem(final SQLStatement sqlStatement, final Map<String, Integer> columnLabelIndexMap) {
for (AggregationSelectItem each : sqlStatement.getAggregationSelectItems()) {
private static void setIndexForAggregationItem(final SelectStatement selectStatement, final Map<String, Integer> columnLabelIndexMap) {
for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) {
Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));
each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册