提交 305295b3 编写于 作者: G gaohongtao

update group by

上级 e734cdbd
/**
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
/**
* 代理结果集.
*
* @author gaohongtao
*/
@Slf4j
public abstract class AbstractDelegateResultSetAdapter extends AbstractResultSetGetterAdapter {
private int offset;
protected void setDelegatedResultSet(final ResultSet resultSet) {
setCurrentResultSet(resultSet);
}
protected void increaseStat() {
offset++;
log.trace(toString());
}
@Override
public boolean next() throws SQLException {
boolean result = getCurrentResultSet().next();
if (result) {
increaseStat();
}
return result;
}
@Override
public final void close() throws SQLException {
getCurrentResultSet().close();
}
@Override
public final boolean isClosed() throws SQLException {
return getCurrentResultSet().isClosed();
}
@Override
public final boolean wasNull() throws SQLException {
return getCurrentResultSet().wasNull();
}
@Override
public final int getFetchDirection() throws SQLException {
return getCurrentResultSet().getFetchDirection();
}
@Override
public final void setFetchDirection(final int direction) throws SQLException {
getCurrentResultSet().setFetchDirection(direction);
}
@Override
public final int getFetchSize() throws SQLException {
return getCurrentResultSet().getFetchSize();
}
@Override
public final void setFetchSize(final int rows) throws SQLException {
getCurrentResultSet().setFetchSize(rows);
}
@Override
public final int getType() throws SQLException {
return getCurrentResultSet().getType();
}
@Override
public final int getConcurrency() throws SQLException {
return getCurrentResultSet().getConcurrency();
}
@Override
public final Statement getStatement() throws SQLException {
return getCurrentResultSet().getStatement();
}
@Override
public final SQLWarning getWarnings() throws SQLException {
return getCurrentResultSet().getWarnings();
}
@Override
public final void clearWarnings() throws SQLException {
getCurrentResultSet().clearWarnings();
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return getCurrentResultSet().getMetaData();
}
@Override
public final int findColumn(final String columnLabel) throws SQLException {
return getCurrentResultSet().findColumn(columnLabel);
}
@Override
public String toString() {
return String.format("%s(%d)'s offset is %d", this.getClass().getSimpleName(), hashCode(), offset);
}
}
......@@ -25,303 +25,394 @@ import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationResultSet;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 处理多结果集的适配器.
* 代理结果集适配器的适配器.
* 适配器作为对其他结果集的包装
*
* @author zhangliang
*/
@RequiredArgsConstructor
public abstract class AbstractResultSetGetterAdapter extends AbstractUnsupportedOperationResultSet {
@Slf4j
public abstract class AbstractForwardingResultSetAdapter extends AbstractUnsupportedOperationResultSet {
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
private ResultSet currentResultSet;
private ResultSet delegate;
private int offset;
protected void increaseStat() {
offset++;
log.trace("{}({})'s offset is {}", this.getClass().getSimpleName(), hashCode(), offset);
}
@Override
public boolean next() throws SQLException {
boolean result = delegate.next();
if (result) {
increaseStat();
}
return result;
}
@Override
public final boolean getBoolean(final int columnIndex) throws SQLException {
return currentResultSet.getBoolean(columnIndex);
return delegate.getBoolean(columnIndex);
}
@Override
public final boolean getBoolean(final String columnLabel) throws SQLException {
return currentResultSet.getBoolean(columnLabel);
return delegate.getBoolean(columnLabel);
}
@Override
public final byte getByte(final int columnIndex) throws SQLException {
return currentResultSet.getByte(columnIndex);
return delegate.getByte(columnIndex);
}
@Override
public final byte getByte(final String columnLabel) throws SQLException {
return currentResultSet.getByte(columnLabel);
return delegate.getByte(columnLabel);
}
@Override
public final short getShort(final int columnIndex) throws SQLException {
return currentResultSet.getShort(columnIndex);
return delegate.getShort(columnIndex);
}
@Override
public final short getShort(final String columnLabel) throws SQLException {
return currentResultSet.getShort(columnLabel);
return delegate.getShort(columnLabel);
}
@Override
public final int getInt(final int columnIndex) throws SQLException {
return currentResultSet.getInt(columnIndex);
return delegate.getInt(columnIndex);
}
@Override
public final int getInt(final String columnLabel) throws SQLException {
return currentResultSet.getInt(columnLabel);
return delegate.getInt(columnLabel);
}
@Override
public final long getLong(final int columnIndex) throws SQLException {
return currentResultSet.getLong(columnIndex);
return delegate.getLong(columnIndex);
}
@Override
public final long getLong(final String columnLabel) throws SQLException {
return currentResultSet.getLong(columnLabel);
return delegate.getLong(columnLabel);
}
@Override
public final float getFloat(final int columnIndex) throws SQLException {
return currentResultSet.getFloat(columnIndex);
return delegate.getFloat(columnIndex);
}
@Override
public final float getFloat(final String columnLabel) throws SQLException {
return currentResultSet.getFloat(columnLabel);
return delegate.getFloat(columnLabel);
}
@Override
public final double getDouble(final int columnIndex) throws SQLException {
return currentResultSet.getDouble(columnIndex);
return delegate.getDouble(columnIndex);
}
@Override
public final double getDouble(final String columnLabel) throws SQLException {
return currentResultSet.getDouble(columnLabel);
return delegate.getDouble(columnLabel);
}
@Override
public final String getString(final int columnIndex) throws SQLException {
return currentResultSet.getString(columnIndex);
return delegate.getString(columnIndex);
}
@Override
public final String getString(final String columnLabel) throws SQLException {
return currentResultSet.getString(columnLabel);
return delegate.getString(columnLabel);
}
@Override
public final BigDecimal getBigDecimal(final int columnIndex) throws SQLException {
return currentResultSet.getBigDecimal(columnIndex);
return delegate.getBigDecimal(columnIndex);
}
@Override
public final BigDecimal getBigDecimal(final String columnLabel) throws SQLException {
return currentResultSet.getBigDecimal(columnLabel);
return delegate.getBigDecimal(columnLabel);
}
@SuppressWarnings("deprecation")
@Override
public final BigDecimal getBigDecimal(final int columnIndex, final int scale) throws SQLException {
return currentResultSet.getBigDecimal(columnIndex, scale);
return delegate.getBigDecimal(columnIndex, scale);
}
@SuppressWarnings("deprecation")
@Override
public final BigDecimal getBigDecimal(final String columnLabel, final int scale) throws SQLException {
return currentResultSet.getBigDecimal(columnLabel, scale);
return delegate.getBigDecimal(columnLabel, scale);
}
@Override
public final byte[] getBytes(final int columnIndex) throws SQLException {
return currentResultSet.getBytes(columnIndex);
return delegate.getBytes(columnIndex);
}
@Override
public final byte[] getBytes(final String columnLabel) throws SQLException {
return currentResultSet.getBytes(columnLabel);
return delegate.getBytes(columnLabel);
}
@Override
public final Date getDate(final int columnIndex) throws SQLException {
return currentResultSet.getDate(columnIndex);
return delegate.getDate(columnIndex);
}
@Override
public final Date getDate(final String columnLabel) throws SQLException {
return currentResultSet.getDate(columnLabel);
return delegate.getDate(columnLabel);
}
@Override
public final Date getDate(final int columnIndex, final Calendar cal) throws SQLException {
return currentResultSet.getDate(columnIndex, cal);
return delegate.getDate(columnIndex, cal);
}
@Override
public final Date getDate(final String columnLabel, final Calendar cal) throws SQLException {
return currentResultSet.getDate(columnLabel, cal);
return delegate.getDate(columnLabel, cal);
}
@Override
public final Time getTime(final int columnIndex) throws SQLException {
return currentResultSet.getTime(columnIndex);
return delegate.getTime(columnIndex);
}
@Override
public final Time getTime(final String columnLabel) throws SQLException {
return currentResultSet.getTime(columnLabel);
return delegate.getTime(columnLabel);
}
@Override
public final Time getTime(final int columnIndex, final Calendar cal) throws SQLException {
return currentResultSet.getTime(columnIndex, cal);
return delegate.getTime(columnIndex, cal);
}
@Override
public final Time getTime(final String columnLabel, final Calendar cal) throws SQLException {
return currentResultSet.getTime(columnLabel, cal);
return delegate.getTime(columnLabel, cal);
}
@Override
public final Timestamp getTimestamp(final int columnIndex) throws SQLException {
return currentResultSet.getTimestamp(columnIndex);
return delegate.getTimestamp(columnIndex);
}
@Override
public final Timestamp getTimestamp(final String columnLabel) throws SQLException {
return currentResultSet.getTimestamp(columnLabel);
return delegate.getTimestamp(columnLabel);
}
@Override
public final Timestamp getTimestamp(final int columnIndex, final Calendar cal) throws SQLException {
return currentResultSet.getTimestamp(columnIndex, cal);
return delegate.getTimestamp(columnIndex, cal);
}
@Override
public final Timestamp getTimestamp(final String columnLabel, final Calendar cal) throws SQLException {
return currentResultSet.getTimestamp(columnLabel, cal);
return delegate.getTimestamp(columnLabel, cal);
}
@Override
public final InputStream getAsciiStream(final int columnIndex) throws SQLException {
return currentResultSet.getAsciiStream(columnIndex);
return delegate.getAsciiStream(columnIndex);
}
@Override
public final InputStream getAsciiStream(final String columnLabel) throws SQLException {
return currentResultSet.getAsciiStream(columnLabel);
return delegate.getAsciiStream(columnLabel);
}
@SuppressWarnings("deprecation")
@Override
public final InputStream getUnicodeStream(final int columnIndex) throws SQLException {
return currentResultSet.getUnicodeStream(columnIndex);
return delegate.getUnicodeStream(columnIndex);
}
@SuppressWarnings("deprecation")
@Override
public final InputStream getUnicodeStream(final String columnLabel) throws SQLException {
return currentResultSet.getUnicodeStream(columnLabel);
return delegate.getUnicodeStream(columnLabel);
}
@Override
public final InputStream getBinaryStream(final int columnIndex) throws SQLException {
return currentResultSet.getBinaryStream(columnIndex);
return delegate.getBinaryStream(columnIndex);
}
@Override
public final InputStream getBinaryStream(final String columnLabel) throws SQLException {
return currentResultSet.getBinaryStream(columnLabel);
return delegate.getBinaryStream(columnLabel);
}
@Override
public final Reader getCharacterStream(final int columnIndex) throws SQLException {
return currentResultSet.getCharacterStream(columnIndex);
return delegate.getCharacterStream(columnIndex);
}
@Override
public final Reader getCharacterStream(final String columnLabel) throws SQLException {
return currentResultSet.getCharacterStream(columnLabel);
return delegate.getCharacterStream(columnLabel);
}
@Override
public final Blob getBlob(final int columnIndex) throws SQLException {
return currentResultSet.getBlob(columnIndex);
return delegate.getBlob(columnIndex);
}
@Override
public final Blob getBlob(final String columnLabel) throws SQLException {
return currentResultSet.getBlob(columnLabel);
return delegate.getBlob(columnLabel);
}
@Override
public final Clob getClob(final int columnIndex) throws SQLException {
return currentResultSet.getClob(columnIndex);
return delegate.getClob(columnIndex);
}
@Override
public final Clob getClob(final String columnLabel) throws SQLException {
return currentResultSet.getClob(columnLabel);
return delegate.getClob(columnLabel);
}
@Override
public final URL getURL(final int columnIndex) throws SQLException {
return currentResultSet.getURL(columnIndex);
return delegate.getURL(columnIndex);
}
@Override
public final URL getURL(final String columnLabel) throws SQLException {
return currentResultSet.getURL(columnLabel);
return delegate.getURL(columnLabel);
}
@Override
public final SQLXML getSQLXML(final int columnIndex) throws SQLException {
return currentResultSet.getSQLXML(columnIndex);
return delegate.getSQLXML(columnIndex);
}
@Override
public final SQLXML getSQLXML(final String columnLabel) throws SQLException {
return currentResultSet.getSQLXML(columnLabel);
return delegate.getSQLXML(columnLabel);
}
@Override
public final Object getObject(final int columnIndex) throws SQLException {
return currentResultSet.getObject(columnIndex);
return delegate.getObject(columnIndex);
}
@Override
public final Object getObject(final String columnLabel) throws SQLException {
return currentResultSet.getObject(columnLabel);
return delegate.getObject(columnLabel);
}
@Override
public final Object getObject(final int columnIndex, final Map<String, Class<?>> map) throws SQLException {
return currentResultSet.getObject(columnIndex, map);
return delegate.getObject(columnIndex, map);
}
@Override
public final Object getObject(final String columnLabel, final Map<String, Class<?>> map) throws SQLException {
return currentResultSet.getObject(columnLabel, map);
return delegate.getObject(columnLabel, map);
}
@Override
public void close() throws SQLException {
delegate.close();
}
@Override
public boolean isClosed() throws SQLException {
return delegate.isClosed();
}
@Override
public final boolean wasNull() throws SQLException {
return delegate.wasNull();
}
@Override
public final int getFetchDirection() throws SQLException {
return delegate.getFetchDirection();
}
@Override
public void setFetchDirection(final int direction) throws SQLException {
delegate.setFetchDirection(direction);
}
@Override
public final int getFetchSize() throws SQLException {
return delegate.getFetchSize();
}
@Override
public void setFetchSize(final int rows) throws SQLException {
delegate.setFetchSize(rows);
}
@Override
public final int getType() throws SQLException {
return delegate.getType();
}
@Override
public final int getConcurrency() throws SQLException {
return delegate.getConcurrency();
}
@Override
public final Statement getStatement() throws SQLException {
return delegate.getStatement();
}
@Override
public final SQLWarning getWarnings() throws SQLException {
return delegate.getWarnings();
}
@Override
public final void clearWarnings() throws SQLException {
delegate.clearWarnings();
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return delegate.getMetaData();
}
@Override
public final int findColumn(final String columnLabel) throws SQLException {
return delegate.findColumn(columnLabel);
}
}
......@@ -18,31 +18,31 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
/**
* 处理多结果集的适配器.
*
* @author zhangliang
*/
public abstract class AbstractResultSetAdapter extends AbstractResultSetGetterAdapter {
public abstract class AbstractReducerResultSetAdapter extends AbstractForwardingResultSetAdapter {
@Getter
@Setter
private List<ResultSet> resultSets;
private List<ResultSet> inputResultSets;
private boolean closed;
public void input(final List<ResultSet> inputResultSets) {
this.inputResultSets = inputResultSets;
setDelegate(inputResultSets.get(0));
}
@Override
public final void close() throws SQLException {
for (ResultSet each : resultSets) {
for (ResultSet each : inputResultSets) {
each.close();
}
closed = true;
......@@ -53,67 +53,17 @@ public abstract class AbstractResultSetAdapter extends AbstractResultSetGetterAd
return closed;
}
@Override
public final boolean wasNull() throws SQLException {
return getCurrentResultSet().wasNull();
}
@Override
public final int getFetchDirection() throws SQLException {
return getCurrentResultSet().getFetchDirection();
}
@Override
public final void setFetchDirection(final int direction) throws SQLException {
for (ResultSet each : resultSets) {
for (ResultSet each : inputResultSets) {
each.setFetchDirection(direction);
}
}
@Override
public final int getFetchSize() throws SQLException {
return getCurrentResultSet().getFetchSize();
}
@Override
public final void setFetchSize(final int rows) throws SQLException {
for (ResultSet each : resultSets) {
for (ResultSet each : inputResultSets) {
each.setFetchSize(rows);
}
}
@Override
public final int getType() throws SQLException {
return getCurrentResultSet().getType();
}
@Override
public final int getConcurrency() throws SQLException {
return getCurrentResultSet().getConcurrency();
}
@Override
public final Statement getStatement() throws SQLException {
return getCurrentResultSet().getStatement();
}
@Override
public final SQLWarning getWarnings() throws SQLException {
return getCurrentResultSet().getWarnings();
}
@Override
public final void clearWarnings() throws SQLException {
getCurrentResultSet().clearWarnings();
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return getCurrentResultSet().getMetaData();
}
@Override
public final int findColumn(final String columnLabel) throws SQLException {
return getCurrentResultSet().findColumn(columnLabel);
}
}
......@@ -17,16 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.URL;
......@@ -43,9 +33,19 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationRowResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.util.SQLUtil;
import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 使用行数据集实现的结果集.
*
*
* @author gaohongtao
*/
@Slf4j
......@@ -69,7 +69,9 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public boolean next() throws SQLException {
init();
boolean result = (currentRow = nextRow()) != null;
log.trace(toString());
if (result) {
log.trace("Current row is {}", currentRow);
}
return result;
}
......@@ -389,9 +391,4 @@ public abstract class AbstractRowSetResultSetAdapter extends AbstractUnsupported
public int getConcurrency() throws SQLException {
return ResultSet.CONCUR_READ_ONLY;
}
@Override
public String toString() {
return String.format("Current row is %s", currentRow);
}
}
......@@ -18,6 +18,7 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.math.BigDecimal;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -34,14 +35,14 @@ public final class AccumulationAggregationUnit implements AggregationUnit {
private BigDecimal result;
@Override
public void merge(final Comparable<?>... values) {
if (null == values || null == values[0]) {
public void merge(final List<Comparable<?>> values) {
if (null == values || null == values.get(0)) {
return;
}
if (null == result) {
result = new BigDecimal("0");
}
result = result.add(new BigDecimal(values[0].toString()));
result = result.add(new BigDecimal(values.get(0).toString()));
log.trace("Accumulation result: {}", result.toString());
}
......
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.util.List;
/**
* 归并计算单元接口.
*
......@@ -29,7 +31,7 @@ public interface AggregationUnit {
*
* @param values 聚合数值
*/
void merge(Comparable<?>... values);
void merge(List<Comparable<?>> values);
/**
* 获取计算结果.
......
......@@ -18,6 +18,7 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.math.BigDecimal;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -36,8 +37,8 @@ public class AvgAggregationUnit implements AggregationUnit {
private BigDecimal sum;
@Override
public void merge(final Comparable<?>... values) {
if (null == values || null == values[0] || null == values[1]) {
public void merge(final List<Comparable<?>> values) {
if (null == values || null == values.get(0) || null == values.get(1)) {
return;
}
if (null == count) {
......@@ -46,8 +47,8 @@ public class AvgAggregationUnit implements AggregationUnit {
if (null == sum) {
sum = new BigDecimal("0");
}
count = count.add(new BigDecimal(values[0].toString()));
sum = sum.add(new BigDecimal(values[1].toString()));
count = count.add(new BigDecimal(values.get(0).toString()));
sum = sum.add(new BigDecimal(values.get(1).toString()));
log.trace("AVG result COUNT: {} SUM: {}", count, sum);
}
......
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -35,18 +37,18 @@ public final class ComparableAggregationUnit implements AggregationUnit {
@SuppressWarnings("unchecked")
@Override
public void merge(@SuppressWarnings("rawtypes") final Comparable... values) {
if (null == values || null == values[0]) {
public void merge(final List<Comparable<?>> values) {
if (null == values || null == values.get(0)) {
return;
}
if (null == result) {
result = values[0];
result = values.get(0);
log.trace("Comparable result: {}", result);
return;
}
int comparedValue = values[0].compareTo(result);
int comparedValue = ((Comparable) values.get(0)).compareTo(result);
if (asc && comparedValue < 0 || !asc && comparedValue > 0) {
result = values[0];
result = values.get(0);
log.trace("Comparable result: {}", result);
}
}
......
......@@ -17,6 +17,11 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.GroupByRow;
......@@ -24,11 +29,7 @@ import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.GroupByColumn;
import lombok.RequiredArgsConstructor;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
/**
* 分组节点结果集.
......@@ -36,6 +37,7 @@ import java.util.List;
* @author gaohongtao
*/
@RequiredArgsConstructor
@Slf4j
public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter implements CouplingResultSet {
private final List<GroupByColumn> groupByColumns;
......@@ -63,7 +65,13 @@ public class GroupByCouplingResultSet extends AbstractRowSetResultSetAdapter imp
return null;
}
GroupByRow row = new GroupByRow(resultSet, groupByColumns, aggregationColumns);
hasNext = row.aggregate();
if (aggregationColumns.isEmpty()) {
return row;
}
for (List<Object> groupByKey = row.getGroupByKey(); hasNext && (groupByColumns.isEmpty() || groupByKey.equals(row.getGroupByKey())); hasNext = resultSet.next()) {
row.aggregate();
}
row.generateResult();
return row;
}
}
......@@ -17,21 +17,21 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import java.sql.ResultSet;
import java.sql.SQLException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractForwardingResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* 限制结果集.
*
* @author gaohongtao
*/
@Slf4j
public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter implements CouplingResultSet {
public class LimitCouplingResultSet extends AbstractForwardingResultSetAdapter implements CouplingResultSet {
private final Limit limit;
......@@ -47,7 +47,7 @@ public class LimitCouplingResultSet extends AbstractDelegateResultSetAdapter imp
@Override
public void init(final ResultSet preResultSet) {
setDelegatedResultSet(preResultSet);
setDelegate(preResultSet);
this.preResultSet = preResultSet;
}
......
......@@ -17,29 +17,27 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.coupling;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.component.CouplingResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.util.List;
/**
* 基于内存的全排序.
*
* @author gaohongtao
*/
public class MemoryOrderByCouplingResultSet extends AbstractDelegateResultSetAdapter implements CouplingResultSet {
private final List<OrderByColumn> expectOrderList;
public class MemoryOrderByCouplingResultSet extends MemoryOrderByResultSet implements CouplingResultSet {
public MemoryOrderByCouplingResultSet(final List<OrderByColumn> expectOrderList) {
this.expectOrderList = expectOrderList;
super(expectOrderList);
}
@Override
public void init(final ResultSet preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, expectOrderList));
setResultSets(Collections.singletonList(preResultSet));
}
}
......@@ -17,11 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.other;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
......@@ -29,26 +24,24 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractRowSetResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.merger.row.Row;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import lombok.RequiredArgsConstructor;
/**
* 内存结果集.
*
* @author gaohongtao
*/
@RequiredArgsConstructor
public class MemoryOrderByResultSet extends AbstractRowSetResultSetAdapter {
private final List<OrderByColumn> orderByColumns;
private Iterator<OrderByRow> orderByRowsIterator;
public MemoryOrderByResultSet(final ResultSet resultSets, final List<OrderByColumn> orderByColumns) {
this(Collections.singletonList(resultSets), orderByColumns);
}
public MemoryOrderByResultSet(final List<ResultSet> resultSets, final List<OrderByColumn> orderByColumns) {
this.orderByColumns = orderByColumns;
setResultSets(resultSets);
}
@Override
protected void initRows(final List<ResultSet> resultSets) throws SQLException {
List<OrderByRow> orderByRows = new LinkedList<>();
......
......@@ -17,49 +17,31 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.other;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.IndexColumn;
import lombok.Getter;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractForwardingResultSetAdapter;
import lombok.Getter;
/**
* 原始结果集包装类.
*
* @author gaohongtao
*/
public class WrapperResultSet extends AbstractDelegateResultSetAdapter {
public class WrapperResultSet extends AbstractForwardingResultSetAdapter {
@Getter
private final boolean isEmpty;
private final Map<String, Integer> columnLabelIndexMap;
private boolean isFirstNext;
public WrapperResultSet(final ResultSet resultSet) throws SQLException {
isEmpty = !resultSet.next();
if (isEmpty) {
columnLabelIndexMap = Collections.emptyMap();
return;
}
setDelegatedResultSet(resultSet);
setDelegate(resultSet);
increaseStat();
columnLabelIndexMap = getColumnLabelIndexMap();
}
private Map<String, Integer> getColumnLabelIndexMap() throws SQLException {
ResultSetMetaData resultSetMetaData = getCurrentResultSet().getMetaData();
Map<String, Integer> result = new CaseInsensitiveMap<>(resultSetMetaData.getColumnCount());
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
result.put(resultSetMetaData.getColumnLabel(i), i);
}
return result;
}
@Override
......@@ -72,19 +54,4 @@ public class WrapperResultSet extends AbstractDelegateResultSetAdapter {
}
return super.next();
}
/**
* 获取列索引.
*
* @param indexColumn 基于索引的列
* @return 列索引
*/
public int getColumnIndex(final IndexColumn indexColumn) {
if (indexColumn.getColumnLabel().isPresent() && columnLabelIndexMap.containsKey(indexColumn.getColumnLabel().get())) {
return columnLabelIndexMap.get(indexColumn.getColumnLabel().get());
} else if (indexColumn.getColumnName().isPresent() && columnLabelIndexMap.containsKey(indexColumn.getColumnName().get())) {
return columnLabelIndexMap.get(indexColumn.getColumnName().get());
}
throw new IllegalArgumentException(String.format("Cannot find index for column '%s' from ResultSet '%s'", indexColumn, columnLabelIndexMap));
}
}
......@@ -17,21 +17,21 @@
package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractReducerResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import lombok.extern.slf4j.Slf4j;
/**
* 迭代归并结果集.
*
* @author gaohongtao
*/
@Slf4j
public class IteratorReducerResultSet extends AbstractResultSetAdapter implements ReducerResultSet {
public class IteratorReducerResultSet extends AbstractReducerResultSetAdapter implements ReducerResultSet {
private int resultSetIndex;
......@@ -39,24 +39,23 @@ public class IteratorReducerResultSet extends AbstractResultSetAdapter implement
@Override
public void init(final List<ResultSet> preResultSet) {
setResultSets(preResultSet);
input(preResultSet);
resultSetIndex++;
setCurrentResultSet(preResultSet.get(0));
}
@Override
public boolean next() throws SQLException {
if (null != getCurrentResultSet() && getCurrentResultSet().next()) {
if (null != getDelegate() && getDelegate().next()) {
currentResultSetOffset++;
log.trace(toString());
return true;
}
if (resultSetIndex >= getResultSets().size()) {
if (resultSetIndex >= getInputResultSets().size()) {
return false;
}
currentResultSetOffset = 1;
ResultSet rs = getResultSets().get(resultSetIndex++);
setCurrentResultSet(rs);
ResultSet rs = getInputResultSets().get(resultSetIndex++);
setDelegate(rs);
log.trace(toString());
return rs.next();
}
......@@ -64,6 +63,6 @@ public class IteratorReducerResultSet extends AbstractResultSetAdapter implement
@Override
// TODO 同样的toString问题
public String toString() {
return String.format("Current access %d of %d result set, offset is %d", resultSetIndex, getResultSets().size(), currentResultSetOffset);
return String.format("Current access %d of %d result set, offset is %d", resultSetIndex, getInputResultSets().size(), currentResultSetOffset);
}
}
......@@ -20,25 +20,23 @@ package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;
import java.sql.ResultSet;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractDelegateResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import lombok.RequiredArgsConstructor;
/**
* 根据排序列进行内存中排序.
*
* @author gaohongtao
*/
// TODO 和MemoryOrderByResultSet能否合二为一
@RequiredArgsConstructor
public class MemoryOrderByReducerResultSet extends AbstractDelegateResultSetAdapter implements ReducerResultSet {
public class MemoryOrderByReducerResultSet extends MemoryOrderByResultSet implements ReducerResultSet {
private final List<OrderByColumn> orderByColumns;
public MemoryOrderByReducerResultSet(final List<OrderByColumn> orderByColumns) {
super(orderByColumns);
}
@Override
public void init(final List<ResultSet> preResultSet) {
setDelegatedResultSet(new MemoryOrderByResultSet(preResultSet, orderByColumns));
setResultSets(preResultSet);
}
}
......@@ -24,7 +24,7 @@ import java.util.List;
import java.util.Queue;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractReducerResultSetAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
......@@ -38,42 +38,33 @@ import lombok.extern.slf4j.Slf4j;
* @author gaohongtao
*/
@Slf4j
public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter implements ReducerResultSet {
public class StreamingOrderByReducerResultSet extends AbstractReducerResultSetAdapter implements ReducerResultSet {
private final List<OrderByColumn> orderByColumns;
// TODO 作用?
private boolean initial;
// TODO 为什么用Queue
private Queue<ResultSet> effectiveResultSetQueue;
// TODO 使用lombok?
public StreamingOrderByReducerResultSet(final List<OrderByColumn> orderByColumns) {
this.orderByColumns = orderByColumns;
}
@Override
// TODO preResultSet什么意思, 如果是复数需要加s
public void init(final List<ResultSet> preResultSet) {
// TODO 以下两步可否通过构造器
setResultSets(preResultSet);
setCurrentResultSet(preResultSet.get(0));
public void init(final List<ResultSet> preResultSet) throws SQLException {
input(preResultSet);
setDelegate(preResultSet.get(0));
effectiveResultSetQueue = new LinkedList<>(Collections2.filter(preResultSet, new Predicate<ResultSet>() {
@Override
public boolean apply(final ResultSet input) {
try {
// TODO 之前在WrapperResultSet的构造器里已经next过了, 会否有问题
return input.next();
// TODO next问题是否直接抛SQLException, 目前方法签名的SQLException并不需要
} catch (final SQLException ex) {
throw new ShardingJdbcException(ex);
}
}
}));
// TODO log是否有意义,没有覆盖toString能否看清调试信息
log.trace("Effective result set: {}", effectiveResultSetQueue);
log.trace("Effective result set:{}", effectiveResultSetQueue);
}
@Override
......@@ -83,36 +74,30 @@ public class StreamingOrderByReducerResultSet extends AbstractResultSetAdapter i
} else {
initial = true;
}
// TODO 单独提炼一个getComparedResultSet这样的方法是否好一些
OrderByRow chosenOrderByValue = null;
for (ResultSet each : effectiveResultSetQueue) {
// TODO 变量名字是否应该叫orderByRow
OrderByRow eachOrderByValue = new OrderByRow(orderByColumns, each);
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
// TODO 作用?
setCurrentResultSet(each);
setDelegate(each);
}
}
if (!effectiveResultSetQueue.isEmpty()) {
// TODO toString是否应删除, 将内容直接挪入log
log.trace(toString());
}
return !effectiveResultSetQueue.isEmpty();
}
private void nextEffectiveResultSets() throws SQLException {
// TODO next rename => hasNext
boolean next = getCurrentResultSet().next();
boolean next = getDelegate().next();
if (!next) {
effectiveResultSetQueue.remove(getCurrentResultSet());
log.trace("Result set {} finish", getCurrentResultSet());
effectiveResultSetQueue.remove(getDelegate());
log.trace("Result set {} finish", getDelegate());
}
}
@Override
// TODO toString应该展现变量状态, 描述词语Current result set: 是否应去掉, 而且ToString是否不应只展现getCurrentResultSet的状态?
public String toString() {
return String.format("Current result set:%s", getCurrentResultSet());
return String.format("Current result set:%s", getDelegate());
}
}
......@@ -17,6 +17,14 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnit;
import com.dangdang.ddframe.rdb.sharding.merger.aggregation.AggregationUnitFactory;
......@@ -27,16 +35,9 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// TODO javadoc
/**
* 分组行.
*
* @author gaohongtao
*/
@Slf4j
......@@ -48,73 +49,47 @@ public class GroupByRow extends Row {
private final List<AggregationColumn> aggregationColumns;
private final Map<AggregationColumn, AggregationUnit> aggregationUnitMap;
public GroupByRow(final ResultSet resultSet, final List<GroupByColumn> groupByColumns, final List<AggregationColumn> aggregationColumns) throws SQLException {
super(resultSet);
this.resultSet = resultSet;
this.groupByColumns = groupByColumns;
this.aggregationColumns = aggregationColumns;
aggregationUnitMap = new HashMap<>(aggregationColumns.size());
for (AggregationColumn each : aggregationColumns) {
aggregationUnitMap.put(each, AggregationUnitFactory.create(each.getAggregationType()));
}
}
// TODO javadoc
public boolean aggregate() throws SQLException {
Map<AggregationColumn, AggregationUnit> aggregationUnitMap = null;
if (!aggregationColumns.isEmpty()) {
aggregationUnitMap = new HashMap<>(aggregationColumns.size());
}
List<Object> groupByKey = getGroupByKey(groupByColumns);
log.trace("Group {} start", groupByKey);
boolean hasNext = false;
do {
if (!groupByColumns.isEmpty() && !groupByKey.equals(getGroupByKey(groupByColumns))) {
log.trace("Group {} finish", groupByKey);
break;
}
mergeAggregationColumn(aggregationUnitMap);
} while (hasNext = resultSet.next());
if (null == aggregationUnitMap) {
return hasNext;
public void aggregate() throws SQLException {
for (Map.Entry<AggregationColumn, AggregationUnit> each : aggregationUnitMap.entrySet()) {
List<AggregationColumn> mergingAggregationColumns = each.getKey().getDerivedColumns().isEmpty() ? Collections.singletonList(each.getKey()) : Lists.newArrayList(each.getKey().getDerivedColumns());
each.getValue().merge(Lists.transform(mergingAggregationColumns, new Function<IndexColumn, Comparable<?>>() {
@Override
public Comparable<?> apply(final IndexColumn input) {
return (Comparable<?>) getValueSilently(input.getColumnIndex());
}
}));
}
}
public void generateResult() {
for (AggregationColumn each : aggregationUnitMap.keySet()) {
setCell(each.getColumnIndex(), aggregationUnitMap.get(each).getResult());
}
return hasNext;
}
private List<Object> getGroupByKey(final List<GroupByColumn> groupByColumns) {
public List<Object> getGroupByKey() {
List<Object> result = new ArrayList<>(groupByColumns.size());
for (GroupByColumn each : groupByColumns) {
result.add(getValueSafely(each.getColumnIndex()));
result.add(getValueSilently(each.getColumnIndex()));
}
return result;
}
private void mergeAggregationColumn(final Map<AggregationColumn, AggregationUnit> aggregationUnitMap) {
if (null == aggregationUnitMap) {
return;
}
for (AggregationColumn each : aggregationColumns) {
if (!aggregationUnitMap.containsKey(each)) {
aggregationUnitMap.put(each, AggregationUnitFactory.create(each.getAggregationType()));
}
AggregationUnit unit = aggregationUnitMap.get(each);
List<AggregationColumn> mergingAggregationColumns;
if (each.getDerivedColumns().isEmpty()) {
mergingAggregationColumns = Collections.singletonList(each);
} else {
mergingAggregationColumns = Lists.newArrayList(each.getDerivedColumns());
}
unit.merge(Lists.transform(mergingAggregationColumns, new Function<IndexColumn, Comparable<?>>() {
@Override
public Comparable<?> apply(final IndexColumn input) {
log.trace("Column Index {} will be merged", input.getColumnIndex());
return (Comparable<?>) getValueSafely(input.getColumnIndex());
}
}).toArray(new Comparable[mergingAggregationColumns.size()]));
}
}
private Object getValueSafely(final int index) {
private Object getValueSilently(final int index) {
try {
return resultSet.getObject(index);
} catch (final SQLException ex) {
......@@ -123,8 +98,25 @@ public class GroupByRow extends Row {
}
@Override
// TODO toString问题
public String toString() {
return String.format("Group by columns is %s, aggregation column is %s, %s", groupByColumns, aggregationColumns, super.toString());
StringBuilder result = new StringBuilder("GroupByKey is: ");
result.append(Lists.transform(groupByColumns, new Function<GroupByColumn, Object>() {
@Override
public Object apply(final GroupByColumn input) {
return getCell(input.getColumnIndex());
}
}));
if (aggregationColumns.isEmpty()) {
return result.toString();
}
result.append("; Aggregation result is: ").append(Lists.transform(aggregationColumns, new Function<AggregationColumn, String>() {
@Override
public String apply(final AggregationColumn input) {
Object value = getCell(input.getColumnIndex());
value = null == value ? "null" : value;
return String.format("{index:%d, type:%s, value:%s}", input.getColumnIndex(), input.getAggregationType(), value);
}
}));
return result.toString();
}
}
......@@ -17,18 +17,18 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.dangdang.ddframe.rdb.sharding.merger.common.ResultSetUtil;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import com.google.common.base.Preconditions;
/**
* 具有排序功能的行对象.
*
*
* @author gaohongtao
*/
// TODO 继承还是复用Row? 好像没有用父类相关的方法
......@@ -72,6 +72,6 @@ public class OrderByRow extends Row implements Comparable<OrderByRow> {
@Override
// TODO toString应该展现变量状态, 描述词语Order by columns value is是否应去掉, 而且ToString是否不应只展现getCurrentResultSet的状态?
public String toString() {
return String.format("Order by columns value is %s", values);
return String.format("OrderByKey is %s", values);
}
}
......@@ -17,21 +17,18 @@
package com.dangdang.ddframe.rdb.sharding.merger.row;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import com.google.common.base.Preconditions;
/**
* 数据行.
*
* 每个数据行对象代表结果集中的一行数据.
*
* @author gaohongtao
*/
// TODO 名字最好再考虑下, ROW有点大, 而且guava, swing等已经用了
public class Row {
private final Object[] rowData;
......@@ -40,9 +37,7 @@ public class Row {
rowData = getRowData(resultSet);
}
// TODO rename => fillRowData?
private Object[] getRowData(final ResultSet resultSet) throws SQLException {
// TODO 最好不要用md这类缩写
ResultSetMetaData md = resultSet.getMetaData();
Object[] result = new Object[md.getColumnCount()];
for (int i = 0; i < md.getColumnCount(); i++) {
......@@ -51,32 +46,29 @@ public class Row {
return result;
}
void setCell(final int index, final Object value) {
protected void setCell(final int index, final Object value) {
Preconditions.checkArgument(containsCell(index));
rowData[index - 1] = value;
}
// TODO javadoc
/**
* 通过索引访问数据行中的单元格.
*
* @param index 索引
* @return 单元格中的数据
*/
public Object getCell(final int index) {
Preconditions.checkArgument(containsCell(index));
return rowData[index - 1];
}
// TODO javadoc
// TODO 改名? 这名字不能望文生义, 比如: isIndexOutOfRange
/**
* 判断数据行中是否包含该索引.
*
* @param index 索引
* @return true 包含 false 不包含
*/
public boolean containsCell(final int index) {
return index - 1 > -1 && index - 1 < rowData.length;
}
@Override
public String toString() {
// TODO 试试 Arrays.toString(rowData)
return String.format("value is : %s", Lists.transform(Arrays.asList(rowData), new Function<Object, Object>() {
@Override
public Object apply(final Object input) {
return null == input ? "nil" : input;
}
}));
}
}
......@@ -17,6 +17,10 @@
package com.dangdang.ddframe.rdb.sharding.jdbc.adapter;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import com.dangdang.ddframe.rdb.integrate.AbstractDBUnitTest;
import com.dangdang.ddframe.rdb.integrate.db.AbstractShardingDataBasesOnlyDBUnitTest;
import com.dangdang.ddframe.rdb.sharding.api.DatabaseType;
......@@ -26,10 +30,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
......@@ -65,13 +65,13 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
@Test
public void assertColse() throws SQLException {
actual.close();
assertClose((AbstractResultSetAdapter) actual);
assertClose((AbstractReducerResultSetAdapter) actual);
}
private void assertClose(final AbstractResultSetAdapter actual) throws SQLException {
private void assertClose(final AbstractReducerResultSetAdapter actual) throws SQLException {
assertTrue(actual.isClosed());
assertThat(actual.getResultSets().size(), is(10));
for (ResultSet each : actual.getResultSets()) {
assertThat(actual.getInputResultSets().size(), is(10));
for (ResultSet each : actual.getInputResultSets()) {
assertTrue(each.isClosed());
}
}
......@@ -88,14 +88,14 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
actual.setFetchDirection(ResultSet.FETCH_REVERSE);
} catch (final SQLException ignore) {
}
assertFetchDirection((AbstractResultSetAdapter) actual, ResultSet.FETCH_REVERSE);
assertFetchDirection((AbstractReducerResultSetAdapter) actual, ResultSet.FETCH_REVERSE);
}
private void assertFetchDirection(final AbstractResultSetAdapter actual, final int fetchDirection) throws SQLException {
private void assertFetchDirection(final AbstractReducerResultSetAdapter actual, final int fetchDirection) throws SQLException {
// H2数据库未实现getFetchDirection方法
assertThat(actual.getFetchDirection(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? ResultSet.FETCH_FORWARD : fetchDirection));
assertThat(actual.getResultSets().size(), is(10));
for (ResultSet each : actual.getResultSets()) {
assertThat(actual.getInputResultSets().size(), is(10));
for (ResultSet each : actual.getInputResultSets()) {
assertThat(each.getFetchDirection(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? ResultSet.FETCH_FORWARD : fetchDirection));
}
}
......@@ -104,14 +104,14 @@ public final class ResultSetAdapterTest extends AbstractShardingDataBasesOnlyDBU
public void assertSetFetchSize() throws SQLException {
assertThat(actual.getFetchSize(), is(0));
actual.setFetchSize(100);
assertFetchSize((AbstractResultSetAdapter) actual, 100);
assertFetchSize((AbstractReducerResultSetAdapter) actual, 100);
}
private void assertFetchSize(final AbstractResultSetAdapter actual, final int fetchSize) throws SQLException {
private void assertFetchSize(final AbstractReducerResultSetAdapter actual, final int fetchSize) throws SQLException {
// H2数据库未实现getFetchSize方法
assertThat(actual.getFetchSize(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? 0 : fetchSize));
assertThat(actual.getResultSets().size(), is(10));
for (ResultSet each : actual.getResultSets()) {
assertThat(actual.getInputResultSets().size(), is(10));
for (ResultSet each : actual.getInputResultSets()) {
assertThat(each.getFetchSize(), is(DatabaseType.H2 == AbstractDBUnitTest.CURRENT_DB_TYPE ? 0 : fetchSize));
}
}
......
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.util.Collections;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
......@@ -27,9 +29,9 @@ public final class AccumulationAggregationUnitTest {
@Test
public void assertAccumulationAggregation() {
AccumulationAggregationUnit accumulationAggregationUnit = new AccumulationAggregationUnit();
accumulationAggregationUnit.merge(1);
accumulationAggregationUnit.merge(1);
accumulationAggregationUnit.merge(10);
accumulationAggregationUnit.merge(Collections.<Comparable<?>>singletonList(1));
accumulationAggregationUnit.merge(Collections.<Comparable<?>>singletonList(1));
accumulationAggregationUnit.merge(Collections.<Comparable<?>>singletonList(10));
assertThat(((Number) accumulationAggregationUnit.getResult()).intValue(), is(12));
}
}
......@@ -18,6 +18,7 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.math.BigDecimal;
import java.util.Arrays;
import org.junit.Test;
......@@ -29,18 +30,18 @@ public final class AvgAggregationUnitTest {
@Test
public void assertAvgAggregation() {
AvgAggregationUnit avgAggregationUnit = new AvgAggregationUnit();
avgAggregationUnit.merge(10, 50);
avgAggregationUnit.merge(10, 20);
avgAggregationUnit.merge(5, 40);
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(10, 50));
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(10, 20));
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(5, 40));
assertThat((BigDecimal) avgAggregationUnit.getResult(), is(new BigDecimal("4.4000")));
}
@Test
public void assertDivideZero() {
AvgAggregationUnit avgAggregationUnit = new AvgAggregationUnit();
avgAggregationUnit.merge(0, 50);
avgAggregationUnit.merge(0, 20);
avgAggregationUnit.merge(0, 40);
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(0, 50));
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(0, 20));
avgAggregationUnit.merge(Arrays.<Comparable<?>>asList(0, 40));
assertThat((BigDecimal) avgAggregationUnit.getResult(), is(new BigDecimal(0)));
}
}
......@@ -17,6 +17,8 @@
package com.dangdang.ddframe.rdb.sharding.merger.aggregation;
import java.util.Collections;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
......@@ -27,18 +29,18 @@ public final class ComparableAggregationUnitTest {
@Test
public void assertComparableAggregationForAsc() {
ComparableAggregationUnit comparableAggregation = new ComparableAggregationUnit(true);
comparableAggregation.merge(1);
comparableAggregation.merge(10);
comparableAggregation.merge(5);
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(1));
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(10));
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(5));
assertThat((Integer) comparableAggregation.getResult(), is(1));
}
@Test
public void assertComparableAggregationForDesc() {
ComparableAggregationUnit comparableAggregation = new ComparableAggregationUnit(false);
comparableAggregation.merge(1);
comparableAggregation.merge(10);
comparableAggregation.merge(5);
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(1));
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(10));
comparableAggregation.merge(Collections.<Comparable<?>>singletonList(5));
assertThat((Integer) comparableAggregation.getResult(), is(10));
}
}
......@@ -17,11 +17,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.rs;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import org.junit.Test;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
......@@ -33,6 +28,11 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import com.dangdang.ddframe.rdb.sharding.merger.component.other.MemoryOrderByResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.fixture.MockResultSet;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
......@@ -41,8 +41,8 @@ public class MemoryOrderByResultSetTest {
@Test
public void testSort() throws SQLException {
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
rs.setResultSets(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)));
List<Integer> actualList = new ArrayList<>();
while (rs.next()) {
actualList.add(rs.getInt(1));
......@@ -51,8 +51,8 @@ public class MemoryOrderByResultSetTest {
rs.close();
assertThat(rs.isClosed(), is(true));
rs = new MemoryOrderByResultSet(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.DESC)));
rs = new MemoryOrderByResultSet(Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.DESC)));
rs.setResultSets(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)));
actualList.clear();
while (rs.next()) {
actualList.add(rs.getInt("nAmE"));
......@@ -85,8 +85,8 @@ public class MemoryOrderByResultSetTest {
orderByColumn1.setColumnIndex(1);
OrderByColumn orderByColumn2 = new OrderByColumn("time", OrderByColumn.OrderByType.DESC);
orderByColumn2.setColumnIndex(2);
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.<ResultSet>singletonList(new MockResultSet<>(Arrays.asList(rs1, rs2, rs3))),
Arrays.asList(orderByColumn1, orderByColumn2));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Arrays.asList(orderByColumn1, orderByColumn2));
rs.setResultSets(Collections.<ResultSet>singletonList(new MockResultSet<>(Arrays.asList(rs1, rs2, rs3))));
List<Map<String, Object>> actualList = new ArrayList<>();
while (rs.next()) {
Map<String, Object> map = new TreeMap<>();
......@@ -100,15 +100,15 @@ public class MemoryOrderByResultSetTest {
@Test
public void testFindColumnSuccess() throws SQLException {
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet( Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
rs.setResultSets(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)));
assertThat(rs.findColumn("name"), is(1));
}
@Test(expected = IllegalArgumentException.class)
public void testFindColumnError() throws SQLException {
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
rs.setResultSets(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)));
rs.findColumn("unknown");
}
......@@ -117,8 +117,8 @@ public class MemoryOrderByResultSetTest {
Map<String, Object> rs1 = new TreeMap<>();
rs1.put("name", "kecf");
rs1.put("time", null);
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.<ResultSet>singletonList(new MockResultSet<>(Collections.singletonList(rs1))),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
rs.setResultSets(Collections.<ResultSet>singletonList(new MockResultSet<>(Collections.singletonList(rs1))));
assertThat(rs.next(), is(true));
assertThat(rs.getObject(2), nullValue());
assertThat(rs.wasNull(), is(true));
......@@ -126,8 +126,8 @@ public class MemoryOrderByResultSetTest {
@Test
public void testOthers() throws SQLException {
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)),
Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
MemoryOrderByResultSet rs = new MemoryOrderByResultSet(Collections.singletonList(new OrderByColumn(1, OrderByColumn.OrderByType.ASC)));
rs.setResultSets(Arrays.<ResultSet>asList(new MockResultSet<>(1, 3, 5, 6, 6), new MockResultSet<>(8, 6, 4, 2)));
assertThat(rs.next(), is(true));
assertThat(rs.getFetchDirection(), is(ResultSet.FETCH_FORWARD));
assertThat(rs.getFetchSize(), is(9));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册