diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingPreparedStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingPreparedStatement.java index 302f25af5bc3e8bd4982fa3789ffa65bd666ae6b..83c037a0f9f7c1c9b07ccdaef8b2a96f30985083 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingPreparedStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingPreparedStatement.java @@ -27,6 +27,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.core.resultset.ShardingResultSet; import com.dangdang.ddframe.rdb.sharding.merger.core.ResultSetMerger; import com.dangdang.ddframe.rdb.sharding.merger.core.MergeEngine; import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey; +import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement; import com.dangdang.ddframe.rdb.sharding.routing.PreparedStatementRoutingEngine; import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit; import com.google.common.base.Optional; @@ -85,7 +86,7 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd // TODO refactor List resultSets = new PreparedStatementExecutor( getShardingConnection().getShardingContext().getExecutorEngine(), getRouteResult().getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery(); - Optional mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement()); + Optional mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()); if (mergeResultSet.isPresent()) { result = new ShardingResultSet(resultSets, mergeResultSet.get()); } else { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingStatement.java index c774bfb0b2d97d6f3052616164c9e195a849ff66..d836ac4957f351713c211db762b211e06686cac6 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/core/statement/ShardingStatement.java @@ -27,6 +27,7 @@ import com.dangdang.ddframe.rdb.sharding.merger.core.ResultSetMerger; import com.dangdang.ddframe.rdb.sharding.merger.core.MergeEngine; import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.GeneratedKey; import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.insert.InsertStatement; +import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement; import com.dangdang.ddframe.rdb.sharding.routing.SQLExecutionUnit; import com.dangdang.ddframe.rdb.sharding.routing.SQLRouteResult; import com.dangdang.ddframe.rdb.sharding.routing.StatementRoutingEngine; @@ -105,7 +106,7 @@ public class ShardingStatement extends AbstractStatementAdapter { try { // TODO refactor List resultSets = generateExecutor(sql).executeQuery(); - Optional mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement()); + Optional mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()); if (mergeResultSet.isPresent()) { result = new ShardingResultSet(resultSets, mergeResultSet.get()); } else { @@ -255,7 +256,7 @@ public class ShardingStatement extends AbstractStatementAdapter { resultSets.add(each.getResultSet()); } // TODO refactor - Optional mergeResultSet = MergeEngine.getResultSet(resultSets, getRouteResult().getSqlStatement()); + Optional mergeResultSet = MergeEngine.getResultSet(resultSets, (SelectStatement) getRouteResult().getSqlStatement()); if (mergeResultSet.isPresent()) { currentResultSet = new ShardingResultSet(resultSets, mergeResultSet.get()); } else { diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/core/MergeEngine.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/core/MergeEngine.java index 11a77bc4fcd8c837449655578436398883788efe..970b5415be44c7d1a39b1a3416b6db779dd7c279 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/core/MergeEngine.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/core/MergeEngine.java @@ -25,7 +25,7 @@ import com.dangdang.ddframe.rdb.sharding.merger.core.stream.OrderByStreamResultS import com.dangdang.ddframe.rdb.sharding.merger.util.ResultSetUtil; import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem; import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem; -import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.SQLStatement; +import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import lombok.AccessLevel; @@ -51,20 +51,20 @@ public final class MergeEngine { * 获取结果集. * * @param resultSets 结果集列表 - * @param sqlStatement SQL语句对象 + * @param selectStatement SQL语句对象 * @return 结果集包装 * @throws SQLException SQL异常 */ - public static Optional getResultSet(final List resultSets, final SQLStatement sqlStatement) throws SQLException { - return buildResultSet(resultSets, sqlStatement); + public static Optional getResultSet(final List resultSets, final SelectStatement selectStatement) throws SQLException { + return buildResultSet(resultSets, selectStatement); } - private static Optional buildResultSet(final List resultSets, final SQLStatement sqlStatement) throws SQLException { + private static Optional buildResultSet(final List resultSets, final SelectStatement selectStatement) throws SQLException { // TODO MOVE TO sqlStatement Map columnLabelIndexMap = ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)); - setIndexForAggregationItem(sqlStatement, columnLabelIndexMap); - setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getOrderByItems()); - setIndexForOrderItem(columnLabelIndexMap, sqlStatement.getGroupByItems()); + setIndexForAggregationItem(selectStatement, columnLabelIndexMap); + setIndexForOrderItem(columnLabelIndexMap, selectStatement.getOrderByItems()); + setIndexForOrderItem(columnLabelIndexMap, selectStatement.getGroupByItems()); List filteredResults = new ArrayList<>(resultSets.size()); @@ -76,33 +76,33 @@ public final class MergeEngine { if (filteredResults.isEmpty()) { return Optional.absent(); } - ResultSetMerger result = !sqlStatement.getGroupByItems().isEmpty() || !sqlStatement.getAggregationSelectItems().isEmpty() - ? buildMemoryResultSet(filteredResults, sqlStatement) : buildStreamResultSet(filteredResults, sqlStatement); - return Optional.of(buildDecorateResultSet(result, sqlStatement)); + ResultSetMerger result = !selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty() + ? buildMemoryResultSet(filteredResults, selectStatement) : buildStreamResultSet(filteredResults, selectStatement); + return Optional.of(buildDecorateResultSet(result, selectStatement)); } - private static ResultSetMerger buildMemoryResultSet(final List resultSets, final SQLStatement sqlStatement) throws SQLException { + private static ResultSetMerger buildMemoryResultSet(final List resultSets, final SelectStatement selectStatement) throws SQLException { return new GroupByMemoryResultSetMerger( - ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)), resultSets, sqlStatement.getGroupByItems(), sqlStatement.getOrderByItems(), sqlStatement.getAggregationSelectItems()); + ResultSetUtil.getColumnLabelIndexMap(resultSets.get(0)), resultSets, selectStatement.getGroupByItems(), selectStatement.getOrderByItems(), selectStatement.getAggregationSelectItems()); } - private static ResultSetMerger buildStreamResultSet(final List resultSets, final SQLStatement sqlStatement) throws SQLException { - if (sqlStatement.getGroupByItems().isEmpty() && sqlStatement.getOrderByItems().isEmpty()) { + private static ResultSetMerger buildStreamResultSet(final List resultSets, final SelectStatement selectStatement) throws SQLException { + if (selectStatement.getGroupByItems().isEmpty() && selectStatement.getOrderByItems().isEmpty()) { return new IteratorStreamResultSetMerger(resultSets); } - return new OrderByStreamResultSetMerger(resultSets, sqlStatement.getOrderByItems()); + return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems()); } - private static ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger, final SQLStatement sqlStatement) throws SQLException { + private static ResultSetMerger buildDecorateResultSet(final ResultSetMerger resultSetMerger, final SelectStatement selectStatement) throws SQLException { ResultSetMerger result = resultSetMerger; - if (null != sqlStatement.getLimit()) { - result = new LimitDecoratorResultSetMerger(result, sqlStatement.getLimit()); + if (null != selectStatement.getLimit()) { + result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit()); } return result; } - private static void setIndexForAggregationItem(final SQLStatement sqlStatement, final Map columnLabelIndexMap) { - for (AggregationSelectItem each : sqlStatement.getAggregationSelectItems()) { + private static void setIndexForAggregationItem(final SelectStatement selectStatement, final Map columnLabelIndexMap) { + for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each)); each.setIndex(columnLabelIndexMap.get(each.getColumnLabel())); for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {