提交 bbb12fde 编写于 作者: T terrymanu

refactor merger into ShardingResultSet 23th version, ...

refactor merger into ShardingResultSet 23th version,  GroupByStreamResultSetMerger implements OrderByStreamResultSetMerger
上级 080f32e1
......@@ -18,7 +18,6 @@
package com.dangdang.ddframe.rdb.sharding.merger.common;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetMerger;
import lombok.Getter;
import lombok.Setter;
import java.io.InputStream;
......@@ -40,152 +39,158 @@ import java.util.Calendar;
*
* @author thor zhangliang
*/
@Getter
@Setter
public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
private ResultSet currentResultSet;
protected ResultSet getCurrentResultSet() throws SQLException {
if (null == currentResultSet) {
throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");
}
return currentResultSet;
}
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
if (Object.class == type) {
return currentResultSet.getObject(columnIndex);
return getCurrentResultSet().getObject(columnIndex);
}
if (boolean.class == type) {
return currentResultSet.getBoolean(columnIndex);
return getCurrentResultSet().getBoolean(columnIndex);
}
if (byte.class == type) {
return currentResultSet.getByte(columnIndex);
return getCurrentResultSet().getByte(columnIndex);
}
if (short.class == type) {
return currentResultSet.getShort(columnIndex);
return getCurrentResultSet().getShort(columnIndex);
}
if (int.class == type) {
return currentResultSet.getInt(columnIndex);
return getCurrentResultSet().getInt(columnIndex);
}
if (long.class == type) {
return currentResultSet.getLong(columnIndex);
return getCurrentResultSet().getLong(columnIndex);
}
if (float.class == type) {
return currentResultSet.getFloat(columnIndex);
return getCurrentResultSet().getFloat(columnIndex);
}
if (double.class == type) {
return currentResultSet.getDouble(columnIndex);
return getCurrentResultSet().getDouble(columnIndex);
}
if (String.class == type) {
return currentResultSet.getString(columnIndex);
return getCurrentResultSet().getString(columnIndex);
}
if (BigDecimal.class == type) {
return currentResultSet.getBigDecimal(columnIndex);
return getCurrentResultSet().getBigDecimal(columnIndex);
}
if (BigDecimal.class == type) {
return currentResultSet.getBigDecimal(columnIndex);
return getCurrentResultSet().getBigDecimal(columnIndex);
}
if (byte[].class == type) {
return currentResultSet.getBytes(columnIndex);
return getCurrentResultSet().getBytes(columnIndex);
}
if (Date.class == type) {
return currentResultSet.getDate(columnIndex);
return getCurrentResultSet().getDate(columnIndex);
}
if (Time.class == type) {
return currentResultSet.getTime(columnIndex);
return getCurrentResultSet().getTime(columnIndex);
}
if (Timestamp.class == type) {
return currentResultSet.getTimestamp(columnIndex);
return getCurrentResultSet().getTimestamp(columnIndex);
}
if (URL.class == type) {
return currentResultSet.getURL(columnIndex);
return getCurrentResultSet().getURL(columnIndex);
}
if (Blob.class == type) {
return currentResultSet.getBlob(columnIndex);
return getCurrentResultSet().getBlob(columnIndex);
}
if (Clob.class == type) {
return currentResultSet.getClob(columnIndex);
return getCurrentResultSet().getClob(columnIndex);
}
if (SQLXML.class == type) {
return currentResultSet.getSQLXML(columnIndex);
return getCurrentResultSet().getSQLXML(columnIndex);
}
if (Reader.class == type) {
return currentResultSet.getCharacterStream(columnIndex);
return getCurrentResultSet().getCharacterStream(columnIndex);
}
return currentResultSet.getObject(columnIndex);
return getCurrentResultSet().getObject(columnIndex);
}
@Override
public Object getValue(final String columnLabel, final Class<?> type) throws SQLException {
if (Object.class == type) {
return currentResultSet.getObject(columnLabel);
return getCurrentResultSet().getObject(columnLabel);
}
if (boolean.class == type) {
return currentResultSet.getBoolean(columnLabel);
return getCurrentResultSet().getBoolean(columnLabel);
}
if (byte.class == type) {
return currentResultSet.getByte(columnLabel);
return getCurrentResultSet().getByte(columnLabel);
}
if (short.class == type) {
return currentResultSet.getShort(columnLabel);
return getCurrentResultSet().getShort(columnLabel);
}
if (int.class == type) {
return currentResultSet.getInt(columnLabel);
return getCurrentResultSet().getInt(columnLabel);
}
if (long.class == type) {
return currentResultSet.getLong(columnLabel);
return getCurrentResultSet().getLong(columnLabel);
}
if (float.class == type) {
return currentResultSet.getFloat(columnLabel);
return getCurrentResultSet().getFloat(columnLabel);
}
if (double.class == type) {
return currentResultSet.getDouble(columnLabel);
return getCurrentResultSet().getDouble(columnLabel);
}
if (String.class == type) {
return currentResultSet.getString(columnLabel);
return getCurrentResultSet().getString(columnLabel);
}
if (BigDecimal.class == type) {
return currentResultSet.getBigDecimal(columnLabel);
return getCurrentResultSet().getBigDecimal(columnLabel);
}
if (BigDecimal.class == type) {
return currentResultSet.getBigDecimal(columnLabel);
return getCurrentResultSet().getBigDecimal(columnLabel);
}
if (byte[].class == type) {
return currentResultSet.getBytes(columnLabel);
return getCurrentResultSet().getBytes(columnLabel);
}
if (Date.class == type) {
return currentResultSet.getDate(columnLabel);
return getCurrentResultSet().getDate(columnLabel);
}
if (Time.class == type) {
return currentResultSet.getTime(columnLabel);
return getCurrentResultSet().getTime(columnLabel);
}
if (Timestamp.class == type) {
return currentResultSet.getTimestamp(columnLabel);
return getCurrentResultSet().getTimestamp(columnLabel);
}
if (URL.class == type) {
return currentResultSet.getURL(columnLabel);
return getCurrentResultSet().getURL(columnLabel);
}
if (Blob.class == type) {
return currentResultSet.getBlob(columnLabel);
return getCurrentResultSet().getBlob(columnLabel);
}
if (Clob.class == type) {
return currentResultSet.getClob(columnLabel);
return getCurrentResultSet().getClob(columnLabel);
}
if (SQLXML.class == type) {
return currentResultSet.getSQLXML(columnLabel);
return getCurrentResultSet().getSQLXML(columnLabel);
}
if (Reader.class == type) {
return currentResultSet.getCharacterStream(columnLabel);
return getCurrentResultSet().getCharacterStream(columnLabel);
}
return currentResultSet.getObject(columnLabel);
return getCurrentResultSet().getObject(columnLabel);
}
@Override
public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
if (Date.class == type) {
return currentResultSet.getDate(columnIndex, calendar);
return getCurrentResultSet().getDate(columnIndex, calendar);
}
if (Time.class == type) {
return currentResultSet.getTime(columnIndex, calendar);
return getCurrentResultSet().getTime(columnIndex, calendar);
}
if (Timestamp.class == type) {
return currentResultSet.getTimestamp(columnIndex, calendar);
return getCurrentResultSet().getTimestamp(columnIndex, calendar);
}
throw new SQLException(String.format("Unsupported type: %s", type));
}
......@@ -193,13 +198,13 @@ public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
@Override
public Object getCalendarValue(final String columnLabel, final Class<?> type, final Calendar calendar) throws SQLException {
if (Date.class == type) {
return currentResultSet.getDate(columnLabel, calendar);
return getCurrentResultSet().getDate(columnLabel, calendar);
}
if (Time.class == type) {
return currentResultSet.getTime(columnLabel, calendar);
return getCurrentResultSet().getTime(columnLabel, calendar);
}
if (Timestamp.class == type) {
return currentResultSet.getTimestamp(columnLabel, calendar);
return getCurrentResultSet().getTimestamp(columnLabel, calendar);
}
throw new SQLException(String.format("Unsupported type: %s", type));
}
......@@ -207,16 +212,16 @@ public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
@Override
public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
if ("Ascii".equals(type)) {
return currentResultSet.getAsciiStream(columnIndex);
return getCurrentResultSet().getAsciiStream(columnIndex);
}
if ("Unicode".equals(type)) {
return currentResultSet.getUnicodeStream(columnIndex);
return getCurrentResultSet().getUnicodeStream(columnIndex);
}
if ("Binary".equals(type)) {
return currentResultSet.getBinaryStream(columnIndex);
return getCurrentResultSet().getBinaryStream(columnIndex);
}
if ("Binary".equals(type)) {
return currentResultSet.getBinaryStream(columnIndex);
return getCurrentResultSet().getBinaryStream(columnIndex);
}
throw new SQLException(String.format("Unsupported type: %s", type));
}
......@@ -224,16 +229,16 @@ public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
@Override
public InputStream getInputStream(final String columnLabel, final String type) throws SQLException {
if ("Ascii".equals(type)) {
return currentResultSet.getAsciiStream(columnLabel);
return getCurrentResultSet().getAsciiStream(columnLabel);
}
if ("Unicode".equals(type)) {
return currentResultSet.getUnicodeStream(columnLabel);
return getCurrentResultSet().getUnicodeStream(columnLabel);
}
if ("Binary".equals(type)) {
return currentResultSet.getBinaryStream(columnLabel);
return getCurrentResultSet().getBinaryStream(columnLabel);
}
if ("Binary".equals(type)) {
return currentResultSet.getBinaryStream(columnLabel);
return getCurrentResultSet().getBinaryStream(columnLabel);
}
throw new SQLException(String.format("Unsupported type: %s", type));
}
......
......@@ -17,10 +17,9 @@
package com.dangdang.ddframe.rdb.sharding.merger.groupby;
import com.dangdang.ddframe.rdb.sharding.merger.common.AbstractStreamResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.merger.groupby.aggregation.AggregationUnit;
import com.dangdang.ddframe.rdb.sharding.merger.groupby.aggregation.AggregationUnitFactory;
import com.dangdang.ddframe.rdb.sharding.merger.orderby.OrderByValue;
import com.dangdang.ddframe.rdb.sharding.merger.orderby.OrderByStreamResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.select.SelectStatement;
import com.google.common.base.Function;
......@@ -31,20 +30,17 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* 流式分组归并结果集接口.
*
* @author zhangliang
*/
public final class GroupByStreamResultSetMerger extends AbstractStreamResultSetMerger {
public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {
private final Map<String, Integer> labelAndIndexMap;
......@@ -52,39 +48,23 @@ public final class GroupByStreamResultSetMerger extends AbstractStreamResultSetM
private final List<Object> currentRow;
private final Queue<OrderByValue> orderByValuesQueue;
private List<Comparable<?>> currentGroupByValues;
private boolean isFirstNext;
private List<Comparable<?>> currentGroupByValues;
public GroupByStreamResultSetMerger(final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
super(resultSets, selectStatement.getOrderByItems());
this.labelAndIndexMap = labelAndIndexMap;
this.selectStatement = selectStatement;
currentRow = new ArrayList<>(labelAndIndexMap.size());
this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
orderResultSetsToQueue(resultSets);
currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.<Comparable<?>>emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
isFirstNext = true;
currentGroupByValues = orderByValuesQueue.isEmpty()
? Collections.<Comparable<?>>emptyList() : new GroupByValue(orderByValuesQueue.peek().getResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}
private void orderResultSetsToQueue(final Collection<ResultSet> resultSets) throws SQLException {
for (ResultSet each : resultSets) {
OrderByValue orderByValue = new OrderByValue(each, selectStatement.getOrderByItems());
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
if (!orderByValuesQueue.isEmpty()) {
setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
}
}
@Override
public boolean next() throws SQLException {
currentRow.clear();
if (orderByValuesQueue.isEmpty()) {
if (getOrderByValuesQueue().isEmpty()) {
return false;
}
Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
......@@ -95,10 +75,11 @@ public final class GroupByStreamResultSetMerger extends AbstractStreamResultSetM
}
});
if (isFirstNext) {
nextInternal();
super.next();
isFirstNext = false;
}
boolean hasNext = false;
while (!orderByValuesQueue.isEmpty() && currentGroupByValues.equals(new GroupByValue(orderByValuesQueue.peek().getResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
while (!getOrderByValuesQueue().isEmpty() && currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
List<Comparable<?>> values = new ArrayList<>(2);
if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {
......@@ -110,10 +91,10 @@ public final class GroupByStreamResultSetMerger extends AbstractStreamResultSetM
}
entry.getValue().merge(values);
}
for (int i = 0; i < orderByValuesQueue.peek().getResultSet().getMetaData().getColumnCount(); i++) {
currentRow.add(orderByValuesQueue.peek().getResultSet().getObject(i + 1));
for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {
currentRow.add(getCurrentResultSet().getObject(i + 1));
}
hasNext = nextInternal();
hasNext = super.next();
if (!hasNext) {
break;
}
......@@ -122,37 +103,17 @@ public final class GroupByStreamResultSetMerger extends AbstractStreamResultSetM
currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
}
if (hasNext) {
currentGroupByValues = new GroupByValue(orderByValuesQueue.peek().getResultSet(), selectStatement.getGroupByItems()).getGroupValues();
currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}
return true;
}
private boolean nextInternal() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
return false;
}
if (isFirstNext) {
isFirstNext = false;
return true;
}
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
if (firstOrderByValue.next()) {
orderByValuesQueue.offer(firstOrderByValue);
}
if (orderByValuesQueue.isEmpty()) {
return false;
}
setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
return true;
}
private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException {
Object result = orderByValuesQueue.peek().getResultSet().getObject(aggregationSelectItem.getIndex());
Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex());
Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
return (Comparable<?>) result;
}
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
return currentRow.get(columnIndex - 1);
......
......@@ -19,6 +19,8 @@ package com.dangdang.ddframe.rdb.sharding.merger.orderby;
import com.dangdang.ddframe.rdb.sharding.merger.common.AbstractStreamResultSetMerger;
import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.OrderItem;
import lombok.AccessLevel;
import lombok.Getter;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -36,6 +38,7 @@ public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger
private final List<OrderItem> orderByItems;
@Getter(AccessLevel.PROTECTED)
private final Queue<OrderByValue> orderByValuesQueue;
private boolean isFirstNext;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册