StreamingOrderByReducerResultSet.java 3.5 KB
Newer Older
G
gaohongtao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.dangdang.ddframe.rdb.sharding.merger.component.reducer;

G
gaohongtao 已提交
20 21 22 23 24 25
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

G
gaoht 已提交
26
import com.dangdang.ddframe.rdb.sharding.exception.ShardingJdbcException;
G
gaohongtao 已提交
27
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractReducerResultSetAdapter;
G
gaohongtao 已提交
28 29 30
import com.dangdang.ddframe.rdb.sharding.merger.component.ReducerResultSet;
import com.dangdang.ddframe.rdb.sharding.merger.row.OrderByRow;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.OrderByColumn;
G
gaoht 已提交
31 32
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
G
gaohongtao 已提交
33
import lombok.extern.slf4j.Slf4j;
G
gaohongtao 已提交
34 35 36 37 38 39

/**
 * 流式排序.
 *
 * @author gaohongtao
 */
G
gaohongtao 已提交
40
@Slf4j
G
gaohongtao 已提交
41
public class StreamingOrderByReducerResultSet extends AbstractReducerResultSetAdapter implements ReducerResultSet {
G
gaohongtao 已提交
42 43 44 45 46
    
    private final List<OrderByColumn> orderByColumns;
    
    private boolean initial;
    
G
gaoht 已提交
47
    private Queue<ResultSet> effectiveResultSetQueue;
G
gaohongtao 已提交
48
    
G
gaohongtao 已提交
49 50 51 52 53
    public StreamingOrderByReducerResultSet(final List<OrderByColumn> orderByColumns) {
        this.orderByColumns = orderByColumns;
    }
    
    @Override
G
gaohongtao 已提交
54 55 56
    public void init(final List<ResultSet> preResultSet) throws SQLException {
        input(preResultSet);
        setDelegate(preResultSet.get(0));
G
gaoht 已提交
57 58 59 60 61 62 63 64 65 66
        effectiveResultSetQueue = new LinkedList<>(Collections2.filter(preResultSet, new Predicate<ResultSet>() {
            @Override
            public boolean apply(final ResultSet input) {
                try {
                    return input.next();
                } catch (final SQLException ex) {
                    throw new ShardingJdbcException(ex);
                }
            }
        }));
G
gaohongtao 已提交
67
        log.trace("Effective result set:{}", effectiveResultSetQueue);
G
gaohongtao 已提交
68 69 70 71 72 73 74 75 76 77
    }
    
    @Override
    public boolean next() throws SQLException {
        if (initial) {
            nextEffectiveResultSets();
        } else {
            initial = true;
        }
        OrderByRow chosenOrderByValue = null;
G
gaoht 已提交
78
        for (ResultSet each : effectiveResultSetQueue) {
G
gaohongtao 已提交
79 80 81
            OrderByRow eachOrderByValue = new OrderByRow(orderByColumns, each);
            if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
                chosenOrderByValue = eachOrderByValue;
G
gaohongtao 已提交
82
                setDelegate(each);
G
gaohongtao 已提交
83 84
            }
        }
G
gaoht 已提交
85 86 87 88
        if (!effectiveResultSetQueue.isEmpty()) {
            log.trace(toString());
        }
        return !effectiveResultSetQueue.isEmpty();
G
gaohongtao 已提交
89 90 91
    }
    
    private void nextEffectiveResultSets() throws SQLException {
G
gaohongtao 已提交
92
        boolean next = getDelegate().next();
G
gaohongtao 已提交
93
        if (!next) {
G
gaohongtao 已提交
94 95
            effectiveResultSetQueue.remove(getDelegate());
            log.trace("Result set {} finish", getDelegate());
G
gaohongtao 已提交
96 97
        }
    }
G
gaoht 已提交
98 99 100
    
    @Override
    public String toString() {
G
gaohongtao 已提交
101
        return String.format("Current result set:%s", getDelegate());
G
gaoht 已提交
102
    }
G
gaohongtao 已提交
103
}