提交 fdbe62e4 编写于 作者: T terrymanu

refactor merger into ShardingResultSet 24th version, refactor GroupByStreamResultSetMerger

上级 bbb12fde
......@@ -50,15 +50,12 @@ public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMe
private List<Comparable<?>> currentGroupByValues;
private boolean isFirstNext;
public GroupByStreamResultSetMerger(final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
super(resultSets, selectStatement.getOrderByItems());
this.labelAndIndexMap = labelAndIndexMap;
this.selectStatement = selectStatement;
currentRow = new ArrayList<>(labelAndIndexMap.size());
currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.<Comparable<?>>emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
isFirstNext = true;
currentGroupByValues = null == getCurrentResultSet() ? Collections.<Comparable<?>>emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}
@Override
......@@ -67,6 +64,17 @@ public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMe
if (getOrderByValuesQueue().isEmpty()) {
return false;
}
if (isFirstNext()) {
super.next();
}
if (aggregateCurrentGroupByRowAndNext()) {
currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}
return true;
}
private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
boolean result = false;
Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
@Override
......@@ -74,38 +82,36 @@ public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMe
return AggregationUnitFactory.create(input.getType());
}
});
if (isFirstNext) {
super.next();
isFirstNext = false;
}
boolean hasNext = false;
while (!getOrderByValuesQueue().isEmpty() && currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
List<Comparable<?>> values = new ArrayList<>(2);
if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {
values.add(getAggregationValue(entry.getKey()));
} else {
for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) {
values.add(getAggregationValue(each));
}
}
entry.getValue().merge(values);
}
for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {
currentRow.add(getCurrentResultSet().getObject(i + 1));
}
hasNext = super.next();
if (!hasNext) {
while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
aggregate(aggregationUnitMap);
cacheCurrentRow();
result = super.next();
if (!result) {
break;
}
}
setAggregationValueToCurrentRow(aggregationUnitMap);
return result;
}
private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {
for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
List<Comparable<?>> values = new ArrayList<>(2);
if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) {
values.add(getAggregationValue(entry.getKey()));
} else {
for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) {
values.add(getAggregationValue(each));
}
}
entry.getValue().merge(values);
}
if (hasNext) {
currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
}
private void cacheCurrentRow() throws SQLException {
for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {
currentRow.add(getCurrentResultSet().getObject(i + 1));
}
return true;
}
private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException {
......@@ -114,6 +120,12 @@ public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMe
return (Comparable<?>) result;
}
private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) {
for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
}
}
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
return currentRow.get(columnIndex - 1);
......
......@@ -41,6 +41,7 @@ public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger
@Getter(AccessLevel.PROTECTED)
private final Queue<OrderByValue> orderByValuesQueue;
@Getter(AccessLevel.PROTECTED)
private boolean isFirstNext;
public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册