提交 698bcc6f 编写于 作者: M Markus Holzemer 提交者: Fabian Hueske

Created new SortedGrouping to move the group order from the Grouping

上级 1b6a3acf
......@@ -14,8 +14,6 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java.operators;
import java.util.Arrays;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.java.DataSet;
......@@ -38,12 +36,9 @@ import eu.stratosphere.api.java.functions.ReduceFunction;
*/
public class Grouping<T> {
private final DataSet<T> dataSet;
private final Keys<T> keys;
protected final DataSet<T> dataSet;
private int[] groupSortKeyPositions = null;
private Order[] groupSortOrders = null;
protected final Keys<T> keys;
public Grouping(DataSet<T> set, Keys<T> keys) {
if (set == null || keys == null) {
......@@ -67,14 +62,6 @@ public class Grouping<T> {
return this.keys;
}
protected int[] getGroupSortKeyPositions() {
return this.groupSortKeyPositions;
}
protected Order[] getGroupSortOrders() {
return this.groupSortOrders;
}
// --------------------------------------------------------------------------------------------
// Operations / Transformations
// --------------------------------------------------------------------------------------------
......@@ -96,9 +83,6 @@ public class Grouping<T> {
* @see DataSet
*/
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
if(this.groupSortKeyPositions != null) {
throw new UnsupportedOperationException("Sorted groups not supported for Aggregation operation at the moment.");
}
return new AggregateOperator<T>(this, agg, field);
}
......@@ -116,9 +100,6 @@ public class Grouping<T> {
* @see DataSet
*/
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
if(this.groupSortKeyPositions != null) {
throw new UnsupportedOperationException("Sorted groups not supported for Aggregation operation at the moment.");
}
return new ReduceOperator<T>(this, reducer);
}
......@@ -150,14 +131,16 @@ public class Grouping<T> {
*
* @param field The Tuple field on which the group is sorted.
* @param order The Order in which the specified Tuple field is sorted.
* @return A Grouping with specified order of group element.
* @return A SortedGrouping with specified order of group element.
*
* @see Tuple
* @see Order
*/
public Grouping<T> sortGroup(int field, Order order) {
public SortedGrouping<T> sortGroup(int field, Order order) {
int pos;
int[] groupSortKeyPositions;
Order[] groupSortOrders ;
if (!dataSet.getType().isTupleType()) {
throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
......@@ -166,20 +149,13 @@ public class Grouping<T> {
throw new IllegalArgumentException("Order key out of tuple bounds.");
}
if(this.groupSortKeyPositions == null) {
this.groupSortKeyPositions = new int[1];
this.groupSortOrders = new Order[1];
pos = 0;
} else {
int newLength = this.groupSortKeyPositions.length + 1;
this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
pos = newLength - 1;
}
groupSortKeyPositions = new int[1];
groupSortOrders = new Order[1];
pos = 0;
this.groupSortKeyPositions[pos] = field;
this.groupSortOrders[pos] = order;
return this;
groupSortKeyPositions[pos] = field;
groupSortOrders[pos] = order;
return new SortedGrouping<T>(this.dataSet, this.keys, groupSortKeyPositions, groupSortOrders);
}
// public <K extends Comparable<K>> Grouping<T> sortGroup(KeySelector<T, K> keyExtractor, Order order) {
......
......@@ -148,10 +148,11 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
po.setDegreeOfParallelism(this.getParallelism());
// set group order
if(grouper.getGroupSortKeyPositions() != null) {
if(grouper instanceof SortedGrouping) {
SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
int[] sortKeyPositions = grouper.getGroupSortKeyPositions();
Order[] sortOrders = grouper.getGroupSortOrders();
int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
Order[] sortOrders = sortedGrouper.getGroupSortOrders();
Ordering o = new Ordering();
for(int i=0; i < sortKeyPositions.length; i++) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.api.java.operators;
import java.util.Arrays;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
/**
* SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
* The following transformation can be applied on sorted groups:
* <ul>
* <li>{@link Grouping#reduce(ReduceFunction)},</li>
* </ul>
*
* @param <T> The type of the elements of the sorted and grouped DataSet.
*/
public class SortedGrouping<T> extends Grouping<T> {
private int[] groupSortKeyPositions;
private Order[] groupSortOrders ;
public SortedGrouping(DataSet<T> set, Keys<T> keys, int[] groupSortKeyPositions, Order[] groupSortOrders) {
super(set, keys);
if (groupSortKeyPositions == null || groupSortKeyPositions.length == 0 || groupSortOrders == null || groupSortOrders.length == 0) {
throw new InvalidProgramException("Key positions and sort orders must be specified in order to create a SortedGrouping.");
}
this.groupSortKeyPositions = groupSortKeyPositions;
this.groupSortOrders = groupSortOrders;
}
protected int[] getGroupSortKeyPositions() {
return this.groupSortKeyPositions;
}
protected Order[] getGroupSortOrders() {
return this.groupSortOrders;
}
/**
* Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br/>
* The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
* A GroupReduceFunction can iterate over all elements of a group and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
* @see GroupReduceFunction
* @see GroupReduceOperator
* @see DataSet
*/
public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
return new ReduceGroupOperator<T, R>(this, reducer);
}
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
/**
* Sorts {@link Tuple} elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
*
* @param field The Tuple field on which the group is sorted.
* @param order The Order in which the specified Tuple field is sorted.
* @return A SortedGrouping with specified order of group element.
*
* @see Tuple
* @see Order
*/
public SortedGrouping<T> sortGroup(int field, Order order) {
int pos;
if (!dataSet.getType().isTupleType()) {
throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
}
if (field >= dataSet.getType().getArity()) {
throw new IllegalArgumentException("Order key out of tuple bounds.");
}
int newLength = this.groupSortKeyPositions.length + 1;
this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
pos = newLength - 1;
this.groupSortKeyPositions[pos] = field;
this.groupSortOrders[pos] = order;
return this;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册