/* * Copyright 1999-2015 dangdang.com. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *

*/ package com.dangdang.ddframe.rdb.sharding.merger.groupby; import com.dangdang.ddframe.rdb.sharding.merger.common.AbstractMemoryResultSetMerger; import com.dangdang.ddframe.rdb.sharding.merger.common.MemoryResultSetRow; import com.dangdang.ddframe.rdb.sharding.merger.groupby.aggregation.AggregationUnit; import com.dangdang.ddframe.rdb.sharding.merger.groupby.aggregation.AggregationUnitFactory; import com.dangdang.ddframe.rdb.sharding.parsing.parser.context.selectitem.AggregationSelectItem; import com.dangdang.ddframe.rdb.sharding.parsing.parser.statement.dql.select.SelectStatement; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; /** * 内存分组归并结果集接口. * * @author zhangliang */ public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger { private final SelectStatement selectStatement; private final Iterator memoryResultSetRows; public GroupByMemoryResultSetMerger( final Map labelAndIndexMap, final List resultSets, final SelectStatement selectStatement) throws SQLException { super(labelAndIndexMap); this.selectStatement = selectStatement; memoryResultSetRows = init(resultSets); } private Iterator init(final List resultSets) throws SQLException { Map dataMap = new HashMap<>(1024); Map> aggregationMap = new HashMap<>(1024); for (ResultSet each : resultSets) { while (each.next()) { GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems()); initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap); aggregate(each, groupByValue, aggregationMap); } } setAggregationValueToMemoryRow(dataMap, aggregationMap); List result = getMemoryResultSetRows(dataMap); if (!result.isEmpty()) { setCurrentResultSetRow(result.get(0)); } return result.iterator(); } private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map dataMap, final Map> aggregationMap) throws SQLException { if (!dataMap.containsKey(groupByValue)) { dataMap.put(groupByValue, new MemoryResultSetRow(resultSet)); } if (!aggregationMap.containsKey(groupByValue)) { Map map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function() { @Override public AggregationUnit apply(final AggregationSelectItem input) { return AggregationUnitFactory.create(input.getType()); } }); aggregationMap.put(groupByValue, map); } } private void aggregate(final ResultSet resultSet, final GroupByValue groupByValue, final Map> aggregationMap) throws SQLException { for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { List> values = new ArrayList<>(2); if (each.getDerivedAggregationSelectItems().isEmpty()) { values.add(getAggregationValue(resultSet, each)); } else { for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) { values.add(getAggregationValue(resultSet, derived)); } } aggregationMap.get(groupByValue).get(each).merge(values); } } private Comparable getAggregationValue(final ResultSet resultSet, final AggregationSelectItem aggregationSelectItem) throws SQLException { Object result = resultSet.getObject(aggregationSelectItem.getIndex()); Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable"); return (Comparable) result; } private void setAggregationValueToMemoryRow(final Map dataMap, final Map> aggregationMap) { for (Entry entry : dataMap.entrySet()) { for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult()); } } } private List getMemoryResultSetRows(final Map dataMap) { List result = new ArrayList<>(dataMap.values()); Collections.sort(result, new GroupByRowComparator(selectStatement)); return result; } @Override public boolean next() throws SQLException { if (memoryResultSetRows.hasNext()) { setCurrentResultSetRow(memoryResultSetRows.next()); return true; } return false; } }