From 114af5a3af83868f28d983f3e0cea23e7c766f86 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 22 Sep 2014 17:54:34 +0200 Subject: [PATCH] [FLINK-1110] Add execution on collections for flatMap --- .../operators/base/FlatMapOperatorBase.java | 31 +++++- ...latMapOperatorCollectionExecutionTest.java | 102 ++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java index 1f80008ffcb..81f3bcf507e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java @@ -16,16 +16,20 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import java.util.ArrayList; +import java.util.List; /** * @see org.apache.flink.api.common.functions.FlatMapFunction @@ -35,12 +39,33 @@ public class FlatMapOperatorBase> e public FlatMapOperatorBase(UserCodeWrapper udf, UnaryOperatorInformation operatorInfo, String name) { super(udf, operatorInfo, name); } - + public FlatMapOperatorBase(FT udf, UnaryOperatorInformation operatorInfo, String name) { super(new UserCodeObjectWrapper(udf), operatorInfo, name); } - + public FlatMapOperatorBase(Class udf, UnaryOperatorInformation operatorInfo, String name) { super(new UserCodeClassWrapper(udf), operatorInfo, name); } + + // ------------------------------------------------------------------------ + + @Override + protected List executeOnCollections(List input, RuntimeContext ctx) throws Exception { + FlatMapFunction function = userFunction.getUserCodeObject(); + + FunctionUtils.setFunctionRuntimeContext(function, ctx); + FunctionUtils.openFunction(function, parameters); + + ArrayList result = new ArrayList(input.size()); + ListCollector resultCollector = new ListCollector(result); + + for (IN element : input) { + function.flatMap(element, resultCollector); + } + + FunctionUtils.closeFunction(function); + + return result; + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java new file mode 100644 index 00000000000..72b555ab267 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionExecutionTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.RuntimeUDFContext; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class FlatMapOperatorCollectionExecutionTest implements Serializable { + + @Test + public void testExecuteOnCollection() { + try { + IdRichFlatMap udf = new IdRichFlatMap(); + testExecuteOnCollection(udf, Arrays.asList("f", "l", "i", "n", "k")); + Assert.assertTrue(udf.isClosed); + + testExecuteOnCollection(new IdRichFlatMap(), new ArrayList()); + } catch (Throwable t) { + Assert.fail(t.getMessage()); + } + } + + private void testExecuteOnCollection(FlatMapFunction udf, List input) throws Exception { + // run on collections + final List result = getTestFlatMapOperator(udf) + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0)); + + Assert.assertEquals(input.size(), result.size()); + + for (int i = 0; i < input.size(); i++) { + Assert.assertEquals(input.get(i), result.get(i)); + } + } + + public class IdRichFlatMap extends RichFlatMapFunction { + + private boolean isOpened = false; + private boolean isClosed = false; + + @Override + public void open(Configuration parameters) throws Exception { + isOpened = true; + + RuntimeContext ctx = getRuntimeContext(); + Assert.assertEquals("Test UDF", ctx.getTaskName()); + Assert.assertEquals(4, ctx.getNumberOfParallelSubtasks()); + Assert.assertEquals(0, ctx.getIndexOfThisSubtask()); + } + + @Override + public void flatMap(IN value, Collector out) throws Exception { + Assert.assertTrue(isOpened); + Assert.assertFalse(isClosed); + + out.collect(value); + } + + @Override + public void close() throws Exception { + isClosed = true; + } + } + + private FlatMapOperatorBase> getTestFlatMapOperator( + FlatMapFunction udf) { + + UnaryOperatorInformation typeInfo = new UnaryOperatorInformation( + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + return new FlatMapOperatorBase>( + udf, typeInfo, "flatMap on Collections"); + } +} -- GitLab