From 84bc21e36e85fab147118959009c379735703351 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 22 Sep 2014 19:18:33 +0200 Subject: [PATCH] [FLINK-1110] Implement collection-based Cross --- .../operators/base/CrossOperatorBase.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java index 4783fe3975e..c70460a8ceb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java @@ -16,21 +16,24 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators.base; +import java.util.ArrayList; +import java.util.List; + import org.apache.flink.api.common.functions.CrossFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.operators.BinaryOperatorInformation; import org.apache.flink.api.common.operators.DualInputOperator; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.operators.util.UserCodeWrapper; - /** * @see org.apache.flink.api.common.functions.CrossFunction */ -public class CrossOperatorBase> extends DualInputOperator { +public class CrossOperatorBase> extends DualInputOperator { public CrossOperatorBase(UserCodeWrapper udf, BinaryOperatorInformation operatorInfo, String name) { super(udf, operatorInfo, name); @@ -44,6 +47,27 @@ public class CrossOperatorBase> this(new UserCodeClassWrapper(udf), operatorInfo, name); } + // -------------------------------------------------------------------------------------------- + + @Override + protected List executeOnCollections(List inputData1, List inputData2, RuntimeContext ctx) throws Exception { + CrossFunction function = this.userFunction.getUserCodeObject(); + + FunctionUtils.setFunctionRuntimeContext(function, ctx); + FunctionUtils.openFunction(function, this.parameters); + + ArrayList result = new ArrayList(inputData1.size() * inputData2.size()); + for (IN1 element1 : inputData1) { + for (IN2 element2 : inputData2) { + result.add(function.cross(element1, element2)); + } + } + + FunctionUtils.closeFunction(function); + + return result; + } + // -------------------------------------------------------------------------------------------- /** -- GitLab