提交 84bc21e3 编写于 作者: S Stephan Ewen

[FLINK-1110] Implement collection-based Cross

上级 bb71b239
......@@ -16,21 +16,24 @@
* limitations under the License.
*/
package org.apache.flink.api.common.operators.base;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
/**
* @see org.apache.flink.api.common.functions.CrossFunction
*/
public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<?, ?, ?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
public CrossOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
super(udf, operatorInfo, name);
......@@ -44,6 +47,27 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<?, ?, ?>>
this(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
}
// --------------------------------------------------------------------------------------------
@Override
protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext ctx) throws Exception {
CrossFunction<IN1, IN2, OUT> function = this.userFunction.getUserCodeObject();
FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);
ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
for (IN1 element1 : inputData1) {
for (IN2 element2 : inputData2) {
result.add(function.cross(element1, element2));
}
}
FunctionUtils.closeFunction(function);
return result;
}
// --------------------------------------------------------------------------------------------
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册