提交 7e21a9bb 编写于 作者: G gaoht

first version pipeline implement finish

上级 f2a5026c
......@@ -17,17 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.executor.StatementExecutor;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
......@@ -41,6 +30,17 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 支持分片的静态语句对象.
*
......@@ -78,7 +78,7 @@ public class ShardingStatement extends AbstractStatementAdapter {
this(shardingConnection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
public ShardingStatement(final ShardingConnection shardingConnection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.shardingConnection = shardingConnection;
this.resultSetType = resultSetType;
......
......@@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -17,17 +17,17 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import lombok.extern.slf4j.Slf4j;
/**
* 代理结果集.
*
*
* @author gaohongtao
*/
@Slf4j
......@@ -35,15 +35,22 @@ public abstract class AbstractDelegateResultSetAdapter extends AbstractResultSet
private int offset;
public void setDelegatedResultSet(final ResultSet resultSet) {
protected void setDelegatedResultSet(final ResultSet resultSet) {
setCurrentResultSet(resultSet);
}
@Override
public boolean next() throws SQLException {
protected void increaseStat() {
offset++;
log.trace(toString());
return getCurrentResultSet().next();
}
@Override
public boolean next() throws SQLException {
boolean result = getCurrentResultSet().next();
if (result) {
increaseStat();
}
return result;
}
@Override
......@@ -118,6 +125,6 @@ public abstract class AbstractDelegateResultSetAdapter extends AbstractResultSet
@Override
public String toString() {
return String.format("Delegate result set's offset is %d", offset);
return String.format("%s(%d)'s offset is %d", this.getClass().getSimpleName(), hashCode(), offset);
}
}
......@@ -17,6 +17,16 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
......@@ -33,17 +43,12 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.google.common.base.Preconditions;
import lombok.Setter;
/**
* 使用行数据集实现的结果集.
*
* @author gaohongtao
*/
@Slf4j
public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupportedOperationRowResultSet {
@Setter
......@@ -55,6 +60,7 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
private final Map<String, Integer> columnLabelToIndexMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@Getter(AccessLevel.PROTECTED)
private Row currentRow;
private boolean wasNullFlag;
......@@ -62,7 +68,9 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
@Override
public boolean next() throws SQLException {
init();
return (currentRow = nextRow()) != null;
boolean result = (currentRow = nextRow()) != null;
log.trace(toString());
return result;
}
private void init() throws SQLException {
......@@ -106,8 +114,19 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
@Override
public int findColumn(final String columnLabel) throws SQLException {
initColumnIndexMap();
Preconditions.checkArgument(columnLabelToIndexMap.containsKey(columnLabel), String.format("Column label %s does not exist", columnLabel));
return columnLabelToIndexMap.get(columnLabel);
String formattedColumnLabel;
if (columnLabelToIndexMap.containsKey(columnLabel)) {
formattedColumnLabel = columnLabel;
} else {
formattedColumnLabel = SQLUtil.getExactlyValue(columnLabel);
}
Preconditions.checkArgument(columnLabelToIndexMap.containsKey(formattedColumnLabel), String.format("Column label %s does not exist", formattedColumnLabel));
return columnLabelToIndexMap.get(formattedColumnLabel);
}
@Override
public Statement getStatement() throws SQLException {
return resultSets.get(0).getStatement();
}
@Override
......@@ -118,17 +137,10 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
Preconditions.checkArgument(currentRow.containsCell(columnIndex), String.format("Column Index %d out of range", columnIndex));
Object cell = currentRow.getCell(columnIndex);
if (null == cell) {
this.wasNullFlag = true;
}
this.wasNullFlag = null == cell;
return cell;
}
@Override
public Statement getStatement() throws SQLException {
return resultSets.get(0).getStatement();
}
@Override
public Object getObject(final String columnLabel) throws SQLException {
return getObject(findColumn(columnLabel));
......@@ -153,6 +165,7 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public boolean getBoolean(final int columnIndex) throws SQLException {
Object cell = getObject(columnIndex);
if (null == cell) {
wasNullFlag = false;
return false;
}
return (cell instanceof Boolean) ? (Boolean) cell : Boolean.valueOf(cell.toString());
......@@ -375,4 +388,9 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public int getConcurrency() throws SQLException {
return ResultSet.CONCUR_READ_ONLY;
}
@Override
public String toString() {
return String.format("Current row is %s", currentRow);
}
}
......@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.ArrayList;
import java.util.Collection;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
/**
* JDBC Wrapper适配类.
*
......@@ -32,7 +32,7 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocation;
*/
public class WrapperAdapter implements Wrapper {
private Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
@SuppressWarnings("unchecked")
@Override
......
......@@ -37,9 +37,8 @@ public abstract class AbstractUnsupportedOperationResultSet extends AbstractResu
throw new SQLFeatureNotSupportedException("previous");
}
//TODO:MERGE改造,改造后子类需要实现光标判断的四个方法
@Override
public boolean isBeforeFirst() throws SQLException {
public final boolean isBeforeFirst() throws SQLException {
throw new SQLFeatureNotSupportedException("isBeforeFirst");
}
......
......@@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -17,19 +17,15 @@
package com.dangdang.ddframe.rdb.sharding.merger;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.GroupByCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.coupling.LimitCouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.WrapperResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.IteratorReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
......@@ -39,9 +35,16 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* 创建归并分片结果集的工厂.
*
*
* @author gaohongtao
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
......@@ -50,7 +53,7 @@ public final class ResultSetFactory {
/**
* 获取结果集.
*
*
* @param resultSets 结果集列表
* @param mergeContext 结果归并上下文
* @return 结果集包装
......@@ -62,7 +65,7 @@ public final class ResultSetFactory {
return resultSets.get(0);
} else if (filteredResultSets.size() == 1) {
log.trace("Sharding-JDBC:Only one result set");
return resultSets.get(0);
return filteredResultSets.get(0);
}
setColumnIndex(filteredResultSets.get(0), mergeContext);
ResultSetPipelineBuilder builder = new ResultSetPipelineBuilder(filteredResultSets, mergeContext.getOrderByColumns());
......@@ -71,21 +74,25 @@ public final class ResultSetFactory {
return builder.build();
}
private static List<ResultSet> filterResultSets(final List<ResultSet> resultSets) throws SQLException {
return Lists.newArrayList(Collections2.filter(resultSets, new Predicate<ResultSet>() {
private static List<ResultSet> filterResultSets(final List<ResultSet> resultSets) {
return Lists.newArrayList(Collections2.filter(Lists.transform(resultSets, new Function<ResultSet, ResultSet>() {
@Override
public boolean apply(final ResultSet input) {
public ResultSet apply(final ResultSet input) {
try {
return input.next();
return new WrapperResultSet(input);
} catch (final SQLException ex) {
log.error("filter result set error", ex);
return false;
throw new ShardingJdbcException(ex);
}
}
}), new Predicate<ResultSet>() {
@Override
public boolean apply(final ResultSet input) {
return !((WrapperResultSet) input).isEmpty();
}
}));
}
private static void setColumnIndex(final ResultSet resultSet, final MergeContext mergeContext) throws SQLException {
ResultSetMetaData md = resultSet.getMetaData();
Map<String, Integer> columnLabelIndexMap = new CaseInsensitiveMap<>(md.getColumnCount());
......@@ -94,7 +101,10 @@ public final class ResultSetFactory {
columnLabelIndexMap.put(columnLabel, i);
}
for (IndexColumn each : extractIndexColumns(mergeContext)) {
Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel().orNull()) || columnLabelIndexMap.containsKey(each.getColumnName().orNull()),
if (each.getColumnIndex() > 0) {
continue;
}
Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel().orNull()) || columnLabelIndexMap.containsKey(each.getColumnName().orNull()),
String.format("%s has not index", each));
if (each.getColumnLabel().isPresent() && columnLabelIndexMap.containsKey(each.getColumnLabel().get())) {
each.setColumnIndex(columnLabelIndexMap.get(each.getColumnLabel().get()));
......
......@@ -17,12 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.component.ComponentResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
......@@ -31,9 +25,14 @@ import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.MemoryOrderByR
import com.dangdang.ddframe.rdb.sharding.merger.component.reducer.StreamingOrderByReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
/**
* 结果集管道构建器.
*
......@@ -42,14 +41,13 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ResultSetPipelineBuilder {
@Getter
private final List<ResultSet> inputResultSets;
private AbstractList<OrderByColumn> orderByColumns = new ArrayList<>();
private final AbstractList<OrderByColumn> orderByColumns = new ArrayList<>();
private ComponentResultSet tailResultSet;
public ResultSetPipelineBuilder(final List<ResultSet> resultSets, final List<OrderByColumn> orderByColumns) throws SQLException {
public ResultSetPipelineBuilder(final List<ResultSet> resultSets, final List<OrderByColumn> orderByColumns) {
inputResultSets = resultSets;
this.orderByColumns.addAll(orderByColumns);
}
......@@ -63,11 +61,12 @@ public class ResultSetPipelineBuilder {
public <T> ResultSetPipelineBuilder join(final ComponentResultSet<T> componentResultSet) throws SQLException {
Preconditions.checkArgument(componentResultSet instanceof ReducerResultSet || componentResultSet instanceof CouplingResultSet);
if (componentResultSet instanceof ReducerResultSet) {
((ReducerResultSet) componentResultSet).inject(inputResultSets);
((ReducerResultSet) componentResultSet).init(inputResultSets);
} else {
((CouplingResultSet) componentResultSet).inject(tailResultSet);
((CouplingResultSet) componentResultSet).init(tailResultSet);
}
tailResultSet = componentResultSet;
log.trace("join component {}", tailResultSet.getClass().getSimpleName());
return this;
}
......@@ -81,6 +80,7 @@ public class ResultSetPipelineBuilder {
if (orderEqual(expectOrderList)) {
join(new StreamingOrderByReducerResultSet(expectOrderList));
} else {
setNewOrder(expectOrderList);
join(new MemoryOrderByReducerResultSet(expectOrderList));
}
return this;
......@@ -97,6 +97,7 @@ public class ResultSetPipelineBuilder {
if (orderEqual(expectOrderList)) {
return this;
}
setNewOrder(expectOrderList);
join(new MemoryOrderByCouplingResultSet(expectOrderList));
return this;
}
......@@ -107,7 +108,6 @@ public class ResultSetPipelineBuilder {
* @return 结果集
*/
public ResultSet build() {
log.trace("The pipeline of result set handling is : {}", tailResultSet);
return tailResultSet;
}
......@@ -115,4 +115,9 @@ public class ResultSetPipelineBuilder {
return orderByColumns.equals(expectOrderList);
}
private void setNewOrder(final List<OrderByColumn> orderList) {
orderByColumns.clear();
orderByColumns.addAll(orderList);
}
}
......@@ -17,21 +17,16 @@
package com.dangdang.ddframe.rdb.sharding.merger.common;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn.OrderByType;
import com.google.common.base.Function;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import java.math.BigDecimal;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
/**
* 结果集处理工具类.
*
......@@ -135,15 +130,4 @@ public final class ResultSetUtil {
public static int compareTo(final Comparable thisValue, final Comparable otherValue, final OrderByType orderByType) {
return OrderByType.ASC == orderByType ? thisValue.compareTo(otherValue) : -thisValue.compareTo(otherValue);
}
public static <V> List<V> iterateResultSet(final ResultSet rs, final Function<ResultSet, V> function) throws SQLException {
if (rs.isBeforeFirst()) {
rs.next();
}
List<V> result = new ArrayList<>();
do {
result.add(function.apply(rs));
} while (rs.next());
return result;
}
}
......@@ -27,5 +27,5 @@ import java.sql.SQLException;
*/
public interface ComponentResultSet<T> extends ResultSet {
void inject(final T preResultSet) throws SQLException;
void init(final T preResultSet) throws SQLException;
}
......@@ -17,11 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.GroupByRow;
......@@ -30,9 +25,14 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
import lombok.RequiredArgsConstructor;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
/**
* 分组节点结果集.
*
*
* @author gaohongtao
*/
@RequiredArgsConstructor
......@@ -47,7 +47,7 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
private boolean hasNext;
@Override
public void inject(final ResultSet preResultSet) {
public void init(final ResultSet preResultSet) {
setResultSets(Collections.singletonList(preResultSet));
}
......@@ -62,8 +62,8 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
if (!hasNext) {
return null;
}
GroupByRow row = new GroupByRow(resultSet);
hasNext = row.aggregate(groupByColumns, aggregationColumns);
GroupByRow row = new GroupByRow(resultSet, groupByColumns, aggregationColumns);
hasNext = row.aggregate();
return row;
}
}
......@@ -17,21 +17,25 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import java.sql.ResultSet;
import java.sql.SQLException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* 限制结果集.
*
* @author gaohongtao
*/
@Slf4j
public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter implements CouplingResultSet {
private final Limit limit;
private int rowCount;
private int rowNumber;
private ResultSet preResultSet;
......@@ -42,24 +46,41 @@ public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter imp
}
@Override
public void inject(final ResultSet preResultSet) {
public void init(final ResultSet preResultSet) {
setDelegatedResultSet(preResultSet);
this.preResultSet = preResultSet;
}
@Override
public boolean next() throws SQLException {
init();
return ++rowCount <= limit.getRowCount() && preResultSet.next();
boolean result = true;
if (!initial) {
result = skipOffset();
}
if (!result) {
return false;
}
result = ++rowNumber <= limit.getRowCount() && preResultSet.next();
if (result) {
increaseStat();
}
return result;
}
private void init() throws SQLException {
if (initial) {
return;
}
private boolean skipOffset() throws SQLException {
boolean result = true;
for (int i = 0; i < limit.getOffset(); i++) {
preResultSet.next();
result = preResultSet.next();
if (!result) {
break;
}
}
initial = true;
return result;
}
@Override
public String toString() {
return String.format("Limit row number:%d limit size:%d result set stat:%s", rowNumber, limit.getRowCount(), super.toString());
}
}
......@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import java.sql.ResultSet;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.util.List;
/**
* 基于内存的全排序.
*
......@@ -39,7 +39,7 @@ public class MemoryOrderByCouplingResultSet extends AbstractDelegateResultSetAda
}
@Override
public void inject(final ResultSet preResultSet) {
public void init(final ResultSet preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, expectOrderList));
}
}
......@@ -15,7 +15,12 @@
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.merger.common;
package com.dangdang.ddframe.rdb.sharding.merger.component.other;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -24,16 +29,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Function;
/**
* 内存结果集.
*
*
* @author gaohongtao
*/
public class MemoryOrderByResultSet extends AbstractRowSetResultSetAdapter {
......@@ -55,16 +53,9 @@ public class MemoryOrderByResultSet extends AbstractRowSetResultSetAdapter {
protected void initRows(final List<ResultSet> resultSets) throws SQLException {
List<OrderByRow> orderByRows = new LinkedList<>();
for (ResultSet each : resultSets) {
orderByRows.addAll(ResultSetUtil.iterateResultSet(each, new Function<ResultSet, OrderByRow>() {
@Override
public OrderByRow apply(final ResultSet input) {
try {
return new OrderByRow(orderByColumns, input);
} catch (final SQLException ex) {
throw new ShardingJdbcException("Access result set error", ex);
}
}
}));
while (each.next()) {
orderByRows.add(new OrderByRow(orderByColumns, each));
}
}
Collections.sort(orderByRows);
orderByRowsIterator = orderByRows.iterator();
......
......@@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -15,62 +15,43 @@
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.jdbc;
package com.dangdang.ddframe.rdb.sharding.merger.component.other;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import lombok.Getter;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
import lombok.extern.slf4j.Slf4j;
/**
* 支持分片的结果集抽象类.
* 原始结果集包装类.
*
* @author zhangliang
* @author gaohongtao
*/
@Slf4j
public abstract class AbstractShardingResultSet extends AbstractResultSetAdapter {
private final Limit limit;
public class WrapperResultSet extends AbstractDelegateResultSetAdapter {
private boolean offsetSkipped;
@Getter
private final boolean isEmpty;
private int readCount;
private boolean isFirstNext;
protected AbstractShardingResultSet(final List<ResultSet> resultSets, final Limit limit) {
// super(resultSets);
this.limit = limit;
setCurrentResultSet(resultSets.get(0));
public WrapperResultSet(final ResultSet resultSet) throws SQLException {
isEmpty = !resultSet.next();
if (isEmpty) {
return;
}
setDelegatedResultSet(resultSet);
increaseStat();
}
@Override
public final boolean next() throws SQLException {
if (null != limit && !offsetSkipped) {
skipOffset();
public boolean next() throws SQLException {
if (isEmpty) {
return false;
}
return null == limit ? nextForSharding() : ++readCount <= limit.getRowCount() && nextForSharding();
}
private void skipOffset() {
for (int i = 0; i < limit.getOffset(); i++) {
try {
if (!nextForSharding()) {
break;
}
} catch (final SQLException ignored) {
log.warn("Skip result set error", ignored);
}
if (!isFirstNext) {
return isFirstNext = true;
}
offsetSkipped = true;
return super.next();
}
/**
* 迭代结果集.
*
* @return true 可以继续访问 false 不能继续访问
* @throws SQLException
*/
protected abstract boolean nextForSharding() throws SQLException;
}
......@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* 迭代归并结果集.
*
......@@ -38,23 +38,27 @@ public class IteratorReducerResultSet extends AbstractResultSetAdapter implement
private int currentResultSetOffset;
@Override
public void inject(final List<ResultSet> preResultSet) {
public void init(final List<ResultSet> preResultSet) {
setResultSets(preResultSet);
resultSetIndex++;
setCurrentResultSet(preResultSet.get(0));
}
@Override
public boolean next() throws SQLException {
if (null != getCurrentResultSet() && getCurrentResultSet().next()) {
currentResultSetOffset++;
log.trace(toString());
return true;
}
if (resultSetIndex >= getResultSets().size()) {
return false;
}
currentResultSetOffset = 1;
ResultSet rs = getResultSets().get(resultSetIndex++);
setCurrentResultSet(rs);
log.trace(toString());
return true;
return rs.next();
}
@Override
......
......@@ -17,14 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
import java.sql.ResultSet;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.util.List;
/**
* 根据排序列进行内存中排序.
*
......@@ -39,7 +39,7 @@ public class MemoryOrderByReducerResultSet extends AbstractDelegateResultSetAdap
}
@Override
public void inject(final List<ResultSet> preResultSet) {
public void init(final List<ResultSet> preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, orderByColumns));
}
......
......@@ -17,19 +17,21 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.collect.Lists;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
/**
* 流式排序.
*
......@@ -40,24 +42,29 @@ public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter i
private final List<OrderByColumn> orderByColumns;
private List<ResultSet> effectiveResultSets;
private boolean initial;
private Map<ResultSet, Integer> innerStateMap = new HashMap<>();
private Queue<ResultSet> effectiveResultSetQueue;
public StreamingOrderByReducerResultSet(final List<OrderByColumn> orderByColumns) {
this.orderByColumns = orderByColumns;
}
@Override
public void inject(final List<ResultSet> preResultSet) throws SQLException {
public void init(final List<ResultSet> preResultSet) throws SQLException {
setResultSets(preResultSet);
setCurrentResultSet(preResultSet.get(0));
effectiveResultSets = Lists.newArrayList(preResultSet);
if (log.isDebugEnabled()) {
}
effectiveResultSetQueue = new LinkedList<>(Collections2.filter(preResultSet, new Predicate<ResultSet>() {
@Override
public boolean apply(final ResultSet input) {
try {
return input.next();
} catch (final SQLException ex) {
throw new ShardingJdbcException(ex);
}
}
}));
log.trace("Effective result set:{}", effectiveResultSetQueue);
}
@Override
......@@ -68,20 +75,29 @@ public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter i
initial = true;
}
OrderByRow chosenOrderByValue = null;
for (ResultSet each : effectiveResultSets) {
for (ResultSet each : effectiveResultSetQueue) {
OrderByRow eachOrderByValue = new OrderByRow(orderByColumns, each);
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
setCurrentResultSet(each);
}
}
return !effectiveResultSets.isEmpty();
if (!effectiveResultSetQueue.isEmpty()) {
log.trace(toString());
}
return !effectiveResultSetQueue.isEmpty();
}
private void nextEffectiveResultSets() throws SQLException {
boolean next = getCurrentResultSet().next();
if (!next) {
effectiveResultSets.remove(getCurrentResultSet());
effectiveResultSetQueue.remove(getCurrentResultSet());
log.trace("Result set {} finish", getCurrentResultSet());
}
}
@Override
public String toString() {
return String.format("Current result set:%s", getCurrentResultSet());
}
}
......@@ -17,14 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnit;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnitFactory;
......@@ -33,41 +25,50 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author gaohongtao
*/
@Slf4j
public class GroupByRow extends Row {
private final ResultSet resultSet;
private final Function<IndexColumn, Object> getValueByColumnIndexFunction = new Function<IndexColumn, Object>() {
@Override
public Object apply(final IndexColumn input) {
return getValueSafely(input.getColumnIndex());
}
};
private final List<GroupByColumn> groupByColumns;
public GroupByRow(final ResultSet resultSet) throws SQLException {
private final List<AggregationColumn> aggregationColumns;
public GroupByRow(final ResultSet resultSet, final List<GroupByColumn> groupByColumns, final List<AggregationColumn> aggregationColumns) throws SQLException {
super(resultSet);
this.resultSet = resultSet;
this.groupByColumns = groupByColumns;
this.aggregationColumns = aggregationColumns;
}
public boolean aggregate(final List<GroupByColumn> groupByColumns, final List<AggregationColumn> aggregationColumns) throws SQLException {
public boolean aggregate() throws SQLException {
Map<AggregationColumn, AggregationUnit> aggregationUnitMap = null;
if (!aggregationColumns.isEmpty()) {
aggregationUnitMap = new HashMap<>(aggregationColumns.size());
}
List<Object> groupByKey = getGroupByKey(groupByColumns);
boolean hasNext;
while (hasNext = resultSet.next()) {
List<Object> nextRowGroupByKey = getGroupByKey(groupByColumns);
if (!groupByColumns.isEmpty() && !groupByKey.equals(nextRowGroupByKey)) {
log.trace("Group {} start", groupByKey);
boolean hasNext = false;
do {
if (!groupByColumns.isEmpty() && !groupByKey.equals(getGroupByKey(groupByColumns))) {
log.trace("Group {} finish", groupByKey);
break;
}
mergeAggregationColumn(aggregationColumns, aggregationUnitMap);
groupByKey = nextRowGroupByKey;
}
mergeAggregationColumn(aggregationUnitMap);
} while (hasNext = resultSet.next());
if (null == aggregationUnitMap) {
return hasNext;
}
......@@ -84,9 +85,8 @@ public class GroupByRow extends Row {
}
return result;
}
@SuppressWarnings("SuspiciousToArrayCall")
private void mergeAggregationColumn(final List<AggregationColumn> aggregationColumns, final Map<AggregationColumn, AggregationUnit> aggregationUnitMap) {
private void mergeAggregationColumn(final Map<AggregationColumn, AggregationUnit> aggregationUnitMap) {
if (null == aggregationUnitMap) {
return;
}
......@@ -101,7 +101,14 @@ public class GroupByRow extends Row {
} else {
mergingAggregationColumns = Lists.newArrayList(each.getDerivedColumns());
}
unit.merge(Lists.transform(mergingAggregationColumns, getValueByColumnIndexFunction).toArray(new Comparable[each.getDerivedColumns().size()]));
unit.merge(Lists.transform(mergingAggregationColumns, new Function<IndexColumn, Comparable<?>>() {
@Override
public Comparable<?> apply(final IndexColumn input) {
log.trace("Column Index {} will be merged", input.getColumnIndex());
return (Comparable<?>) getValueSafely(input.getColumnIndex());
}
}).toArray(new Comparable[mergingAggregationColumns.size()]));
}
}
......@@ -112,4 +119,9 @@ public class GroupByRow extends Row {
throw new ShardingJdbcException(ex);
}
}
@Override
public String toString() {
return String.format("Group by columns is %s, aggregation column is %s, %s", groupByColumns, aggregationColumns, super.toString());
}
}
......@@ -17,15 +17,15 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
/**
* 具有排序功能的行对象.
*
......@@ -43,7 +43,7 @@ public class OrderByRow extends Row implements Comparable<OrderByRow> {
this.values = getValues();
}
private List<Comparable<?>> getValues() throws SQLException {
private List<Comparable<?>> getValues() {
List<Comparable<?>> result = new ArrayList<>(orderByColumns.size());
for (OrderByColumn each : orderByColumns) {
Object value = getCell(each.getColumnIndex());
......@@ -64,4 +64,9 @@ public class OrderByRow extends Row implements Comparable<OrderByRow> {
}
return 0;
}
@Override
public String toString() {
return String.format("Order by columns value is %s", values);
}
}
......@@ -17,11 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import com.google.common.base.Preconditions;
import java.util.Arrays;
/**
* 数据行.
......@@ -45,7 +48,7 @@ public class Row {
return result;
}
protected void setCell(final int index, final Object value) {
void setCell(final int index, final Object value) {
Preconditions.checkArgument(containsCell(index));
rowData[index - 1] = value;
}
......@@ -59,4 +62,14 @@ public class Row {
return index - 1 > -1 && index - 1 < rowData.length;
}
@Override
public String toString() {
return String.format("value is : %s", Lists.transform(Arrays.asList(rowData), new Function<Object, Object>() {
@Override
public Object apply(final Object input) {
return null == input ? "nil" : input;
}
}));
}
}
......@@ -68,6 +68,7 @@ public final class ThreadLocalObjectContainer {
* @param clazz 对象类型
* @return 对象
*/
@SuppressWarnings("unchecked")
public static <T> T getItem(final Class<T> clazz) {
return (T) (null == THREAD_LOCAL_CONTAINER.get() ? null : THREAD_LOCAL_CONTAINER.get().get(clazz.getName()));
}
......
......@@ -18,6 +18,7 @@
package com.dangdang.ddframe.rdb.sharding.parser.result.merger;
import com.alibaba.druid.sql.ast.SQLOrderingSpecification;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import lombok.AccessLevel;
import lombok.Getter;
......@@ -94,4 +95,24 @@ public final class OrderByColumn implements IndexColumn {
return OrderByType.valueOf(sqlOrderingSpecification.name());
}
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof OrderByColumn)) {
return false;
}
OrderByColumn that = (OrderByColumn) o;
return orderByType == that.orderByType && (columnIndex == that.columnIndex
|| index.isPresent() && that.index.isPresent() && index.get().equals(that.index.get())
|| name.isPresent() && that.name.isPresent() && name.get().equals(that.name.get())
|| alias.isPresent() && that.alias.isPresent() && alias.get().equals(that.alias.get()));
}
@Override
public int hashCode() {
return Objects.hashCode(orderByType, columnIndex);
}
}
......@@ -17,9 +17,7 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapterTest;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.ConnectionAdapterTest;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.DataSourceAdapterTest;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.PreparedStatementAdapterTest;
......@@ -33,6 +31,8 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.UnsupportedOperationPr
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.UnsupportedOperationResultSetTest;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.UnsupportedOperationStatementTest;
import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocationTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@RunWith(Suite.class)
@Suite.SuiteClasses({
......@@ -50,7 +50,8 @@ import com.dangdang.ddframe.rdb.sharding.jdbc.util.JdbcMethodInvocationTest;
PreparedStatementAdapterTest.class,
ResultSetGetterAdapterTest.class,
ResultSetAdapterTest.class,
JdbcMethodInvocationTest.class
JdbcMethodInvocationTest.class,
AbstractRowSetResultSetAdapterTest.class,
})
public class AllJDBCTest {
}
......@@ -15,7 +15,14 @@
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.merger.rs;
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.fixture.MockRowSetResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import lombok.RequiredArgsConstructor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
......@@ -30,20 +37,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import lombok.RequiredArgsConstructor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class)
@RequiredArgsConstructor
public class MemoryOrderByResultSetGetTest {
public class AbstractRowSetResultSetAdapterTest {
private final Object input;
......@@ -87,8 +87,19 @@ public class MemoryOrderByResultSetGetTest {
@Test
public void test() throws SQLException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.<ResultSet>singletonList(new MockResultSet<>(input)), Collections.<OrderByColumn>emptyList());
rs.next();
MockRowSetResultSet rs = new MockRowSetResultSet();
MockResultSet mockResultSet = new MockResultSet<>(input);
rs.setResultSets(Collections.<ResultSet>singletonList(mockResultSet));
assertThat(rs.getStatement(), nullValue());
assertThat(rs.getFetchSize() ,is(1));
assertThat(rs.getMetaData().getColumnCount(), is(1));
assertThat(rs.getFetchDirection(), is(ResultSet.FETCH_FORWARD));
assertThat(rs.getType(), is(ResultSet.TYPE_FORWARD_ONLY));
assertThat(rs.getConcurrency(), is(ResultSet.CONCUR_READ_ONLY));
rs.clearWarnings();
assertThat(rs.getWarnings(), nullValue());
assertThat(rs.next(), is(true));
if (scale > 0) {
assertThat(ResultSet.class.getMethod(methodName, int.class, int.class).invoke(rs, 1, scale), is(result));
assertThat(ResultSet.class.getMethod(methodName, String.class, int.class).invoke(rs, "name", scale), is(result));
......@@ -99,5 +110,14 @@ public class MemoryOrderByResultSetGetTest {
assertThat(ResultSet.class.getMethod(methodName, int.class).invoke(rs, 1), is(result));
assertThat(ResultSet.class.getMethod(methodName, String.class).invoke(rs, "name"), is(result));
}
if(null == result){
assertThat(rs.wasNull(), is(true));
} else {
assertThat(rs.wasNull(), is(false));
}
assertThat(rs.next(), is(false));
assertThat(rs.isClosed(), is(false));
rs.close();
assertThat(rs.isClosed(), is(true));
}
}
......@@ -17,6 +17,19 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import com.dangdang.ddframe.rdb.integrate.AbstractDBUnitTest;
import com.dangdang.ddframe.rdb.integrate.db.AbstractShardingDataBasesOnlyDBUnitTest;
import com.dangdang.ddframe.rdb.sharding.api.DatabaseType;
import com.dangdang.ddframe.rdb.sharding.api.ShardingDataSource;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
......@@ -24,21 +37,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dangdang.ddframe.rdb.integrate.AbstractDBUnitTest;
import com.dangdang.ddframe.rdb.integrate.db.AbstractShardingDataBasesOnlyDBUnitTest;
import com.dangdang.ddframe.rdb.sharding.api.DatabaseType;
import com.dangdang.ddframe.rdb.sharding.api.ShardingDataSource;
import com.dangdang.ddframe.rdb.sharding.jdbc.AbstractShardingResultSet;
import com.dangdang.ddframe.rdb.sharding.jdbc.ShardingConnection;
public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBUnitTest {
private ShardingDataSource shardingDataSource;
......@@ -67,10 +65,10 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
@Test
public void assertColse() throws SQLException {
actual.close();
assertClose((AbstractShardingResultSet) actual);
assertClose((AbstractResultSetAdapter) actual);
}
private void assertClose(final AbstractShardingResultSet actual) throws SQLException {
private void assertClose(final AbstractResultSetAdapter actual) throws SQLException {
assertTrue(actual.isClosed());
assertThat(actual.getResultSets().size(), is(10));
for (ResultSet each : actual.getResultSets()) {
......@@ -90,10 +88,10 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
actual.setFetchDirection(ResultSet.FETCH_REVERSE);
} catch (final SQLException ignore) {
}
assertFetchDirection((AbstractShardingResultSet) actual, ResultSet.FETCH_REVERSE);
assertFetchDirection((AbstractResultSetAdapter) actual, ResultSet.FETCH_REVERSE);
}
private void assertFetchDirection(final AbstractShardingResultSet actual, final int fetchDirection) throws SQLException {
private void assertFetchDirection(final AbstractResultSetAdapter actual, final int fetchDirection) throws SQLException {
// H2数据库未实现getFetchDirection方法
assertThat(actual.getFetchDirection(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? ResultSet.FETCH_FORWARD : fetchDirection));
assertThat(actual.getResultSets().size(), is(10));
......@@ -106,10 +104,10 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
public void assertSetFetchSize() throws SQLException {
assertThat(actual.getFetchSize(), is(0));
actual.setFetchSize(100);
assertFetchSize((AbstractShardingResultSet) actual, 100);
assertFetchSize((AbstractResultSetAdapter) actual, 100);
}
private void assertFetchSize(final AbstractShardingResultSet actual, final int fetchSize) throws SQLException {
private void assertFetchSize(final AbstractResultSetAdapter actual, final int fetchSize) throws SQLException {
// H2数据库未实现getFetchSize方法
assertThat(actual.getFetchSize(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? 0 : fetchSize));
assertThat(actual.getResultSets().size(), is(10));
......
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.jdbc.fixture;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
public class MockRowSetResultSet extends AbstractRowSetResultSetAdapter {
private ResultSet resultSet;
@Override
protected void initRows(final List<ResultSet> resultSets) throws SQLException {
this.resultSet = resultSets.get(0);
}
@Override
protected Row nextRow() throws SQLException {
if(resultSet.next()){
return new Row(resultSet);
}
return null;
}
}
......@@ -25,7 +25,6 @@ import com.dangdang.ddframe.rdb.sharding.merger.aggregation.NullableAggregationR
import com.dangdang.ddframe.rdb.sharding.merger.iterator.IteratorResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.orderby.OrderByResultSetTest;
import com.dangdang.ddframe.rdb.sharding.merger.orderby.OrderByRowTest;
import com.dangdang.ddframe.rdb.sharding.merger.rs.MemoryOrderByResultSetGetTest;
import com.dangdang.ddframe.rdb.sharding.merger.rs.MemoryOrderByResultSetTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -42,7 +41,6 @@ import org.junit.runners.Suite;
NullableAggregationResultSetTest.class,
OrderByRowTest.class,
MemoryOrderByResultSetTest.class,
MemoryOrderByResultSetGetTest.class
})
public class AllMergerTest {
}
......@@ -17,12 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MergerTestUtil;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
......@@ -34,6 +28,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
......@@ -62,22 +63,22 @@ public final class AggregationResultSetTest {
@Parameterized.Parameters(name = "{index}: testTarget:{0}, aggregation type:{1}, columns:{2}, r1:{3}, r2:{4}, rsName:{5}, rsClass:{6}, result:{7}")
public static Collection init() {
return Arrays.asList(new Object[][]{
{TestTarget.INDEX, AggregationType.SUM, Arrays.asList(""), Arrays.asList(6), Arrays.asList(2), Optional.absent(), Integer.class, 8},
{TestTarget.COLUMN_NAME, AggregationType.SUM, Arrays.asList("SUM(0)"), Arrays.asList(6), Arrays.asList(2), Optional.of("SUM(0)"), Integer.class, 8},
{TestTarget.ALIAS, AggregationType.SUM, Arrays.asList("SUM_RESULT"), Arrays.asList(6), Arrays.asList(2), Optional.of("SUM_RESULT"), Integer.class, 8},
{TestTarget.INDEX, AggregationType.COUNT, Arrays.asList(""), Arrays.asList(6), Arrays.asList(2), Optional.absent(), Integer.class, 8},
{TestTarget.COLUMN_NAME, AggregationType.COUNT, Arrays.asList("COUNT(`id`)"), Arrays.asList(6), Arrays.asList(2), Optional.of("COUNT(`id`)"), Integer.class, 8},
{TestTarget.ALIAS, AggregationType.COUNT, Arrays.asList("COUNT_RESULT"), Arrays.asList(6), Arrays.asList(2), Optional.of("COUNT_RESULT"), Integer.class, 8},
{TestTarget.INDEX, AggregationType.MAX, Arrays.asList(""), Arrays.asList(6), Arrays.asList(2), Optional.absent(), Integer.class, 6},
{TestTarget.COLUMN_NAME, AggregationType.MAX, Arrays.asList("MAX(id)"), Arrays.asList(6), Arrays.asList(2), Optional.of("MAX(`id`)"), Integer.class, 6},
{TestTarget.ALIAS, AggregationType.MAX, Arrays.asList("MAX_RESULT"), Arrays.asList(6), Arrays.asList(2), Optional.of("MAX_RESULT"), Integer.class, 6},
{TestTarget.INDEX, AggregationType.MIN, Arrays.asList(""), Arrays.asList(6), Arrays.asList(2), Optional.absent(), Integer.class, 2},
{TestTarget.COLUMN_NAME, AggregationType.MIN, Arrays.asList("MIN(0)"), Arrays.asList(6), Arrays.asList(2), Optional.of("MIN(0)"), Integer.class, 2},
{TestTarget.ALIAS, AggregationType.MIN, Arrays.asList("MIN_RESULT"), Arrays.asList(6), Arrays.asList(2), Optional.of("MIN_RESULT"), Integer.class, 2},
return Collections.singletonList(new Object[][]{
{TestTarget.INDEX, AggregationType.SUM, Collections.singletonList("column"), Collections.singletonList(6), Collections.singletonList(2), Optional.absent(), Integer.class, 8},
{TestTarget.COLUMN_NAME, AggregationType.SUM, Collections.singletonList("SUM(0)"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("SUM(0)"), Integer.class, 8},
{TestTarget.ALIAS, AggregationType.SUM, Collections.singletonList("SUM_RESULT"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("SUM_RESULT"), Integer.class, 8},
{TestTarget.INDEX, AggregationType.COUNT, Collections.singletonList("column"), Collections.singletonList(6), Collections.singletonList(2), Optional.absent(), Integer.class, 8},
{TestTarget.COLUMN_NAME, AggregationType.COUNT, Collections.singletonList("COUNT(`id`)"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("COUNT(`id`)"), Integer.class, 8},
{TestTarget.ALIAS, AggregationType.COUNT, Collections.singletonList("COUNT_RESULT"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("COUNT_RESULT"), Integer.class, 8},
{TestTarget.INDEX, AggregationType.MAX, Collections.singletonList("column"), Collections.singletonList(6), Collections.singletonList(2), Optional.absent(), Integer.class, 6},
{TestTarget.COLUMN_NAME, AggregationType.MAX, Collections.singletonList("MAX(id)"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("MAX(`id`)"), Integer.class, 6},
{TestTarget.ALIAS, AggregationType.MAX, Collections.singletonList("MAX_RESULT"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("MAX_RESULT"), Integer.class, 6},
{TestTarget.INDEX, AggregationType.MIN, Collections.singletonList("column"), Collections.singletonList(6), Collections.singletonList(2), Optional.absent(), Integer.class, 2},
{TestTarget.COLUMN_NAME, AggregationType.MIN, Collections.singletonList("MIN(0)"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("MIN(0)"), Integer.class, 2},
{TestTarget.ALIAS, AggregationType.MIN, Collections.singletonList("MIN_RESULT"), Collections.singletonList(6), Collections.singletonList(2), Optional.of("MIN_RESULT"), Integer.class, 2},
{TestTarget.INDEX, AggregationType.AVG, Arrays.asList("sharding_gen_1", "sharding_gen_2"), Arrays.asList(5, 10), Arrays.asList(10, 100), Optional.absent(), Double.class, 7.3333D},
{TestTarget.COLUMN_NAME, AggregationType.AVG, Arrays.asList("sharding_gen_1", "sharding_gen_2"), Arrays.asList(5, 10), Arrays.asList(10, 100), Optional.of("AVG(*)"), Double.class, 7.3333D},
{TestTarget.ALIAS, AggregationType.AVG, Arrays.asList("sharding_gen_1", "sharding_gen_2"), Arrays.asList(5, 10), Arrays.asList(10, 100), Optional.of("AVG_RESULT"), Double.class, 7.3333D},
{TestTarget.COLUMN_NAME, AggregationType.AVG, Arrays.asList("AVG(*)", "sharding_gen_1", "sharding_gen_2"), Arrays.asList(2, 5, 10), Arrays.asList(10, 10, 100), Optional.of("AVG(*)"), Double.class, 7.3333D},
{TestTarget.ALIAS, AggregationType.AVG, Arrays.asList("AVG_RESULT", "sharding_gen_1", "sharding_gen_2"), Arrays.asList(2, 5, 10), Arrays.asList(10, 10, 100), Optional.of("AVG_RESULT"), Double.class, 7.3333D},
});
}
......
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.merger.fixture;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationResultSet;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
......@@ -28,13 +30,10 @@ import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationResultSet;
public abstract class AbstractUnsupportedOperationMockResultSet extends AbstractUnsupportedOperationResultSet {
......@@ -270,10 +269,7 @@ public abstract class AbstractUnsupportedOperationMockResultSet extends Abstract
throw new SQLFeatureNotSupportedException();
}
@Override
public final Statement getStatement() throws SQLException {
throw new SQLFeatureNotSupportedException();
}
@Override
public final Object getObject(final int columnIndex, final Map<String, Class<?>> map) throws SQLException {
......
......@@ -20,6 +20,7 @@ package com.dangdang.ddframe.rdb.sharding.merger.fixture;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
......@@ -141,13 +142,13 @@ public final class MockResultSet<T> extends AbstractUnsupportedOperationMockResu
}
@Override
public boolean isBeforeFirst() throws SQLException {
return null == currentValue;
public final int getFetchSize() throws SQLException {
return size;
}
@Override
public final int getFetchSize() throws SQLException {
return size;
public final Statement getStatement() throws SQLException {
return null;
}
@Override
......
......@@ -17,16 +17,16 @@
package com.dangdang.ddframe.rdb.sharding.merger.orderby;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import org.junit.Before;
import org.junit.Test;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
......@@ -48,7 +48,7 @@ public class OrderByRowTest {
assertThat((int) row.getCell(1), is(1));
}
@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void testGetCellError() throws Exception {
assertThat((int) row.getCell(2), is(1));
}
......
......@@ -17,6 +17,11 @@
package com.dangdang.ddframe.rdb.sharding.merger.rs;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import org.junit.Test;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
......@@ -28,11 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
......
package com.dangdang.ddframe.rdb.sharding.parser.result;
import java.io.IOException;
import java.util.Arrays;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn.AggregationType;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
......@@ -20,6 +17,9 @@ import com.dangdang.ddframe.rdb.sharding.parser.result.router.Table;
import com.google.common.base.Optional;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
......@@ -39,9 +39,9 @@ public final class SQLParsedResultTest {
+ "conditionContexts=[ConditionContext(conditions={Condition.Column(columnName=id, tableName=order)=Condition(column=Condition.Column(columnName=id, tableName=order), "
+ "operator=IN, values=[1, 2, 3])})], "
+ "mergeContext=MergeContext("
+ "orderByColumns=[OrderByColumn(name=Optional.of(id), index=Optional.absent(), alias=Optional.of(a), orderByType=DESC)], "
+ "groupByColumns=[GroupByColumn(name=id, alias=d, orderByType=ASC)], "
+ "aggregationColumns=[AggregationColumn(expression=COUNT(id), aggregationType=COUNT, alias=Optional.of(c), option=Optional.absent(), derivedColumns=[], index=-1)], "
+ "orderByColumns=[OrderByColumn(name=Optional.of(id), index=Optional.absent(), alias=Optional.of(a), orderByType=DESC, columnIndex=0)], "
+ "groupByColumns=[GroupByColumn(name=id, alias=d, orderByType=ASC, columnIndex=0)], "
+ "aggregationColumns=[AggregationColumn(expression=COUNT(id), aggregationType=COUNT, alias=Optional.of(c), option=Optional.absent(), derivedColumns=[], columnIndex=-1)], "
+ "limit=Limit(offset=0, rowCount=10), executorEngine=null))"));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册