提交 70464bb0 编写于 作者: G Gyula Fora 提交者: mbalassi

[FLINK-1121] [streaming] minBy and maxBy operators added to streaming api

上级 30ac9fe6
...@@ -246,9 +246,11 @@ When the reduce operator is applied on a grouped data stream, the user-defined ` ...@@ -246,9 +246,11 @@ When the reduce operator is applied on a grouped data stream, the user-defined `
The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion. The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)` Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`, `minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)`
For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default.
With `minBy` and `maxBy` the output of the operator is the element that currently has the minimal or maximal value at the given field position. If several elements share the minimum or maximum value, the user can decide whether the operator should return the first or the last of them. This is controlled by the boolean `first` parameter.
### Window/Batch operators ### Window/Batch operators
......
...@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction; ...@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
...@@ -118,8 +120,8 @@ public class BatchedDataStream<OUT> { ...@@ -118,8 +120,8 @@ public class BatchedDataStream<OUT> {
*/ */
public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) { public <R> SingleOutputStreamOperator<R, ?> reduceGroup(GroupReduceFunction<OUT, R> reducer) {
return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer, return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer, GroupReduceFunction.class, GroupReduceFunction.class, 0), new FunctionTypeWrapper<R>(reducer,
1), getGroupReduceInvokable(reducer)); GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer));
} }
/** /**
...@@ -159,6 +161,38 @@ public class BatchedDataStream<OUT> { ...@@ -159,6 +161,38 @@ public class BatchedDataStream<OUT> {
return aggregate(new MinAggregationFunction<OUT>(positionToMin)); return aggregate(new MinAggregationFunction<OUT>(positionToMin));
} }
/**
 * Selects, for every sliding batch/window of the data stream, the element
 * whose field at the given position is minimal. When several elements share
 * the minimum value, the first of them is returned: this overload simply
 * forwards to {@link #minBy(int, boolean)} with {@code first} set to
 * {@code true}.
 *
 * @param positionToMinBy
 *            The position in the data point to minimize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
    return minBy(positionToMinBy, true);
}
/**
 * Selects, for every sliding batch/window of the data stream, the element
 * whose field at the given position is minimal. When several elements share
 * the minimum value, either the first or the last of them is returned,
 * depending on the {@code first} flag.
 *
 * @param positionToMinBy
 *            The position in the data point to minimize
 * @param first
 *            If true, the operator returns the first element with the
 *            minimum value, otherwise it returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
    // The position is checked against the wrapped data stream before the
    // aggregation function is created.
    dataStream.checkFieldRange(positionToMinBy);
    MinByAggregationFunction<OUT> minByFunction = new MinByAggregationFunction<OUT>(
            positionToMinBy, first);
    return aggregate(minByFunction);
}
/** /**
* Syntactic sugar for min(0) * Syntactic sugar for min(0)
* *
...@@ -181,6 +215,37 @@ public class BatchedDataStream<OUT> { ...@@ -181,6 +215,37 @@ public class BatchedDataStream<OUT> {
return aggregate(new MaxAggregationFunction<OUT>(positionToMax)); return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
} }
/**
 * Selects, for every sliding batch/window of the data stream, the element
 * whose field at the given position is maximal. When several elements share
 * the maximum value, the first of them is returned: this overload simply
 * forwards to {@link #maxBy(int, boolean)} with {@code first} set to
 * {@code true}.
 *
 * @param positionToMaxBy
 *            The position in the data point to maximize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
    return maxBy(positionToMaxBy, true);
}
/**
 * Selects, for every sliding batch/window of the data stream, the element
 * whose field at the given position is maximal. When several elements share
 * the maximum value, either the first or the last of them is returned,
 * depending on the {@code first} flag.
 *
 * @param positionToMaxBy
 *            The position in the data point to maximize
 * @param first
 *            If true, the operator returns the first element with the
 *            maximum value, otherwise it returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
    // The position is checked against the wrapped data stream before the
    // aggregation function is created.
    dataStream.checkFieldRange(positionToMaxBy);
    MaxByAggregationFunction<OUT> maxByFunction = new MaxByAggregationFunction<OUT>(
            positionToMaxBy, first);
    return aggregate(maxByFunction);
}
/** /**
* Syntactic sugar for max(0) * Syntactic sugar for max(0)
* *
......
...@@ -39,7 +39,9 @@ import org.apache.flink.streaming.api.JobGraphBuilder; ...@@ -39,7 +39,9 @@ import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction; import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction; import org.apache.flink.streaming.api.function.sink.SinkFunction;
...@@ -552,6 +554,38 @@ public class DataStream<OUT> { ...@@ -552,6 +554,38 @@ public class DataStream<OUT> {
return aggregate(new MinAggregationFunction<OUT>(positionToMin)); return aggregate(new MinAggregationFunction<OUT>(positionToMin));
} }
/**
 * Applies an aggregation that gives the current element with the minimum
 * value at the given position. If several elements have the minimum value
 * at that position, the operator returns the first one: this overload
 * forwards to {@link #minBy(int, boolean)} with {@code first} set to
 * {@code true}.
 *
 * @param positionToMinBy
 *            The position in the data point to minimize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
    return minBy(positionToMinBy, true);
}
/**
 * Applies an aggregation that gives the current element with the minimum
 * value at the given position. If several elements have the minimum value
 * at that position, the operator returns either the first or the last one,
 * depending on the {@code first} flag.
 *
 * @param positionToMinBy
 *            The position in the data point to minimize
 * @param first
 *            If true, the operator returns the first element with the
 *            minimal value, otherwise it returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
    // The position is range-checked before the aggregation function is created.
    checkFieldRange(positionToMinBy);
    MinByAggregationFunction<OUT> minByFunction = new MinByAggregationFunction<OUT>(
            positionToMinBy, first);
    return aggregate(minByFunction);
}
/** /**
* Syntactic sugar for min(0) * Syntactic sugar for min(0)
* *
...@@ -574,6 +608,38 @@ public class DataStream<OUT> { ...@@ -574,6 +608,38 @@ public class DataStream<OUT> {
return aggregate(new MaxAggregationFunction<OUT>(positionToMax)); return aggregate(new MaxAggregationFunction<OUT>(positionToMax));
} }
/**
 * Applies an aggregation that gives the current element with the maximum
 * value at the given position. If several elements have the maximum value
 * at that position, the operator returns the first one: this overload
 * forwards to {@link #maxBy(int, boolean)} with {@code first} set to
 * {@code true}.
 *
 * @param positionToMaxBy
 *            The position in the data point to maximize
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
    return maxBy(positionToMaxBy, true);
}
/**
 * Applies an aggregation that gives the current element with the maximum
 * value at the given position. If several elements have the maximum value
 * at that position, the operator returns either the first or the last one,
 * depending on the {@code first} flag.
 *
 * @param positionToMaxBy
 *            The position in the data point to maximize.
 * @param first
 *            If true, the operator returns the first element with the
 *            maximum value, otherwise it returns the last
 * @return The transformed DataStream.
 */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
    // The position is range-checked before the aggregation function is created.
    checkFieldRange(positionToMaxBy);
    MaxByAggregationFunction<OUT> maxByFunction = new MaxByAggregationFunction<OUT>(
            positionToMaxBy, first);
    return aggregate(maxByFunction);
}
/** /**
* Syntactic sugar for max(0) * Syntactic sugar for max(0)
* *
......
...@@ -75,6 +75,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { ...@@ -75,6 +75,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* The position in the data point to sum * The position in the data point to sum
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
@Override
public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) { public SingleOutputStreamOperator<OUT, ?> sum(final int positionToSum) {
return super.sum(positionToSum); return super.sum(positionToSum);
} }
...@@ -88,10 +89,45 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { ...@@ -88,10 +89,45 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* The position in the data point to minimize * The position in the data point to minimize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
@Override
public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) { public SingleOutputStreamOperator<OUT, ?> min(final int positionToMin) {
return super.min(positionToMin); return super.min(positionToMin);
} }
/**
* Applies an aggregation that gives the current element with the minimum
* value at the given position for each group on a grouped data stream. If
* several elements have the minimum value at the given position, the
* operator returns the first one by default. The call is forwarded
* unchanged to the superclass implementation.
*
* @param positionToMinBy
* The position in the data point to minimize
* @return The transformed DataStream.
*/
@Override
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
return super.minBy(positionToMinBy);
}
/**
* Applies an aggregation that gives the current element with the minimum
* value at the given position for each group on a grouped data stream. If
* several elements have the minimum value at the given position, the
* operator returns either the first or the last one, depending on the
* {@code first} parameter. The call is forwarded unchanged to the
* superclass implementation.
*
* @param positionToMinBy
* The position in the data point to minimize
* @param first
* If true, the operator returns the first element with the
* minimum value, otherwise it returns the last
* @return The transformed DataStream.
*/
@Override
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
return super.minBy(positionToMinBy, first);
}
/** /**
* Applies an aggregation that gives the maximum of the grouped data stream * Applies an aggregation that gives the maximum of the grouped data stream
* at the given position, grouped by the given key position. Input values * at the given position, grouped by the given key position. Input values
...@@ -101,10 +137,45 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { ...@@ -101,10 +137,45 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* The position in the data point to maximize * The position in the data point to maximize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
@Override
public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) { public SingleOutputStreamOperator<OUT, ?> max(final int positionToMax) {
return super.max(positionToMax); return super.max(positionToMax);
} }
/**
* Applies an aggregation that gives the current element with the maximum
* value at the given position for each group on a grouped data stream. If
* several elements have the maximum value at the given position, the
* operator returns the first one by default. The call is forwarded
* unchanged to the superclass implementation.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @return The transformed DataStream.
*/
@Override
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
return super.maxBy(positionToMaxBy);
}
/**
* Applies an aggregation that gives the current element with the maximum
* value at the given position for each group on a grouped data stream. If
* several elements have the maximum value at the given position, the
* operator returns either the first or the last one, depending on the
* {@code first} parameter. The call is forwarded unchanged to the
* superclass implementation.
*
* @param positionToMaxBy
* The position in the data point to maximize
* @param first
* If true, the operator returns the first element with the
* maximum value, otherwise it returns the last
* @return The transformed DataStream.
*/
@Override
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
return super.maxBy(positionToMaxBy, first);
}
@Override @Override
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) { protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.aggregation;
/**
 * Aggregation function that keeps the whole element whose field at the
 * configured position is maximal. The selection logic is inherited from
 * {@link MinByAggregationFunction}; only the comparison direction in
 * {@link #isExtremal(Comparable, Object)} is reversed.
 *
 * @param <T>
 *            type of the aggregated elements
 */
public class MaxByAggregationFunction<T> extends MinByAggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    /**
     * @param pos
     *            position of the field that is compared
     * @param first
     *            tie-breaking flag: if true, equal values count as extremal
     *            for the first compared value
     */
    public MaxByAggregationFunction(int pos, boolean first) {
        super(pos, first);
    }

    /**
     * Tells whether {@code o1} is to be preferred over {@code o2} when
     * maximizing.
     *
     * @return with {@code first} set, true when {@code o1} is greater than
     *         or equal to {@code o2}; otherwise true only when {@code o1}
     *         is strictly greater
     */
    @Override
    public <R> boolean isExtremal(Comparable<R> o1, R o2) {
        final int comparison = o1.compareTo(o2);
        return first ? comparison >= 0 : comparison > 0;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.function.aggregation;
import org.apache.flink.api.java.tuple.Tuple;
/**
 * Aggregation function that keeps the whole element whose field at the
 * configured position is minimal, instead of only replacing the compared
 * field.
 *
 * @param <T>
 *            type of the aggregated elements
 */
public class MinByAggregationFunction<T> extends ComparableAggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Tie-breaking flag: when true, equal values make the first compared
     * tuple win; when false, the second one wins.
     */
    protected boolean first;

    /**
     * @param pos
     *            position of the field that is compared
     * @param first
     *            tie-breaking flag, see {@link #first}
     */
    public MinByAggregationFunction(int pos, boolean first) {
        super(pos);
        this.first = first;
    }

    /**
     * Compares the two tuples on the field at {@code position} and stores
     * the preferred tuple as a whole in {@code returnTuple}.
     */
    @Override
    public <R> void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException,
            IllegalAccessException {
        Comparable<R> candidate = tuple1.getField(position);
        R other = tuple2.getField(position);
        returnTuple = isExtremal(candidate, other) ? tuple1 : tuple2;
    }

    /**
     * Tells whether {@code o1} is to be preferred over {@code o2} when
     * minimizing.
     *
     * @return with {@code first} set, true when {@code o1} is less than or
     *         equal to {@code o2}; otherwise true only when {@code o1} is
     *         strictly less
     */
    @Override
    public <R> boolean isExtremal(Comparable<R> o1, R o2) {
        final int comparison = o1.compareTo(o2);
        return first ? comparison <= 0 : comparison < 0;
    }
}
...@@ -26,7 +26,9 @@ import java.util.List; ...@@ -26,7 +26,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
...@@ -49,7 +51,7 @@ public class AggregationFunctionTest { ...@@ -49,7 +51,7 @@ public class AggregationFunctionTest {
List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>(); List<Tuple2<Integer, Integer>> expectedGroupMaxList = new ArrayList<Tuple2<Integer, Integer>>();
List<Integer> simpleInput = new ArrayList<Integer>(); List<Integer> simpleInput = new ArrayList<Integer>();
int groupedSum0 = 0; int groupedSum0 = 0;
int groupedSum1 = 0; int groupedSum1 = 0;
int groupedSum2 = 0; int groupedSum2 = 0;
...@@ -86,16 +88,14 @@ public class AggregationFunctionTest { ...@@ -86,16 +88,14 @@ public class AggregationFunctionTest {
SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction SumAggregationFunction<Tuple2<Integer, Integer>> sumFunction = SumAggregationFunction
.getSumFunction(1, Integer.class); .getSumFunction(1, Integer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction SumAggregationFunction<Integer> sumFunction0 = SumAggregationFunction.getSumFunction(0,
.getSumFunction(0, Integer.class); Integer.class);
MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer, Integer>>( MinAggregationFunction<Tuple2<Integer, Integer>> minFunction = new MinAggregationFunction<Tuple2<Integer, Integer>>(
1); 1);
MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>( MinAggregationFunction<Integer> minFunction0 = new MinAggregationFunction<Integer>(0);
0);
MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer, Integer>>( MaxAggregationFunction<Tuple2<Integer, Integer>> maxFunction = new MaxAggregationFunction<Tuple2<Integer, Integer>>(
1); 1);
MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>( MaxAggregationFunction<Integer> maxFunction0 = new MaxAggregationFunction<Integer>(0);
0);
List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute( List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList()); new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
...@@ -107,13 +107,16 @@ public class AggregationFunctionTest { ...@@ -107,13 +107,16 @@ public class AggregationFunctionTest {
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList()); new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute( List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList()); new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute( List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList()); new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute( List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList()); new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0),
getInputList());
assertEquals(expectedSumList, sumList); assertEquals(expectedSumList, sumList);
assertEquals(expectedMinList, minList); assertEquals(expectedMinList, minList);
...@@ -121,31 +124,171 @@ public class AggregationFunctionTest { ...@@ -121,31 +124,171 @@ public class AggregationFunctionTest {
assertEquals(expectedGroupSumList, groupedSumList); assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList); assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList); assertEquals(expectedGroupMaxList, groupedMaxList);
assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(sumFunction0),simpleInput )); assertEquals(expectedSumList0, MockInvokable.createAndExecute(
assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(minFunction0),simpleInput )); new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable<Integer>(maxFunction0),simpleInput )); assertEquals(expectedMinList0, MockInvokable.createAndExecute(
new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
try { try {
env.generateSequence(1, 100).min(1); env.generateSequence(1, 100).min(1);
fail(); fail();
} catch (Exception e) { } catch (Exception e) {
//Nothing to do here // Nothing to do here
} }
try { try {
env.generateSequence(1, 100).min(2); env.generateSequence(1, 100).min(2);
fail(); fail();
} catch (Exception e) { } catch (Exception e) {
//Nothing to do here // Nothing to do here
} }
try { try {
env.generateSequence(1, 100).min(3); env.generateSequence(1, 100).min(3);
fail(); fail();
} catch (Exception e) { } catch (Exception e) {
//Nothing to do here // Nothing to do here
} }
MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
0, true);
MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
0, false);
MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
0, true);
MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
0, false);
List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
getInputList()));
assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
getInputList()));
assertEquals(minByLastExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
getInputList()));
assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
getInputList()));
}
/**
* Checks MinByAggregationFunction and MaxByAggregationFunction on field 0,
* each with first = true and first = false, by running them through a
* StreamReduceInvokable over getInputList() and comparing the emitted
* elements with hand-written expected lists.
*
* NOTE(review): the expected values look like they assume getInputList()
* yields the tuples (i % 3, i) for i = 0..8 - confirm against
* getInputList().
*/
@Test
public void minMaxByTest() {
// Functions under test: all compare on field 0 and differ only in the
// tie-breaking flag.
MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionFirst = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
0, true);
MaxByAggregationFunction<Tuple2<Integer, Integer>> maxByFunctionLast = new MaxByAggregationFunction<Tuple2<Integer, Integer>>(
0, false);
MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionFirst = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
0, true);
MinByAggregationFunction<Tuple2<Integer, Integer>> minByFunctionLast = new MinByAggregationFunction<Tuple2<Integer, Integer>>(
0, false);
// maxBy, first = true: once (2, 2) is reached, later elements with an equal
// key do not replace it.
List<Tuple2<Integer, Integer>> maxByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
maxByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(1, 1));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByFirstExpected.add(new Tuple2<Integer, Integer>(2, 2));
// maxBy, first = false: a later element with an equal key replaces the
// current one.
List<Tuple2<Integer, Integer>> maxByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
maxByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
maxByLastExpected.add(new Tuple2<Integer, Integer>(1, 1));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 2));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 5));
maxByLastExpected.add(new Tuple2<Integer, Integer>(2, 8));
// minBy, first = true: the first element with key 0 is kept throughout.
List<Tuple2<Integer, Integer>> minByFirstExpected = new ArrayList<Tuple2<Integer, Integer>>();
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByFirstExpected.add(new Tuple2<Integer, Integer>(0, 0));
// minBy, first = false: each later element with key 0 replaces the current
// one.
List<Tuple2<Integer, Integer>> minByLastExpected = new ArrayList<Tuple2<Integer, Integer>>();
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 0));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 3));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
// Run each function over a fresh copy of the input and compare the full
// output sequence.
assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
getInputList()));
assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
getInputList()));
assertEquals(minByLastExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
getInputList()));
assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
getInputList()));
} }
private List<Tuple2<Integer, Integer>> getInputList() { private List<Tuple2<Integer, Integer>> getInputList() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册