diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java index de221bdd8ed1547ca9205202cde187f08bcacd70..6bf9cdaba856a609214c733d4054eb55bdf674aa 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java @@ -47,7 +47,7 @@ public class BatchExecutor extends ExecutorBase { public Pipeline createPipeline( List> transformations, TableConfig tableConfig, String jobName) { StreamExecutionEnvironment execEnv = getExecutionEnvironment(); - ExecutorUtils.setBatchProperties(execEnv, tableConfig); + ExecutorUtils.setBatchProperties(execEnv); StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations); streamGraph.setJobName(getNonEmptyJobName(jobName)); ExecutorUtils.setBatchProperties(streamGraph, tableConfig); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java index 7f3db8c4b64134025c58fc8c2bef324b4c572a4c..e6a7e0c58f0c0fe75e236539e16826e9f3334263 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.dag.Transformation; import org.apache.flink.runtime.jobgraph.ScheduleMode; @@ -53,16 +52,12 @@ public class ExecutorUtils { } /** Sets batch properties for {@link StreamExecutionEnvironment}. */ - public static void setBatchProperties( - StreamExecutionEnvironment execEnv, TableConfig tableConfig) { + public static void setBatchProperties(StreamExecutionEnvironment execEnv) { ExecutionConfig executionConfig = execEnv.getConfig(); executionConfig.enableObjectReuse(); executionConfig.setLatencyTrackingInterval(-1); execEnv.getConfig().setAutoWatermarkInterval(0); execEnv.setBufferTimeout(-1); - if (isShuffleModeAllBlocking(tableConfig)) { - executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL); - } } /** Sets batch properties for {@link StreamGraph}. */ @@ -80,10 +75,6 @@ public class ExecutorUtils { streamGraph.setGlobalDataExchangeMode(getGlobalDataExchangeMode(tableConfig)); } - private static boolean isShuffleModeAllBlocking(TableConfig tableConfig) { - return getGlobalDataExchangeMode(tableConfig) == GlobalDataExchangeMode.ALL_EDGES_BLOCKING; - } - private static GlobalDataExchangeMode getGlobalDataExchangeMode(TableConfig tableConfig) { return ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode( tableConfig.getConfiguration()); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index 4d90c944e36cacb40b84f0843c3a750d2818353a..059c9e38ad79ce013351cbbfa1e57877db06bfc7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -125,7 +125,7 @@ class BatchPlanner( val transformations = translateToPlan(execNodes) val execEnv = getExecEnv - ExecutorUtils.setBatchProperties(execEnv, getTableConfig) + ExecutorUtils.setBatchProperties(execEnv) val streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations) ExecutorUtils.setBatchProperties(streamGraph, getTableConfig)