From 3eac6f239cc0c6fe0e7fae6718215b626446df44 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 22 Sep 2014 23:23:17 +0200 Subject: [PATCH] [FLINK-1110] Implement collection-based execution for mapPartition. Make groupReduce code compliant with pre-java-8 versions, fix java8 tests with moved type information classes. Fix Various warnings. --- .../base/CollectorMapOperatorBase.java | 11 ++- .../base/GroupReduceOperatorBase.java | 6 +- .../base/MapPartitionOperatorBase.java | 25 ++++++ ...latMapOperatorCollectionExecutionTest.java | 2 + .../base/PartitionMapOperatorTest.java | 89 +++++++++++++++++++ .../CollectionExecutionAccumulatorsTest.java | 75 ++++++++++++++++ .../base/GroupReduceOperatorTest.java | 2 - .../api/java/operator/MaxByOperatorTest.java | 3 +- .../api/java/operator/MinByOperatorTest.java | 2 +- .../lambdas/ReduceITCase.java | 2 +- 10 files changed, 208 insertions(+), 9 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java index eb246becf0f..c3e00bc0ffc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java @@ -18,14 +18,16 @@ package org.apache.flink.api.common.operators.base; +import java.util.List; + import org.apache.flink.api.common.functions.GenericCollectorMap; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; - /** * The CollectorMap is the old version of the Map operator. It is effectively a "flatMap", where the * UDF is called "map". @@ -46,4 +48,11 @@ public class CollectorMapOperatorBase udf, UnaryOperatorInformation operatorInfo, String name) { super(new UserCodeClassWrapper(udf), operatorInfo, name); } + + // -------------------------------------------------------------------------------------------- + + @Override + protected List executeOnCollections(List inputData, RuntimeContext ctx) { + throw new UnsupportedOperationException(); + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java index b3097062896..6abd7b52377 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java @@ -37,6 +37,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -144,6 +145,7 @@ public class GroupReduceOperatorBase inputComparator = ((CompositeType) inputType).createComparator(inputColumns, inputOrderings); @@ -154,10 +156,10 @@ public class GroupReduceOperatorBase result = new ArrayList(inputData.size()); ListCollector collector = new ListCollector(result); - inputData.sort( new Comparator() { + Collections.sort(inputData, new Comparator() { @Override public int compare(IN o1, IN o2) { - return - inputComparator.compare(o1, o2); + return inputComparator.compare(o2, o1); } }); ListKeyGroupedIterator keyedIterator = diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java index e2c3a088557..21fa9be6cf2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java @@ -18,7 +18,13 @@ package org.apache.flink.api.common.operators.base; +import java.util.ArrayList; +import java.util.List; + import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -44,4 +50,23 @@ public class MapPartitionOperatorBase udf, UnaryOperatorInformation operatorInfo, String name) { super(new UserCodeClassWrapper(udf), operatorInfo, name); } + + // -------------------------------------------------------------------------------------------- + + @Override + protected List executeOnCollections(List inputData, RuntimeContext ctx) throws Exception { + MapPartitionFunction function = this.userFunction.getUserCodeObject(); + + FunctionUtils.setFunctionRuntimeContext(function, ctx); + FunctionUtils.openFunction(function, this.parameters); + + ArrayList result = new ArrayList(inputData.size() / 4); + ListCollector resultCollector = new ListCollector(result); + + function.mapPartition(inputData, resultCollector); + result.trimToSize(); + + FunctionUtils.closeFunction(function); + return result; + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java index 72b555ab267..ba51ed811d0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +@SuppressWarnings("serial") public class FlatMapOperatorCollectionExecutionTest implements Serializable { @Test @@ -61,6 +62,7 @@ public class FlatMapOperatorCollectionExecutionTest implements Serializable { } } + public class IdRichFlatMap extends RichFlatMapFunction { private boolean isOpened = false; diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java new file mode 100644 index 00000000000..0657ac10df0 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + +import static org.junit.Assert.*; +import static java.util.Arrays.asList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.flink.util.Collector; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +@SuppressWarnings("serial") +public class PartitionMapOperatorTest implements java.io.Serializable { + + @Test + public void testMapPartitionWithRuntimeContext() { + try { + final String taskName = "Test Task"; + final AtomicBoolean opened = new AtomicBoolean(); + final AtomicBoolean closed = new AtomicBoolean(); + + final MapPartitionFunction parser = new RichMapPartitionFunction() { + + @Override + public void open(Configuration parameters) throws Exception { + opened.set(true); + RuntimeContext ctx = getRuntimeContext(); + assertEquals(0, ctx.getIndexOfThisSubtask()); + assertEquals(1, ctx.getNumberOfParallelSubtasks()); + assertEquals(taskName, ctx.getTaskName()); + } + + @Override + public void mapPartition(Iterable values, Collector out) { + for (String s : values) { + out.collect(Integer.parseInt(s)); + } + } + + @Override + public void close() throws Exception { + closed.set(true); + } + }; + + MapPartitionOperatorBase> op = + new MapPartitionOperatorBase>( + parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); + + List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); + List result = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0)); + + assertEquals(asList(1, 2, 3, 4, 5, 6), result); + + assertTrue(opened.get()); + assertTrue(closed.get()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java new file mode 100644 index 00000000000..7c89f1325dc --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOuputFormat; +import org.apache.flink.configuration.Configuration; +import org.junit.Test; + +public class CollectionExecutionAccumulatorsTest { + + private static final String ACCUMULATOR_NAME = "TEST ACC"; + + @Test + public void testAccumulator() { + try { + final int NUM_ELEMENTS = 100; + + ExecutionEnvironment env = new CollectionEnvironment(); + + env.generateSequence(1, NUM_ELEMENTS) + .map(new CountingMapper()) + .output(new DiscardingOuputFormat()); + + JobExecutionResult result = env.execute(); + + assertTrue(result.getNetRuntime() >= 0); + + assertEquals(NUM_ELEMENTS, result.getAccumulatorResult(ACCUMULATOR_NAME)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @SuppressWarnings("serial") + public static class CountingMapper extends RichMapFunction { + + private IntCounter accumulator; + + @Override + public void open(Configuration parameters) { + accumulator = getRuntimeContext().getIntCounter(ACCUMULATOR_NAME); + } + + @Override + public Long map(Long value) { + accumulator.add(1); + return value; + } + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 6c655ce7acc..f77b2926017 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -19,9 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.UnaryOperatorInformation; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java index 76470b6fb74..2af8a8c3918 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java @@ -21,8 +21,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.DataSet; @@ -30,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Assert; import org.junit.Test; public class MaxByOperatorTest { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java index 72e91b63614..5d9c938a7ce 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java @@ -21,7 +21,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java index 52c215f184d..9fdb83708c8 100644 --- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java +++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java @@ -69,7 +69,7 @@ public class ReduceITCase extends JavaProgramTestBase { BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO + BasicTypeInfo.LONG_TYPE_INFO ); return env.fromCollection(data, type); -- GitLab