/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.sharding.merge.dql.groupby;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.shardingsphere.sharding.merge.dql.groupby.aggregation.AggregationUnit;
import org.apache.shardingsphere.sharding.merge.dql.groupby.aggregation.AggregationUnitFactory;
import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByStreamMergedResult;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.binder.segment.select.projection.impl.AggregationDistinctProjection;
import org.apache.shardingsphere.infra.binder.segment.select.projection.impl.AggregationProjection;
import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.executor.sql.QueryResult;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
T
terrymanu 已提交
40
 * Stream merged result for group by.
41
 */
T
terrymanu 已提交
42
public final class GroupByStreamMergedResult extends OrderByStreamMergedResult {
43
    
44
    private final SelectStatementContext selectStatementContext;
45 46 47 48 49
    
    private final List<Object> currentRow;
    
    private List<?> currentGroupByValues;
    
50
    public GroupByStreamMergedResult(final Map<String, Integer> labelAndIndexMap, final List<QueryResult> queryResults,
51 52
                                     final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
        super(queryResults, selectStatementContext, schema);
53
        this.selectStatementContext = selectStatementContext;
54
        currentRow = new ArrayList<>(labelAndIndexMap.size());
55
        currentGroupByValues = getOrderByValuesQueue().isEmpty()
56
                ? Collections.emptyList() : new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
57 58 59 60 61 62 63 64 65 66 67 68
    }
    
    @Override
    public boolean next() throws SQLException {
        currentRow.clear();
        if (getOrderByValuesQueue().isEmpty()) {
            return false;
        }
        if (isFirstNext()) {
            super.next();
        }
        if (aggregateCurrentGroupByRowAndNext()) {
69
            currentGroupByValues = new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues();
70 71 72 73 74 75
        }
        return true;
    }
    
    private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
        boolean result = false;
76
        Map<AggregationProjection, AggregationUnit> aggregationUnitMap = Maps.toMap(
L
Liang Zhang 已提交
77
                selectStatementContext.getProjectionsContext().getAggregationProjections(), input -> AggregationUnitFactory.create(input.getType(), input instanceof AggregationDistinctProjection));
78
        while (currentGroupByValues.equals(new GroupByValue(getCurrentQueryResult(), selectStatementContext.getGroupByContext().getItems()).getGroupValues())) {
79 80 81 82 83 84 85 86 87 88 89
            aggregate(aggregationUnitMap);
            cacheCurrentRow();
            result = super.next();
            if (!result) {
                break;
            }
        }
        setAggregationValueToCurrentRow(aggregationUnitMap);
        return result;
    }
    
90 91
    private void aggregate(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) throws SQLException {
        for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
92
            List<Comparable<?>> values = new ArrayList<>(2);
93
            if (entry.getKey().getDerivedAggregationProjections().isEmpty()) {
94 95
                values.add(getAggregationValue(entry.getKey()));
            } else {
96
                for (AggregationProjection each : entry.getKey().getDerivedAggregationProjections()) {
97 98 99 100 101 102 103 104 105 106 107 108 109
                    values.add(getAggregationValue(each));
                }
            }
            entry.getValue().merge(values);
        }
    }
    
    private void cacheCurrentRow() throws SQLException {
        for (int i = 0; i < getCurrentQueryResult().getColumnCount(); i++) {
            currentRow.add(getCurrentQueryResult().getValue(i + 1, Object.class));
        }
    }
    
T
terrymanu 已提交
110 111
    private Comparable<?> getAggregationValue(final AggregationProjection aggregationProjection) throws SQLException {
        Object result = getCurrentQueryResult().getValue(aggregationProjection.getIndex(), Object.class);
112 113 114 115
        Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
        return (Comparable<?>) result;
    }
    
116 117
    private void setAggregationValueToCurrentRow(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) {
        for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
118 119 120 121 122 123
            currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult());
        }
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) {
124 125 126
        Object result = currentRow.get(columnIndex - 1);
        setWasNull(null == result);
        return result;
127 128 129 130
    }
    
    @Override
    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) {
131 132 133
        Object result = currentRow.get(columnIndex - 1);
        setWasNull(null == result);
        return result;
134 135
    }
}