diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 3570402bed5d565d744dcea408d91953c0f95c73..68a216baf82e5f52c6da45666f886ee808d10c93 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -111,7 +111,7 @@ public class OperatorTranslation { } - private org.apache.flink.api.common.operators.SingleInputOperator translateSingleInputOperator(SingleInputOperator op) { + private org.apache.flink.api.common.operators.Operator translateSingleInputOperator(SingleInputOperator op) { @SuppressWarnings("unchecked") SingleInputOperator typedOp = (SingleInputOperator) op; @@ -121,26 +121,30 @@ public class OperatorTranslation { Operator input = translate(typedInput); - org.apache.flink.api.common.operators.SingleInputOperator dataFlowOp = typedOp.translateToDataFlow(input); + org.apache.flink.api.common.operators.Operator dataFlowOp = typedOp.translateToDataFlow(input); - if (op instanceof UdfOperator ) { + if (op instanceof UdfOperator) { @SuppressWarnings("unchecked") SingleInputUdfOperator udfOp = (SingleInputUdfOperator) op; - + // set configuration parameters Configuration opParams = udfOp.getParameters(); if (opParams != null) { dataFlowOp.getParameters().addAll(opParams); } - - // set the semantic properties - dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); + + if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) { + org.apache.flink.api.common.operators.SingleInputOperator unaryOp = + (org.apache.flink.api.common.operators.SingleInputOperator) dataFlowOp; + // set the semantic properties + unaryOp.setSemanticProperties(udfOp.getSemanticProperties()); + } } return dataFlowOp; } - private org.apache.flink.api.common.operators.DualInputOperator translateTwoInputOperator(TwoInputOperator op) { + private org.apache.flink.api.common.operators.Operator translateTwoInputOperator(TwoInputOperator op) { @SuppressWarnings("unchecked") TwoInputOperator typedOp = (TwoInputOperator) op; @@ -153,7 +157,7 @@ public class OperatorTranslation { Operator input1 = translate(typedInput1); Operator input2 = translate(typedInput2); - org.apache.flink.api.common.operators.DualInputOperator dataFlowOp = typedOp.translateToDataFlow(input1, input2); + org.apache.flink.api.common.operators.Operator dataFlowOp = typedOp.translateToDataFlow(input1, input2); if (op instanceof UdfOperator ) { @SuppressWarnings("unchecked") @@ -164,9 +168,13 @@ public class OperatorTranslation { if (opParams != null) { dataFlowOp.getParameters().addAll(opParams); } - - // set the semantic properties - dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); + + if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) { + org.apache.flink.api.common.operators.DualInputOperator binaryOp = + (org.apache.flink.api.common.operators.DualInputOperator) dataFlowOp; + // set the semantic properties + binaryOp.setSemanticProperties(udfOp.getSemanticProperties()); + } } return dataFlowOp; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java index 12cd6ff328327e671e4815aee7a935a592829dda..3f9b120986bd82244a258df3c6446086298e10cc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java @@ -62,7 +62,7 @@ public abstract class SingleInputOperator translateToDataFlow( + protected abstract org.apache.flink.api.common.operators.Operator translateToDataFlow( org.apache.flink.api.common.operators.Operator input); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java index 9fb94f1db3e48ce90e5472586d735732112c75ee..b46aa7731e7504e867b94aba9ef6701599c32cca 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java @@ -87,6 +87,6 @@ public abstract class TwoInputOperator translateToDataFlow( + protected abstract org.apache.flink.api.common.operators.Operator translateToDataFlow( org.apache.flink.api.common.operators.Operator input1, org.apache.flink.api.common.operators.Operator input2); }