From 86ca692775b2e112276663468aeca5d201be3d9d Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 3 Oct 2014 18:25:15 +0200 Subject: [PATCH] Change translateToDataflow to return Operator Before, translateToDataflow of SingleInputOperator could only return a single input operator of the lower layer, same for TwoInputOperator. This change allows translateToDataflow to return more kinds of operators. --- .../java/operators/OperatorTranslation.java | 32 ++++++++++++------- .../java/operators/SingleInputOperator.java | 2 +- .../api/java/operators/TwoInputOperator.java | 2 +- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 3570402bed5..68a216baf82 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -111,7 +111,7 @@ public class OperatorTranslation { } - private org.apache.flink.api.common.operators.SingleInputOperator translateSingleInputOperator(SingleInputOperator op) { + private org.apache.flink.api.common.operators.Operator translateSingleInputOperator(SingleInputOperator op) { @SuppressWarnings("unchecked") SingleInputOperator typedOp = (SingleInputOperator) op; @@ -121,26 +121,30 @@ public class OperatorTranslation { Operator input = translate(typedInput); - org.apache.flink.api.common.operators.SingleInputOperator dataFlowOp = typedOp.translateToDataFlow(input); + org.apache.flink.api.common.operators.Operator dataFlowOp = typedOp.translateToDataFlow(input); - if (op instanceof UdfOperator ) { + if (op instanceof UdfOperator) { @SuppressWarnings("unchecked") SingleInputUdfOperator udfOp = (SingleInputUdfOperator) op; - + // set configuration parameters Configuration opParams = udfOp.getParameters(); if (opParams != null) { dataFlowOp.getParameters().addAll(opParams); } - - // set the semantic properties - dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); + + if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) { + org.apache.flink.api.common.operators.SingleInputOperator unaryOp = + (org.apache.flink.api.common.operators.SingleInputOperator) dataFlowOp; + // set the semantic properties + unaryOp.setSemanticProperties(udfOp.getSemanticProperties()); + } } return dataFlowOp; } - private org.apache.flink.api.common.operators.DualInputOperator translateTwoInputOperator(TwoInputOperator op) { + private org.apache.flink.api.common.operators.Operator translateTwoInputOperator(TwoInputOperator op) { @SuppressWarnings("unchecked") TwoInputOperator typedOp = (TwoInputOperator) op; @@ -153,7 +157,7 @@ public class OperatorTranslation { Operator input1 = translate(typedInput1); Operator input2 = translate(typedInput2); - org.apache.flink.api.common.operators.DualInputOperator dataFlowOp = typedOp.translateToDataFlow(input1, input2); + org.apache.flink.api.common.operators.Operator dataFlowOp = typedOp.translateToDataFlow(input1, input2); if (op instanceof UdfOperator ) { @SuppressWarnings("unchecked") @@ -164,9 +168,13 @@ public class OperatorTranslation { if (opParams != null) { dataFlowOp.getParameters().addAll(opParams); } - - // set the semantic properties - dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); + + if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) { + org.apache.flink.api.common.operators.DualInputOperator binaryOp = + (org.apache.flink.api.common.operators.DualInputOperator) dataFlowOp; + // set the semantic properties + binaryOp.setSemanticProperties(udfOp.getSemanticProperties()); + } } return dataFlowOp; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java index 12cd6ff3283..3f9b120986b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java @@ -62,7 +62,7 @@ public abstract class SingleInputOperator translateToDataFlow( + protected abstract org.apache.flink.api.common.operators.Operator translateToDataFlow( org.apache.flink.api.common.operators.Operator input); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java index 9fb94f1db3e..b46aa7731e7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java @@ -87,6 +87,6 @@ public abstract class TwoInputOperator translateToDataFlow( + protected abstract org.apache.flink.api.common.operators.Operator translateToDataFlow( org.apache.flink.api.common.operators.Operator input1, org.apache.flink.api.common.operators.Operator input2); } -- GitLab