提交 73fc7527 编写于 作者: G gaohongtao

fix #215 #161 Improve streaming sort performance.

上级 4e63e67d
......@@ -21,12 +21,14 @@ import com.dangdang.ddframe.rdb.sharding.merger.ResultSetMergeContext;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.delegate.AbstractDelegateResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.resultset.memory.row.OrderByResultSetRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* 流式排序的聚集结果集.
......@@ -37,72 +39,64 @@ import java.util.List;
@Slf4j
public final class StreamingOrderByReducerResultSet extends AbstractDelegateResultSet {
private final List<OrderByColumn> orderByColumns;
private final Queue<ResultSetOrderByWrapper> delegateResultSetQueue;
private final List<OrderByColumn> orderByKeys;
private final List<OrderByDelegateResultSet> delegateResultSets = new LinkedList<>();
private OrderByDelegateResultSet lastDelegateResultSet ;
public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
super(resultSetMergeContext.getShardingResultSets().getResultSets());
orderByColumns = resultSetMergeContext.getCurrentOrderByKeys();
List<ResultSet> mergeResultSets = resultSetMergeContext.getShardingResultSets().getResultSets();
for(ResultSet each: mergeResultSets){
delegateResultSets.add(new OrderByDelegateResultSet(each,orderByColumns));
}
delegateResultSetQueue = new PriorityQueue<>(getResultSets().size());
orderByKeys = resultSetMergeContext.getCurrentOrderByKeys();
}
@Override
protected boolean firstNext() throws SQLException {
for (OrderByDelegateResultSet each : delegateResultSets) {
each.next();
for (ResultSet each : getResultSets()) {
ResultSetOrderByWrapper wrapper = new ResultSetOrderByWrapper(each);
if (wrapper.next()) {
delegateResultSetQueue.offer(wrapper);
}
}
return doNext();
}
@Override
protected boolean afterFirstNext() throws SQLException {
lastDelegateResultSet.next();
ResultSetOrderByWrapper firstResultSet = delegateResultSetQueue.poll();
setDelegate(firstResultSet.delegate);
if (firstResultSet.next()) {
delegateResultSetQueue.offer(firstResultSet);
}
return doNext();
}
private boolean doNext() throws SQLException {
setDelegateResultSet();
return !delegateResultSets.isEmpty();
}
private void setDelegateResultSet() throws SQLException {
OrderByResultSetRow chosenOrderByValue = null;
for (OrderByDelegateResultSet each : delegateResultSets) {
OrderByResultSetRow eachOrderByValue = each.orderByValue;
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
setDelegate(each.delegate);
lastDelegateResultSet = each;
}
private boolean doNext() {
if (delegateResultSetQueue.isEmpty()) {
return false;
}
log.trace("Chosen order by value: {}, current result set hashcode: {}", chosenOrderByValue, getDelegate().hashCode());
setDelegate(delegateResultSetQueue.peek().delegate);
log.trace("Chosen order by value: {}, current result set hashcode: {}", delegateResultSetQueue.peek().row, getDelegate().hashCode());
return true;
}
class OrderByDelegateResultSet {
private ResultSet delegate;
private List<OrderByColumn> orderByColumns;
private OrderByResultSetRow orderByValue ;
public OrderByDelegateResultSet(ResultSet delegate, List<OrderByColumn> orderByColumns) throws SQLException {
this.delegate = delegate;
this.orderByColumns = orderByColumns;
}
public boolean next() throws SQLException {
@RequiredArgsConstructor
private class ResultSetOrderByWrapper implements Comparable<ResultSetOrderByWrapper> {
private final ResultSet delegate;
private OrderByResultSetRow row;
boolean next() throws SQLException {
boolean result = delegate.next();
if(result) {
orderByValue = new OrderByResultSetRow(delegate, orderByColumns);
}else {
delegateResultSets.remove(this);
if (result) {
row = new OrderByResultSetRow(delegate, orderByKeys);
}
return result;
}
@Override
public int compareTo(final ResultSetOrderByWrapper o) {
return row.compareTo(o.row);
}
}
}
......@@ -8,6 +8,13 @@ next = "/03-community/directory-structure"
+++
## 1.4.2
### 功能提升
1. [ISSUE #215](https://github.com/dangdangdotcom/sharding-jdbc/issues/215) 流式排序的聚集结果集 StreamingOrderByReducerResultSet性能优化
1. [ISSUE #161](https://github.com/dangdangdotcom/sharding-jdbc/issues/161) 结果集归并的时候可以采用堆排序来提升性能
## 1.4.1
### 功能提升
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册