- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.dangdang.ddframe.rdb.sharding.jdbc;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
-import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
-import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * 支持分片的结果集抽象类.
- *
- * @author zhangliang
- */
-@Slf4j
-public abstract class AbstractShardingResultSet extends AbstractResultSetAdapter {
-
- private final Limit limit;
-
- private boolean offsetSkipped;
-
- private int readCount;
-
- protected AbstractShardingResultSet(final List resultSets, final Limit limit) {
-// super(resultSets);
- this.limit = limit;
- setCurrentResultSet(resultSets.get(0));
- }
-
- @Override
- public final boolean next() throws SQLException {
- if (null != limit && !offsetSkipped) {
- skipOffset();
- }
- return null == limit ? nextForSharding() : ++readCount <= limit.getRowCount() && nextForSharding();
- }
-
- private void skipOffset() {
- for (int i = 0; i < limit.getOffset(); i++) {
- try {
- if (!nextForSharding()) {
- break;
- }
- } catch (final SQLException ignored) {
- log.warn("Skip result set error", ignored);
- }
- }
- offsetSkipped = true;
- }
-
- /**
- * 迭代结果集.
- *
- * @return true 可以继续访问 false 不能继续访问
- * @throws SQLException
- */
- protected abstract boolean nextForSharding() throws SQLException;
-}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java
index 869dc92a90509af2fbf93588f5657279d8437d86..48690ffa6e7d3747b65b63a8951ae5b20b59d23d 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingStatement.java
@@ -17,17 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import com.dangdang.ddframe.rdb.sharding.executor.StatementExecutor;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
@@ -41,6 +30,17 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* 支持分片的静态语句对象.
*
@@ -78,7 +78,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
- public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
+ public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.shardingConnection = shardingConnection;
this.resultSetType = resultSetType;
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractDelegateResultSetAdapter.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractDelegateResultSetAdapter.java
index 497a4d25b648fb412463f0ef809383cc62f22862..28f8122f550839cede59bcfb5467e1a1fbd7f335 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractDelegateResultSetAdapter.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractDelegateResultSetAdapter.java
@@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,17 +17,17 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
+import lombok.extern.slf4j.Slf4j;
+
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
-import lombok.extern.slf4j.Slf4j;
-
/**
* 代理结果集.
- *
+ *
* @author gaohongtao
*/
@Slf4j
@@ -35,15 +35,22 @@ public abstract class AbstractDelegateResultSetAdapter extends AbstractResultSet
private int offset;
- public void setDelegatedResultSet(final ResultSet resultSet) {
+ protected void setDelegatedResultSet(final ResultSet resultSet) {
setCurrentResultSet(resultSet);
}
- @Override
- public boolean next() throws SQLException {
+ protected void increaseStat() {
offset++;
log.trace(toString());
- return getCurrentResultSet().next();
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ boolean result = getCurrentResultSet().next();
+ if (result) {
+ increaseStat();
+ }
+ return result;
}
@Override
@@ -118,6 +125,6 @@ public abstract class AbstractDelegateResultSetAdapter extends AbstractResultSet
@Override
public String toString() {
- return String.format("Delegate result set's offset is %d", offset);
+ return String.format("%s(%d)'s offset is %d", this.getClass().getSimpleName(), hashCode(), offset);
}
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractRowSetResultSetAdapter.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractRowSetResultSetAdapter.java
index 05d817f89d992174c883d19f1300bb1a25ee439e..1ab9a2fd7e5e19ac37dd28e8eae99cc6b11b364c 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractRowSetResultSetAdapter.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/AbstractRowSetResultSetAdapter.java
@@ -17,6 +17,16 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
+import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
+import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
+import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
+import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
+import com.google.common.base.Preconditions;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
@@ -33,17 +43,12 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
-import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
-import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
-import com.google.common.base.Preconditions;
-import lombok.Setter;
-
/**
* 使用行数据集实现的结果集.
*
* @author gaohongtao
*/
+@Slf4j
public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupportedOperationRowResultSet {
@Setter
@@ -55,6 +60,7 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
private final Map columnLabelToIndexMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ @Getter(AccessLevel.PROTECTED)
private Row currentRow;
private boolean wasNullFlag;
@@ -62,7 +68,9 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
@Override
public boolean next() throws SQLException {
init();
- return (currentRow = nextRow()) != null;
+ boolean result = (currentRow = nextRow()) != null;
+ log.trace(toString());
+ return result;
}
private void init() throws SQLException {
@@ -106,8 +114,19 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
@Override
public int findColumn(final String columnLabel) throws SQLException {
initColumnIndexMap();
- Preconditions.checkArgument(columnLabelToIndexMap.containsKey(columnLabel), String.format("Column label %s does not exist", columnLabel));
- return columnLabelToIndexMap.get(columnLabel);
+ String formattedColumnLabel;
+ if (columnLabelToIndexMap.containsKey(columnLabel)) {
+ formattedColumnLabel = columnLabel;
+ } else {
+ formattedColumnLabel = SQLUtil.getExactlyValue(columnLabel);
+ }
+ Preconditions.checkArgument(columnLabelToIndexMap.containsKey(formattedColumnLabel), String.format("Column label %s does not exist", formattedColumnLabel));
+ return columnLabelToIndexMap.get(formattedColumnLabel);
+ }
+
+ @Override
+ public Statement getStatement() throws SQLException {
+ return resultSets.get(0).getStatement();
}
@Override
@@ -118,17 +137,10 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
Preconditions.checkArgument(currentRow.containsCell(columnIndex), String.format("Column Index %d out of range", columnIndex));
Object cell = currentRow.getCell(columnIndex);
- if (null == cell) {
- this.wasNullFlag = true;
- }
+ this.wasNullFlag = null == cell;
return cell;
}
- @Override
- public Statement getStatement() throws SQLException {
- return resultSets.get(0).getStatement();
- }
-
@Override
public Object getObject(final String columnLabel) throws SQLException {
return getObject(findColumn(columnLabel));
@@ -153,6 +165,7 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public boolean getBoolean(final int columnIndex) throws SQLException {
Object cell = getObject(columnIndex);
if (null == cell) {
+ wasNullFlag = false;
return false;
}
return (cell instanceof Boolean) ? (Boolean) cell : Boolean.valueOf(cell.toString());
@@ -375,4 +388,9 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public int getConcurrency() throws SQLException {
return ResultSet.CONCUR_READ_ONLY;
}
+
+ @Override
+ public String toString() {
+ return String.format("Current row is %s", currentRow);
+ }
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/WrapperAdapter.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/WrapperAdapter.java
index c7fae0c29ecf4a501c10f56bd2adfed0010d0ea5..cea8848739ccc220eaf751c7dceba562809ffbd5 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/WrapperAdapter.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/adapter/WrapperAdapter.java
@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
+import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
+import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
+
import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.ArrayList;
import java.util.Collection;
-import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
-import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
-
/**
* JDBC Wrapper适配类.
*
@@ -32,7 +32,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
*/
public class WrapperAdapter implements Wrapper {
- private Collection jdbcMethodInvocations = new ArrayList<>();
+ private final Collection jdbcMethodInvocations = new ArrayList<>();
@SuppressWarnings("unchecked")
@Override
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationResultSet.java
index a50ec3d1808527ced87bf48826ae3f5b27ae99d0..c0230df879bfb0bec95a867c0dddf8260d18a31f 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/unsupported/AbstractUnsupportedOperationResultSet.java
@@ -37,9 +37,8 @@ public abstract class AbstractUnsupportedOperationResultSet extends AbstractResu
throw new SQLFeatureNotSupportedException("previous");
}
- //TODO:MERGE改造,改造后子类需要实现光标判断的四个方法
@Override
- public boolean isBeforeFirst() throws SQLException {
+ public final boolean isBeforeFirst() throws SQLException {
throw new SQLFeatureNotSupportedException("isBeforeFirst");
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java
index fce1736236eaa8e213cd47e533fbe10a255b4b9e..acf2fcc83b18a80bb6537c69193d43538cafd9e5 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetFactory.java
@@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,15 @@
package com.dangdang.ddframe.rdb.sharding.merger;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
+import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.GroupByCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.LimitCouplingResultSet;
+import com.dangdang.ddframe.rdb.sharding.merger.component.other.WrapperResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.IteratorReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
@@ -39,9 +35,16 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
/**
* 创建归并分片结果集的工厂.
- *
+ *
* @author gaohongtao
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@@ -50,7 +53,7 @@ public final class ResultSetFactory {
/**
* 获取结果集.
- *
+ *
* @param resultSets 结果集列表
* @param mergeContext 结果归并上下文
* @return 结果集包装
@@ -62,7 +65,7 @@ public final class ResultSetFactory {
return resultSets.get(0);
} else if (filteredResultSets.size() == 1) {
log.trace("Sharding-JDBC:Only one result set");
- return resultSets.get(0);
+ return filteredResultSets.get(0);
}
setColumnIndex(filteredResultSets.get(0), mergeContext);
ResultSetPipelineBuilder builder = new ResultSetPipelineBuilder(filteredResultSets, mergeContext.getOrderByColumns());
@@ -71,21 +74,25 @@ public final class ResultSetFactory {
return builder.build();
}
- private static List filterResultSets(final List resultSets) throws SQLException {
- return Lists.newArrayList(Collections2.filter(resultSets, new Predicate() {
+ private static List filterResultSets(final List resultSets) {
+ return Lists.newArrayList(Collections2.filter(Lists.transform(resultSets, new Function() {
+
@Override
- public boolean apply(final ResultSet input) {
+ public ResultSet apply(final ResultSet input) {
try {
- return input.next();
+ return new WrapperResultSet(input);
} catch (final SQLException ex) {
- log.error("filter result set error", ex);
- return false;
+ throw new ShardingJdbcException(ex);
}
}
+ }), new Predicate() {
+ @Override
+ public boolean apply(final ResultSet input) {
+ return !((WrapperResultSet) input).isEmpty();
+ }
}));
}
-
private static void setColumnIndex(final ResultSet resultSet, final MergeContext mergeContext) throws SQLException {
ResultSetMetaData md = resultSet.getMetaData();
Map columnLabelIndexMap = new CaseInsensitiveMap<>(md.getColumnCount());
@@ -94,7 +101,10 @@ public final class ResultSetFactory {
columnLabelIndexMap.put(columnLabel, i);
}
for (IndexColumn each : extractIndexColumns(mergeContext)) {
- Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel().orNull()) || columnLabelIndexMap.containsKey(each.getColumnName().orNull()),
+ if (each.getColumnIndex() > 0) {
+ continue;
+ }
+ Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel().orNull()) || columnLabelIndexMap.containsKey(each.getColumnName().orNull()),
String.format("%s has not index", each));
if (each.getColumnLabel().isPresent() && columnLabelIndexMap.containsKey(each.getColumnLabel().get())) {
each.setColumnIndex(columnLabelIndexMap.get(each.getColumnLabel().get()));
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetPipelineBuilder.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetPipelineBuilder.java
index 4aed5f5678e06aa76cd0f9aed920163826e1573e..4c22641ed2d9e2aff3e89e94b836bf1358c16a79 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetPipelineBuilder.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/ResultSetPipelineBuilder.java
@@ -17,12 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
@@ -31,9 +25,14 @@ import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.MemoryOrderByR
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.StreamingOrderByReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* 结果集管道构建器.
*
@@ -42,14 +41,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ResultSetPipelineBuilder {
- @Getter
private final List inputResultSets;
- private AbstractList orderByColumns = new ArrayList<>();
+ private final AbstractList orderByColumns = new ArrayList<>();
private ComponentResultSet tailResultSet;
- public ResultSetPipelineBuilder(final List resultSets, final List orderByColumns) throws SQLException {
+ public ResultSetPipelineBuilder(final List resultSets, final List orderByColumns) {
inputResultSets = resultSets;
this.orderByColumns.addAll(orderByColumns);
}
@@ -63,11 +61,12 @@ public class ResultSetPipelineBuilder {
public ResultSetPipelineBuilder join(final ComponentResultSet componentResultSet) throws SQLException {
Preconditions.checkArgument(componentResultSet instanceof ReducerResultSet || componentResultSet instanceof CouplingResultSet);
if (componentResultSet instanceof ReducerResultSet) {
- ((ReducerResultSet) componentResultSet).inject(inputResultSets);
+ ((ReducerResultSet) componentResultSet).init(inputResultSets);
} else {
- ((CouplingResultSet) componentResultSet).inject(tailResultSet);
+ ((CouplingResultSet) componentResultSet).init(tailResultSet);
}
tailResultSet = componentResultSet;
+ log.trace("join component {}", tailResultSet.getClass().getSimpleName());
return this;
}
@@ -81,6 +80,7 @@ public class ResultSetPipelineBuilder {
if (orderEqual(expectOrderList)) {
join(new StreamingOrderByReducerResultSet(expectOrderList));
} else {
+ setNewOrder(expectOrderList);
join(new MemoryOrderByReducerResultSet(expectOrderList));
}
return this;
@@ -97,6 +97,7 @@ public class ResultSetPipelineBuilder {
if (orderEqual(expectOrderList)) {
return this;
}
+ setNewOrder(expectOrderList);
join(new MemoryOrderByCouplingResultSet(expectOrderList));
return this;
}
@@ -107,7 +108,6 @@ public class ResultSetPipelineBuilder {
* @return 结果集
*/
public ResultSet build() {
- log.trace("The pipeline of result set handling is : {}", tailResultSet);
return tailResultSet;
}
@@ -115,4 +115,9 @@ public class ResultSetPipelineBuilder {
return orderByColumns.equals(expectOrderList);
}
+ private void setNewOrder(final List orderList) {
+ orderByColumns.clear();
+ orderByColumns.addAll(orderList);
+ }
+
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/ResultSetUtil.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/ResultSetUtil.java
index a8f7c90ede4a4d950aef5218ae395d2471106c66..101871f72bbe911ccc98e280990494f4f3db3acd 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/ResultSetUtil.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/ResultSetUtil.java
@@ -17,21 +17,16 @@
package com.dangdang.ddframe.rdb.sharding.merger.common;
-import java.math.BigDecimal;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn.OrderByType;
-import com.google.common.base.Function;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+
/**
* 结果集处理工具类.
*
@@ -135,15 +130,4 @@ public final class ResultSetUtil {
public static int compareTo(final Comparable thisValue, final Comparable otherValue, final OrderByType orderByType) {
return OrderByType.ASC == orderByType ? thisValue.compareTo(otherValue) : -thisValue.compareTo(otherValue);
}
-
- public static List iterateResultSet(final ResultSet rs, final Function function) throws SQLException {
- if (rs.isBeforeFirst()) {
- rs.next();
- }
- List result = new ArrayList<>();
- do {
- result.add(function.apply(rs));
- } while (rs.next());
- return result;
- }
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/ComponentResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/ComponentResultSet.java
index 9e2a1402aa5a718f523c300444fefd9ce9ef2ef3..73f296ccaae00f77d9ae558ba8b6196cd9d7e2d1 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/ComponentResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/ComponentResultSet.java
@@ -27,5 +27,5 @@ import java.sql.SQLException;
*/
public interface ComponentResultSet extends ResultSet {
- void inject(final T preResultSet) throws SQLException;
+ void init(final T preResultSet) throws SQLException;
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/GroupByCouplingResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/GroupByCouplingResultSet.java
index 6167cab7bcbac4d4184b17b7c5fb4375b3addff1..8d0c984256fd3ff8435104be0d6337f24f57943d 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/GroupByCouplingResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/GroupByCouplingResultSet.java
@@ -17,11 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.GroupByRow;
@@ -30,9 +25,14 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
import lombok.RequiredArgsConstructor;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
/**
* 分组节点结果集.
- *
+ *
* @author gaohongtao
*/
@RequiredArgsConstructor
@@ -47,7 +47,7 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
private boolean hasNext;
@Override
- public void inject(final ResultSet preResultSet) {
+ public void init(final ResultSet preResultSet) {
setResultSets(Collections.singletonList(preResultSet));
}
@@ -62,8 +62,8 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
if (!hasNext) {
return null;
}
- GroupByRow row = new GroupByRow(resultSet);
- hasNext = row.aggregate(groupByColumns, aggregationColumns);
+ GroupByRow row = new GroupByRow(resultSet, groupByColumns, aggregationColumns);
+ hasNext = row.aggregate();
return row;
}
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/LimitCouplingResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/LimitCouplingResultSet.java
index d4fece73019987e0f0bc4440e8840e52b9ff62a0..3212ed6f28a7f22ef72ec4652ffc126ab405ffd5 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/LimitCouplingResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/LimitCouplingResultSet.java
@@ -17,21 +17,25 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
/**
+ * 限制结果集.
+ *
* @author gaohongtao
*/
+@Slf4j
public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter implements CouplingResultSet {
private final Limit limit;
- private int rowCount;
+ private int rowNumber;
private ResultSet preResultSet;
@@ -42,24 +46,41 @@ public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter imp
}
@Override
- public void inject(final ResultSet preResultSet) {
+ public void init(final ResultSet preResultSet) {
setDelegatedResultSet(preResultSet);
this.preResultSet = preResultSet;
}
@Override
public boolean next() throws SQLException {
- init();
- return ++rowCount <= limit.getRowCount() && preResultSet.next();
+ boolean result = true;
+ if (!initial) {
+ result = skipOffset();
+ }
+ if (!result) {
+ return false;
+ }
+ result = ++rowNumber <= limit.getRowCount() && preResultSet.next();
+ if (result) {
+ increaseStat();
+ }
+ return result;
}
- private void init() throws SQLException {
- if (initial) {
- return;
- }
+ private boolean skipOffset() throws SQLException {
+ boolean result = true;
for (int i = 0; i < limit.getOffset(); i++) {
- preResultSet.next();
+ result = preResultSet.next();
+ if (!result) {
+ break;
+ }
}
initial = true;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Limit row number:%d limit size:%d result set stat:%s", rowNumber, limit.getRowCount(), super.toString());
}
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/MemoryOrderByCouplingResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/MemoryOrderByCouplingResultSet.java
index 6d2ed4dba627a266e6de921c5fe596abd04d4a7e..da378b35698369e546ad7aaebd2b9f3c3c219482 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/MemoryOrderByCouplingResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/coupling/MemoryOrderByCouplingResultSet.java
@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
-import java.sql.ResultSet;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
-import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
+import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
+import java.sql.ResultSet;
+import java.util.List;
+
/**
* 基于内存的全排序.
*
@@ -39,7 +39,7 @@ public class MemoryOrderByCouplingResultSet extends AbstractDelegateResultSetAda
}
@Override
- public void inject(final ResultSet preResultSet) {
+ public void init(final ResultSet preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, expectOrderList));
}
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/MemoryOrderByResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/MemoryOrderByResultSet.java
similarity index 76%
rename from sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/MemoryOrderByResultSet.java
rename to sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/MemoryOrderByResultSet.java
index c48e18b216fa8a404471f4e193e3cbb1d5518789..ce249ba65674ffc5691ec9604611c6fb7e29b695 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/common/MemoryOrderByResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/MemoryOrderByResultSet.java
@@ -15,7 +15,12 @@
*
*/
-package com.dangdang.ddframe.rdb.sharding.merger.common;
+package com.dangdang.ddframe.rdb.sharding.merger.component.other;
+
+import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
+import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
+import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
+import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -24,16 +29,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
-import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
-import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
-import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
-import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
-import com.google.common.base.Function;
-
/**
* 内存结果集.
- *
+ *
* @author gaohongtao
*/
public class MemoryOrderByResultSet extends AbstractRowSetResultSetAdapter {
@@ -55,16 +53,9 @@ public class MemoryOrderByResultSet extends AbstractRowSetResultSetAdapter {
protected void initRows(final List resultSets) throws SQLException {
List orderByRows = new LinkedList<>();
for (ResultSet each : resultSets) {
- orderByRows.addAll(ResultSetUtil.iterateResultSet(each, new Function() {
- @Override
- public OrderByRow apply(final ResultSet input) {
- try {
- return new OrderByRow(orderByColumns, input);
- } catch (final SQLException ex) {
- throw new ShardingJdbcException("Access result set error", ex);
- }
- }
- }));
+ while (each.next()) {
+ orderByRows.add(new OrderByRow(orderByColumns, each));
+ }
}
Collections.sort(orderByRows);
orderByRowsIterator = orderByRows.iterator();
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/WrapperResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/WrapperResultSet.java
new file mode 100644
index 0000000000000000000000000000000000000000..0f357bacc037266346bde846414d0b036b879968
--- /dev/null
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/other/WrapperResultSet.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 1999-2015 dangdang.com.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.dangdang.ddframe.rdb.sharding.merger.component.other;
+
+import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
+import lombok.Getter;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * 原始结果集包装类.
+ *
+ * @author gaohongtao
+ */
+public class WrapperResultSet extends AbstractDelegateResultSetAdapter {
+
+ @Getter
+ private final boolean isEmpty;
+
+ private boolean isFirstNext;
+
+ public WrapperResultSet(final ResultSet resultSet) throws SQLException {
+ isEmpty = !resultSet.next();
+ if (isEmpty) {
+ return;
+ }
+ setDelegatedResultSet(resultSet);
+ increaseStat();
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ if (isEmpty) {
+ return false;
+ }
+ if (!isFirstNext) {
+ return isFirstNext = true;
+ }
+ return super.next();
+ }
+}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/IteratorReducerResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/IteratorReducerResultSet.java
index a417757801869cda79e134b06db8b95fac62dbb6..196ce76d6d3bc28811f742d76d96128d0922ccdf 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/IteratorReducerResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/IteratorReducerResultSet.java
@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import lombok.extern.slf4j.Slf4j;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
/**
* 迭代归并结果集.
*
@@ -38,23 +38,27 @@ public class IteratorReducerResultSet extends AbstractResultSetAdapter implement
private int currentResultSetOffset;
@Override
- public void inject(final List preResultSet) {
+ public void init(final List preResultSet) {
setResultSets(preResultSet);
+ resultSetIndex++;
+ setCurrentResultSet(preResultSet.get(0));
}
@Override
public boolean next() throws SQLException {
if (null != getCurrentResultSet() && getCurrentResultSet().next()) {
currentResultSetOffset++;
+ log.trace(toString());
return true;
}
if (resultSetIndex >= getResultSets().size()) {
return false;
}
+ currentResultSetOffset = 1;
ResultSet rs = getResultSets().get(resultSetIndex++);
setCurrentResultSet(rs);
log.trace(toString());
- return true;
+ return rs.next();
}
@Override
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/MemoryOrderByReducerResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/MemoryOrderByReducerResultSet.java
index 6ebc423cf5bb5753a315cb4f94da926c79da7a4d..78914f6a3e8db7863a6708fd65e6ea2488df6653 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/MemoryOrderByReducerResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/MemoryOrderByReducerResultSet.java
@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
-import java.sql.ResultSet;
-import java.util.List;
-
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
-import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
+import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
+import java.sql.ResultSet;
+import java.util.List;
+
/**
* 根据排序列进行内存中排序.
*
@@ -39,7 +39,7 @@ public class MemoryOrderByReducerResultSet extends AbstractDelegateResultSetAdap
}
@Override
- public void inject(final List preResultSet) {
+ public void init(final List preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, orderByColumns));
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/StreamingOrderByReducerResultSet.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/StreamingOrderByReducerResultSet.java
index d5b783d9e541457acffcebd635fe1fe5030069ca..3b8b3d12fd16f61de11c5d1bcc60ce3c5cc6cf30 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/StreamingOrderByReducerResultSet.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/component/reducer/StreamingOrderByReducerResultSet.java
@@ -17,19 +17,21 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
-import com.google.common.collect.Lists;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
import lombok.extern.slf4j.Slf4j;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
/**
* 流式排序.
*
@@ -40,24 +42,29 @@ public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter i
private final List orderByColumns;
- private List effectiveResultSets;
-
private boolean initial;
- private Map innerStateMap = new HashMap<>();
+ private Queue effectiveResultSetQueue;
public StreamingOrderByReducerResultSet(final List orderByColumns) {
this.orderByColumns = orderByColumns;
}
@Override
- public void inject(final List preResultSet) throws SQLException {
+ public void init(final List preResultSet) throws SQLException {
setResultSets(preResultSet);
setCurrentResultSet(preResultSet.get(0));
- effectiveResultSets = Lists.newArrayList(preResultSet);
- if (log.isDebugEnabled()) {
-
- }
+ effectiveResultSetQueue = new LinkedList<>(Collections2.filter(preResultSet, new Predicate() {
+ @Override
+ public boolean apply(final ResultSet input) {
+ try {
+ return input.next();
+ } catch (final SQLException ex) {
+ throw new ShardingJdbcException(ex);
+ }
+ }
+ }));
+ log.trace("Effective result set:{}", effectiveResultSetQueue);
}
@Override
@@ -68,20 +75,29 @@ public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter i
initial = true;
}
OrderByRow chosenOrderByValue = null;
- for (ResultSet each : effectiveResultSets) {
+ for (ResultSet each : effectiveResultSetQueue) {
OrderByRow eachOrderByValue = new OrderByRow(orderByColumns, each);
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
setCurrentResultSet(each);
}
}
- return !effectiveResultSets.isEmpty();
+ if (!effectiveResultSetQueue.isEmpty()) {
+ log.trace(toString());
+ }
+ return !effectiveResultSetQueue.isEmpty();
}
private void nextEffectiveResultSets() throws SQLException {
boolean next = getCurrentResultSet().next();
if (!next) {
- effectiveResultSets.remove(getCurrentResultSet());
+ effectiveResultSetQueue.remove(getCurrentResultSet());
+ log.trace("Result set {} finish", getCurrentResultSet());
}
}
+
+ @Override
+ public String toString() {
+ return String.format("Current result set:%s", getCurrentResultSet());
+ }
}
diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/row/GroupByRow.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/row/GroupByRow.java
index 15e347ece51dd19f52b96b9ea54a4cdcceb45c70..709ee44dbfae8acf58184e90380dc90caaf6d029 100644
--- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/row/GroupByRow.java
+++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/merger/row/GroupByRow.java
@@ -17,14 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnit;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnitFactory;
@@ -33,41 +25,50 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* @author gaohongtao
*/
+@Slf4j
public class GroupByRow extends Row {
private final ResultSet resultSet;
- private final Function getValueByColumnIndexFunction = new Function() {
- @Override
- public Object apply(final IndexColumn input) {
- return getValueSafely(input.getColumnIndex());
- }
- };
+ private final List groupByColumns;
- public GroupByRow(final ResultSet resultSet) throws SQLException {
+ private final List aggregationColumns;
+
+ public GroupByRow(final ResultSet resultSet, final List groupByColumns, final List aggregationColumns) throws SQLException {
super(resultSet);
this.resultSet = resultSet;
+ this.groupByColumns = groupByColumns;
+ this.aggregationColumns = aggregationColumns;
}
- public boolean aggregate(final List groupByColumns, final List aggregationColumns) throws SQLException {
+ public boolean aggregate() throws SQLException {
Map aggregationUnitMap = null;
if (!aggregationColumns.isEmpty()) {
aggregationUnitMap = new HashMap<>(aggregationColumns.size());
}
List