From 70464bb0a44f682c155fdfdd2a6b0a6cc1203663 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Wed, 24 Sep 2014 16:57:36 +0200 Subject: [PATCH] [FLINK-1121] [streaming] minBy and maxBy operators added to streaming api --- docs/streaming_guide.md | 6 +- .../api/datastream/BatchedDataStream.java | 69 ++++++- .../streaming/api/datastream/DataStream.java | 66 +++++++ .../api/datastream/GroupedDataStream.java | 71 +++++++ .../aggregation/MaxByAggregationFunction.java | 37 ++++ .../aggregation/MinByAggregationFunction.java | 55 ++++++ .../api/AggregationFunctionTest.java | 177 ++++++++++++++++-- 7 files changed, 460 insertions(+), 21 deletions(-) create mode 100644 flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java create mode 100644 flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index 27a32bae14a..37ff90d12fa 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -246,9 +246,11 @@ When the reduce operator is applied on a grouped data stream, the user-defined ` The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion. -Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)` +Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`, `minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)` -For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. +With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. + +With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given fieldposition. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter. ### Window/Batch operators diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java index 51f1467582f..e8e3f31b9ce 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java @@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable; @@ -118,8 +120,8 @@ public class BatchedDataStream { */ public SingleOutputStreamOperator reduceGroup(GroupReduceFunction reducer) { return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer, - GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer, GroupReduceFunction.class, - 1), getGroupReduceInvokable(reducer)); + GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer, + GroupReduceFunction.class, 1), getGroupReduceInvokable(reducer)); } /** @@ -159,6 +161,38 @@ public class BatchedDataStream { return aggregate(new MinAggregationFunction(positionToMin)); } + /** + * Applies an aggregation that gives the minimum element of every sliding + * batch/window of the data stream by the given position. If more elements + * have the same minimum value the operator returns the first element by + * default. + * + * @param positionToMinBy + * The position in the data point to minimize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator minBy(int positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that gives the minimum element of every sliding + * batch/window of the data stream by the given position. If more elements + * have the same minimum value the operator returns either the first or last + * one depending on the parameter setting. + * + * @param positionToMinBy + * The position in the data point to minimize + * @param first + * If true, then the operator return the first element with the + * minimum value, otherwise returns the last + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { + dataStream.checkFieldRange(positionToMinBy); + return aggregate(new MinByAggregationFunction(positionToMinBy, first)); + } + /** * Syntactic sugar for min(0) * @@ -181,6 +215,37 @@ public class BatchedDataStream { return aggregate(new MaxAggregationFunction(positionToMax)); } + /** + * Applies an aggregation that gives the maximum element of every sliding + * batch/window of the data stream by the given position. If more elements + * have the same maximum value the operator returns the first by default. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator maxBy(int positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that gives the maximum element of every sliding + * batch/window of the data stream by the given position. If more elements + * have the same maximum value the operator returns either the first or last + * one depending on the parameter setting. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @param first + * If true, then the operator return the first element with the + * maximum value, otherwise returns the last + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { + dataStream.checkFieldRange(positionToMaxBy); + return aggregate(new MaxByAggregationFunction(positionToMaxBy, first)); + } + /** * Syntactic sugar for max(0) * diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 423de4b9fdf..8ff8c540964 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -39,7 +39,9 @@ import org.apache.flink.streaming.api.JobGraphBuilder; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.function.sink.PrintSinkFunction; import org.apache.flink.streaming.api.function.sink.SinkFunction; @@ -552,6 +554,38 @@ public class DataStream { return aggregate(new MinAggregationFunction(positionToMin)); } + /** + * Applies an aggregation that that gives the current element with the + * minimum value at the given position, if more elements have the minimum + * value at the given position, the operator returns the first one by + * default. + * + * @param positionToMinBy + * The position in the data point to minimize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator minBy(int positionToMinBy) { + return this.minBy(positionToMinBy, true); + } + + /** + * Applies an aggregation that that gives the current element with the + * minimum value at the given position, if more elements have the minimum + * value at the given position, the operator returns either the first or + * last one, depending on the parameter set. + * + * @param positionToMinBy + * The position in the data point to minimize + * @param first + * If true, then the operator return the first element with the + * minimal value, otherwise returns the last + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { + checkFieldRange(positionToMinBy); + return aggregate(new MinByAggregationFunction(positionToMinBy, first)); + } + /** * Syntactic sugar for min(0) * @@ -574,6 +608,38 @@ public class DataStream { return aggregate(new MaxAggregationFunction(positionToMax)); } + /** + * Applies an aggregation that that gives the current element with the + * maximum value at the given position, if more elements have the maximum + * value at the given position, the operator returns the first one by + * default. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator maxBy(int positionToMaxBy) { + return this.maxBy(positionToMaxBy, true); + } + + /** + * Applies an aggregation that that gives the current element with the + * maximum value at the given position, if more elements have the maximum + * value at the given position, the operator returns either the first or + * last one, depending on the parameter set. + * + * @param positionToMaxBy + * The position in the data point to maximize. + * @param first + * If true, then the operator return the first element with the + * maximum value, otherwise returns the last + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { + checkFieldRange(positionToMaxBy); + return aggregate(new MaxByAggregationFunction(positionToMaxBy, first)); + } + /** * Syntactic sugar for max(0) * diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 3a61a350cae..af2f18606d5 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -75,6 +75,7 @@ public class GroupedDataStream extends DataStream { * The position in the data point to sum * @return The transformed DataStream. */ + @Override public SingleOutputStreamOperator sum(final int positionToSum) { return super.sum(positionToSum); } @@ -88,10 +89,45 @@ public class GroupedDataStream extends DataStream { * The position in the data point to minimize * @return The transformed DataStream. */ + @Override public SingleOutputStreamOperator min(final int positionToMin) { return super.min(positionToMin); } + /** + * Applies an aggregation that that gives the current element with the + * minimum value at the given position for each group on a grouped data + * stream. If more elements have the minimum value at the given position, + * the operator returns the first one by default. + * + * @param positionToMinBy + * The position in the data point to minimize + * @return The transformed DataStream. + */ + @Override + public SingleOutputStreamOperator minBy(int positionToMinBy) { + return super.minBy(positionToMinBy); + } + + /** + * Applies an aggregation that that gives the current element with the + * minimum value at the given position for each group on a grouped data + * stream. If more elements have the minimum value at the given position, + * the operator returns either the first or last one depending on the + * parameters. + * + * @param positionToMinBy + * The position in the data point to minimize + * @param first + * If true, then the operator return the first element with the + * maximum value, otherwise returns the last + * @return The transformed DataStream. + */ + @Override + public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { + return super.minBy(positionToMinBy, first); + } + /** * Applies an aggregation that gives the maximum of the grouped data stream * at the given position, grouped by the given key position. Input values @@ -101,10 +137,45 @@ public class GroupedDataStream extends DataStream { * The position in the data point to maximize * @return The transformed DataStream. */ + @Override public SingleOutputStreamOperator max(final int positionToMax) { return super.max(positionToMax); } + /** + * Applies an aggregation that that gives the current element with the + * maximum value at the given position for each group on a grouped data + * stream. If more elements have the maximum value at the given position, + * the operator returns the first one by default. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @return The transformed DataStream. + */ + @Override + public SingleOutputStreamOperator maxBy(int positionToMaxBy) { + return super.maxBy(positionToMaxBy); + } + + /** + * Applies an aggregation that that gives the current element with the + * maximum value at the given position for each group on a grouped data + * stream. If more elements have the maximum value at the given position, + * the operator returns either the first or last one depending on the + * parameters. + * + * @param positionToMaxBy + * The position in the data point to maximize + * @param first + * If true, then the operator return the first element with the + * maximum value, otherwise returns the last + * @return The transformed DataStream. + */ + @Override + public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { + return super.maxBy(positionToMaxBy, first); + } + @Override protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) { diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java new file mode 100644 index 00000000000..274c8b6fa61 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.aggregation; + +public class MaxByAggregationFunction extends MinByAggregationFunction { + + private static final long serialVersionUID = 1L; + + public MaxByAggregationFunction(int pos, boolean first) { + super(pos, first); + } + + @Override + public boolean isExtremal(Comparable o1, R o2) { + if (first) { + return o1.compareTo(o2) >= 0; + } else { + return o1.compareTo(o2) > 0; + } + + } +} diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java new file mode 100644 index 00000000000..a4a328c5cad --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.aggregation; + +import org.apache.flink.api.java.tuple.Tuple; + +public class MinByAggregationFunction extends ComparableAggregationFunction { + + private static final long serialVersionUID = 1L; + protected boolean first; + + public MinByAggregationFunction(int pos, boolean first) { + super(pos); + this.first = first; + } + + @Override + public void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException, + IllegalAccessException { + + Comparable o1 = tuple1.getField(position); + R o2 = tuple2.getField(position); + + if (isExtremal(o1, o2)) { + returnTuple = tuple1; + } else { + returnTuple = tuple2; + } + } + + @Override + public boolean isExtremal(Comparable o1, R o2) { + if (first) { + return o1.compareTo(o2) <= 0; + } else { + return o1.compareTo(o2) < 0; + } + + } +} diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index d48f8ad6efc..1f86ce1a480 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -26,7 +26,9 @@ import java.util.List; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MinByAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; @@ -49,7 +51,7 @@ public class AggregationFunctionTest { List> expectedGroupMaxList = new ArrayList>(); List simpleInput = new ArrayList(); - + int groupedSum0 = 0; int groupedSum1 = 0; int groupedSum2 = 0; @@ -86,16 +88,14 @@ public class AggregationFunctionTest { SumAggregationFunction> sumFunction = SumAggregationFunction .getSumFunction(1, Integer.class); @SuppressWarnings("unchecked") - SumAggregationFunction sumFunction0 = SumAggregationFunction - .getSumFunction(0, Integer.class); + SumAggregationFunction sumFunction0 = SumAggregationFunction.getSumFunction(0, + Integer.class); MinAggregationFunction> minFunction = new MinAggregationFunction>( 1); - MinAggregationFunction minFunction0 = new MinAggregationFunction( - 0); + MinAggregationFunction minFunction0 = new MinAggregationFunction(0); MaxAggregationFunction> maxFunction = new MaxAggregationFunction>( 1); - MaxAggregationFunction maxFunction0 = new MaxAggregationFunction( - 0); + MaxAggregationFunction maxFunction0 = new MaxAggregationFunction(0); List> sumList = MockInvokable.createAndExecute( new StreamReduceInvokable>(sumFunction), getInputList()); @@ -107,13 +107,16 @@ public class AggregationFunctionTest { new StreamReduceInvokable>(maxFunction), getInputList()); List> groupedSumList = MockInvokable.createAndExecute( - new GroupedReduceInvokable>(sumFunction, 0), getInputList()); + new GroupedReduceInvokable>(sumFunction, 0), + getInputList()); List> groupedMinList = MockInvokable.createAndExecute( - new GroupedReduceInvokable>(minFunction, 0), getInputList()); + new GroupedReduceInvokable>(minFunction, 0), + getInputList()); List> groupedMaxList = MockInvokable.createAndExecute( - new GroupedReduceInvokable>(maxFunction, 0), getInputList()); + new GroupedReduceInvokable>(maxFunction, 0), + getInputList()); assertEquals(expectedSumList, sumList); assertEquals(expectedMinList, minList); @@ -121,31 +124,171 @@ public class AggregationFunctionTest { assertEquals(expectedGroupSumList, groupedSumList); assertEquals(expectedGroupMinList, groupedMinList); assertEquals(expectedGroupMaxList, groupedMaxList); - assertEquals(expectedSumList0, MockInvokable.createAndExecute(new StreamReduceInvokable(sumFunction0),simpleInput )); - assertEquals(expectedMinList0, MockInvokable.createAndExecute(new StreamReduceInvokable(minFunction0),simpleInput )); - assertEquals(expectedMaxList0, MockInvokable.createAndExecute(new StreamReduceInvokable(maxFunction0),simpleInput )); - + assertEquals(expectedSumList0, MockInvokable.createAndExecute( + new StreamReduceInvokable(sumFunction0), simpleInput)); + assertEquals(expectedMinList0, MockInvokable.createAndExecute( + new StreamReduceInvokable(minFunction0), simpleInput)); + assertEquals(expectedMaxList0, MockInvokable.createAndExecute( + new StreamReduceInvokable(maxFunction0), simpleInput)); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); try { env.generateSequence(1, 100).min(1); fail(); } catch (Exception e) { - //Nothing to do here + // Nothing to do here } try { env.generateSequence(1, 100).min(2); fail(); } catch (Exception e) { - //Nothing to do here + // Nothing to do here } try { env.generateSequence(1, 100).min(3); fail(); } catch (Exception e) { - //Nothing to do here + // Nothing to do here } + MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>( + 0, true); + MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>( + 0, false); + + MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>( + 0, true); + MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>( + 0, false); + + List> maxByFirstExpected = new ArrayList>(); + maxByFirstExpected.add(new Tuple2(0, 0)); + maxByFirstExpected.add(new Tuple2(1, 1)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + + List> maxByLastExpected = new ArrayList>(); + maxByLastExpected.add(new Tuple2(0, 0)); + maxByLastExpected.add(new Tuple2(1, 1)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 8)); + + List> minByFirstExpected = new ArrayList>(); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + + List> minByLastExpected = new ArrayList>(); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 6)); + minByLastExpected.add(new Tuple2(0, 6)); + minByLastExpected.add(new Tuple2(0, 6)); + + assertEquals(maxByFirstExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(maxByFunctionFirst), + getInputList())); + assertEquals(maxByLastExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(maxByFunctionLast), + getInputList())); + assertEquals(minByLastExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(minByFunctionLast), + getInputList())); + assertEquals(minByFirstExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(minByFunctionFirst), + getInputList())); + + } + + @Test + public void minMaxByTest() { + + MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>( + 0, true); + MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>( + 0, false); + + MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>( + 0, true); + MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>( + 0, false); + + List> maxByFirstExpected = new ArrayList>(); + maxByFirstExpected.add(new Tuple2(0, 0)); + maxByFirstExpected.add(new Tuple2(1, 1)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + maxByFirstExpected.add(new Tuple2(2, 2)); + + List> maxByLastExpected = new ArrayList>(); + maxByLastExpected.add(new Tuple2(0, 0)); + maxByLastExpected.add(new Tuple2(1, 1)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 2)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 5)); + maxByLastExpected.add(new Tuple2(2, 8)); + + List> minByFirstExpected = new ArrayList>(); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + minByFirstExpected.add(new Tuple2(0, 0)); + + List> minByLastExpected = new ArrayList>(); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 0)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 3)); + minByLastExpected.add(new Tuple2(0, 6)); + minByLastExpected.add(new Tuple2(0, 6)); + minByLastExpected.add(new Tuple2(0, 6)); + + assertEquals(maxByFirstExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(maxByFunctionFirst), + getInputList())); + assertEquals(maxByLastExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(maxByFunctionLast), + getInputList())); + assertEquals(minByLastExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(minByFunctionLast), + getInputList())); + assertEquals(minByFirstExpected, MockInvokable.createAndExecute( + new StreamReduceInvokable>(minByFunctionFirst), + getInputList())); } private List> getInputList() { -- GitLab