提交 86ca6927 编写于 作者: A Aljoscha Krettek

Change translateToDataflow to return Operator

Before, translateToDataflow of SingleInputOperator could only return
a single input operator of the lower layer, same for TwoInputOperator.

This change allows translateToDataflow to return more kinds of
operators.
上级 5232c56b
......@@ -111,7 +111,7 @@ public class OperatorTranslation {
}
private <I, O> org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
@SuppressWarnings("unchecked")
SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
......@@ -121,26 +121,30 @@ public class OperatorTranslation {
Operator<I> input = translate(typedInput);
org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> dataFlowOp = typedOp.translateToDataFlow(input);
org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
if (op instanceof UdfOperator<?> ) {
if (op instanceof UdfOperator<?>) {
@SuppressWarnings("unchecked")
SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op;
// set configuration parameters
Configuration opParams = udfOp.getParameters();
if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
// set the semantic properties
dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties());
if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) {
org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> unaryOp =
(org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?>) dataFlowOp;
// set the semantic properties
unaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
}
return dataFlowOp;
}
private <I1, I2, O> org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {
private <I1, I2, O> org.apache.flink.api.common.operators.Operator<O> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {
@SuppressWarnings("unchecked")
TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op;
......@@ -153,7 +157,7 @@ public class OperatorTranslation {
Operator<I1> input1 = translate(typedInput1);
Operator<I2> input2 = translate(typedInput2);
org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> dataFlowOp = typedOp.translateToDataFlow(input1, input2);
org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input1, input2);
if (op instanceof UdfOperator<?> ) {
@SuppressWarnings("unchecked")
......@@ -164,9 +168,13 @@ public class OperatorTranslation {
if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams);
}
// set the semantic properties
dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties());
if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) {
org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> binaryOp =
(org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?>) dataFlowOp;
// set the semantic properties
binaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
}
return dataFlowOp;
......
......@@ -62,7 +62,7 @@ public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator
* @param input The data flow operator that produces this operation's input data.
* @return The translated data flow operator.
*/
protected abstract org.apache.flink.api.common.operators.SingleInputOperator<?, OUT, ?> translateToDataFlow(
protected abstract org.apache.flink.api.common.operators.Operator<OUT> translateToDataFlow(
org.apache.flink.api.common.operators.Operator<IN> input);
}
......@@ -87,6 +87,6 @@ public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator
* @param input2 The second input of the operation, as a common API operator.
* @return The created common API operator.
*/
protected abstract org.apache.flink.api.common.operators.DualInputOperator<?, ?, OUT, ?> translateToDataFlow(
protected abstract org.apache.flink.api.common.operators.Operator<OUT> translateToDataFlow(
org.apache.flink.api.common.operators.Operator<IN1> input1, org.apache.flink.api.common.operators.Operator<IN2> input2);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册