提交 98770626 编写于 作者: F Fabian Hueske

Added UnsortedGrouping to separate Grouping methods from SortedGrouping methods

上级 6c4afe1f
......@@ -19,6 +19,7 @@ import org.apache.commons.lang3.Validate;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
......@@ -31,21 +32,26 @@ import eu.stratosphere.api.java.io.TextOutputFormat;
import eu.stratosphere.api.java.operators.AggregateOperator;
import eu.stratosphere.api.java.operators.CoGroupOperator;
import eu.stratosphere.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
import eu.stratosphere.api.java.operators.CrossOperator.DefaultCross;
import eu.stratosphere.api.java.operators.CrossOperator;
import eu.stratosphere.api.java.operators.CrossOperator.DefaultCross;
import eu.stratosphere.api.java.operators.CustomUnaryOperation;
import eu.stratosphere.api.java.operators.DataSink;
import eu.stratosphere.api.java.operators.FilterOperator;
import eu.stratosphere.api.java.operators.FlatMapOperator;
import eu.stratosphere.api.java.operators.Grouping;
import eu.stratosphere.api.java.operators.JoinOperator;
import eu.stratosphere.api.java.operators.JoinOperator.JoinHint;
import eu.stratosphere.api.java.operators.JoinOperator.JoinOperatorSets;
import eu.stratosphere.api.java.operators.Keys;
import eu.stratosphere.api.java.operators.MapOperator;
import eu.stratosphere.api.java.operators.ProjectOperator;
import eu.stratosphere.api.java.operators.ProjectOperator.Projection;
import eu.stratosphere.api.java.operators.ReduceGroupOperator;
import eu.stratosphere.api.java.operators.ReduceOperator;
import eu.stratosphere.api.java.operators.SortedGrouping;
import eu.stratosphere.api.java.operators.UnionOperator;
import eu.stratosphere.api.java.operators.UnsortedGrouping;
import eu.stratosphere.api.java.record.functions.CrossFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.typeutils.InputTypeConfigurable;
......@@ -251,10 +257,6 @@ public abstract class DataSet<T> {
// return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
// }
// public DistinctOperator<T> distinct(String fieldExpression) {
// return new DistinctOperator<T>(this, new Keys.ExpressionKeys<T>(fieldExpression, getType()));
// }
// public DistinctOperator<T> distinct(int... fields) {
// return new DistinctOperator<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), true));
// }
......@@ -267,42 +269,42 @@ public abstract class DataSet<T> {
* Groups a {@link DataSet} using a {@link KeySelector} function.
* The KeySelector function is called for each element of the DataSet and extracts a single
* key value on which the DataSet is grouped. </br>
* This method returns a {@link Grouping} on which one of the following grouping transformation
* needs to be applied to obtain a transformed DataSet.
* This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
* can be applied.
* <ul>
* <li>{@link Grouping#aggregate(Aggregations, int)}
* <li>{@link Grouping#reduce(ReduceFunction)}
* <li>{@link Grouping#reduceGroup(GroupReduceFunction)}
* <li>{@link UnsortedGrouping#sortGroup(int, eu.stratosphere.api.common.operators.Order)} to get a {@link SortedGrouping}.
* <li>{@link Grouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
* <li>{@link Grouping#reduce(ReduceFunction)} to apply a Reduce transformation.
* <li>{@link Grouping#reduceGroup(GroupReduceFunction)} to apply a GroupReduce transformation.
* </ul>
*
* @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped.
* @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
* @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet.
*
* @see KeySelector
* @see Grouping
* @see UnsortedGrouping
* @see SortedGrouping
* @see AggregateOperator
* @see ReduceOperator
* @see GroupReduceOperator
* @see DataSet
*/
public <K extends Comparable<K>> Grouping<T> groupBy(KeySelector<T, K> keyExtractor) {
return new Grouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
public <K extends Comparable<K>> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
}
// public Grouping<T> groupBy(String fieldExpression) {
// return new Grouping<T>(this, new Keys.ExpressionKeys<T>(fieldExpression, getType()));
// }
/**
* Groups a {@link Tuple} {@link DataSet} using field position keys.<br/>
* <b>Note: Field position keys only be specified for Tuple DataSets.</b></br>
* The field position keys specify the fields of Tuples on which the DataSet is grouped.
* This method returns a {@link Grouping} on which one of the following grouping transformation
* needs to be applied to obtain a transformed DataSet.
* This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
* can be applied.
* <ul>
* <li>{@link Grouping#aggregate(Aggregations, int)}
* <li>{@link Grouping#reduce(ReduceFunction)}
* <li>{@link Grouping#reduceGroup(GroupReduceFunction)}
* <li>{@link UnsortedGrouping#sortGroup(int, eu.stratosphere.api.common.operators.Order)} to get a {@link SortedGrouping}.
* <li>{@link Grouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
* <li>{@link Grouping#reduce(ReduceFunction)} to apply a Reduce transformation.
* <li>{@link Grouping#reduceGroup(GroupReduceFunction)} to apply a GroupReduce transformation.
* </ul>
*
* @param fields One or more field positions on which the DataSet will be grouped.
......@@ -310,13 +312,15 @@ public abstract class DataSet<T> {
*
* @see Tuple
* @see Grouping
* @see UnsortedGrouping
* @see SortedGrouping
* @see AggregateOperator
* @see ReduceOperator
* @see GroupReduceOperator
* @see DataSet
*/
public Grouping<T> groupBy(int... fields) {
return new Grouping<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), false));
public UnsortedGrouping<T> groupBy(int... fields) {
return new UnsortedGrouping<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), false));
}
// --------------------------------------------------------------------------------------------
......
......@@ -15,11 +15,7 @@
package eu.stratosphere.api.java.operators;
import eu.stratosphere.api.common.InvalidProgramException;
import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
/**
* Grouping is an intermediate step for a transformation on a grouped DataSet.<br/>
......@@ -34,7 +30,7 @@ import eu.stratosphere.api.java.functions.ReduceFunction;
*
* @see DataSet
*/
public class Grouping<T> {
public abstract class Grouping<T> {
protected final DataSet<T> dataSet;
......@@ -61,83 +57,5 @@ public class Grouping<T> {
protected Keys<T> getKeys() {
return this.keys;
}
// --------------------------------------------------------------------------------------------
// Operations / Transformations
// --------------------------------------------------------------------------------------------
/**
* Applies an Aggregate transformation on a grouped {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be aggregated.</b>
* The transformation applies a built-in {@link Aggregations Aggregation} on a specified field
* of a Tuple group. Additional aggregation functions can be added to the resulting
* {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}.
*
* @param agg The built-in aggregation function that is computed.
* @param field The index of the Tuple field on which the aggregation function is applied.
* @return An AggregateOperator that represents the aggregated DataSet.
*
* @see Tuple
* @see Aggregations
* @see AggregateOperator
* @see DataSet
*/
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
return new AggregateOperator<T>(this, agg, field);
}
/**
* Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
* For each group, the transformation consecutively calls a {@link ReduceFunction}
* until only a single element for each group remains.
* A ReduceFunction combines two elements into one new element of the same type.
*
* @param reducer The ReduceFunction that is applied on each group of the DataSet.
* @return A ReduceOperator that represents the reduced DataSet.
*
* @see ReduceFunction
* @see ReduceOperator
* @see DataSet
*/
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
return new ReduceOperator<T>(this, reducer);
}
/**
* Applies a GroupReduce transformation on a grouped {@link DataSet}.<br/>
* The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
* A GroupReduceFunction can iterate over all elements of a group and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
* @see GroupReduceFunction
* @see GroupReduceOperator
* @see DataSet
*/
public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
return new ReduceGroupOperator<T, R>(this, reducer);
}
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
/**
* Sorts {@link Tuple} elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
*
* @param field The Tuple field on which the group is sorted.
* @param order The Order in which the specified Tuple field is sorted.
* @return A SortedGrouping with specified order of group element.
*
* @see Tuple
* @see Order
*/
public SortedGrouping<T> sortGroup(int field, Order order) {
return new SortedGrouping<T>(this.dataSet, this.keys, field, order);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.api.java.operators;
import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
public class UnsortedGrouping<T> extends Grouping<T> {
public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
super(set, keys);
}
// --------------------------------------------------------------------------------------------
// Operations / Transformations
// --------------------------------------------------------------------------------------------
/**
* Applies an Aggregate transformation on a grouped {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be aggregated.</b>
* The transformation applies a built-in {@link Aggregations Aggregation} on a specified field
* of a Tuple group. Additional aggregation functions can be added to the resulting
* {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}.
*
* @param agg The built-in aggregation function that is computed.
* @param field The index of the Tuple field on which the aggregation function is applied.
* @return An AggregateOperator that represents the aggregated DataSet.
*
* @see Tuple
* @see Aggregations
* @see AggregateOperator
* @see DataSet
*/
public AggregateOperator<T> aggregate(Aggregations agg, int field) {
return new AggregateOperator<T>(this, agg, field);
}
/**
* Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
* For each group, the transformation consecutively calls a {@link ReduceFunction}
* until only a single element for each group remains.
* A ReduceFunction combines two elements into one new element of the same type.
*
* @param reducer The ReduceFunction that is applied on each group of the DataSet.
* @return A ReduceOperator that represents the reduced DataSet.
*
* @see ReduceFunction
* @see ReduceOperator
* @see DataSet
*/
public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
return new ReduceOperator<T>(this, reducer);
}
/**
* Applies a GroupReduce transformation on a grouped {@link DataSet}.<br/>
* The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
* A GroupReduceFunction can iterate over all elements of a group and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
* @see GroupReduceFunction
* @see GroupReduceOperator
* @see DataSet
*/
public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
return new ReduceGroupOperator<T, R>(this, reducer);
}
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
/**
* Sorts {@link Tuple} elements within a group on the specified field in the specified {@link Order}.</br>
* <b>Note: Only groups of Tuple elements can be sorted.</b><br/>
* Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
*
* @param field The Tuple field on which the group is sorted.
* @param order The Order in which the specified Tuple field is sorted.
* @return A SortedGrouping with specified order of group element.
*
* @see Tuple
* @see Order
*/
public SortedGrouping<T> sortGroup(int field, Order order) {
return new SortedGrouping<T>(this.dataSet, this.keys, field, order);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册