From d889e7aedac5c3bf942f037a37e1409decdfa63f Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Thu, 7 Jan 2021 13:54:08 +0100 Subject: [PATCH] [FLINK-20619][runtime] Remove setup of InputDependencyConstraint ExeucutorUtils.setBatchProperties(StreamExecutionEnvironment) does not set InputDependencyContraint anymore as it's not used anywhere. FLINK-20589 --- .../flink/table/planner/delegation/BatchExecutor.java | 2 +- .../flink/table/planner/utils/ExecutorUtils.java | 11 +---------- .../flink/table/planner/delegation/BatchPlanner.scala | 2 +- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java index de221bdd8ed..6bf9cdaba85 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java @@ -47,7 +47,7 @@ public class BatchExecutor extends ExecutorBase { public Pipeline createPipeline( List> transformations, TableConfig tableConfig, String jobName) { StreamExecutionEnvironment execEnv = getExecutionEnvironment(); - ExecutorUtils.setBatchProperties(execEnv, tableConfig); + ExecutorUtils.setBatchProperties(execEnv); StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations); streamGraph.setJobName(getNonEmptyJobName(jobName)); ExecutorUtils.setBatchProperties(streamGraph, tableConfig); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java index 7f3db8c4b64..e6a7e0c58f0 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.InputDependencyConstraint; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.dag.Transformation; import org.apache.flink.runtime.jobgraph.ScheduleMode; @@ -53,16 +52,12 @@ public class ExecutorUtils { } /** Sets batch properties for {@link StreamExecutionEnvironment}. */ - public static void setBatchProperties( - StreamExecutionEnvironment execEnv, TableConfig tableConfig) { + public static void setBatchProperties(StreamExecutionEnvironment execEnv) { ExecutionConfig executionConfig = execEnv.getConfig(); executionConfig.enableObjectReuse(); executionConfig.setLatencyTrackingInterval(-1); execEnv.getConfig().setAutoWatermarkInterval(0); execEnv.setBufferTimeout(-1); - if (isShuffleModeAllBlocking(tableConfig)) { - executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL); - } } /** Sets batch properties for {@link StreamGraph}. */ @@ -80,10 +75,6 @@ public class ExecutorUtils { streamGraph.setGlobalDataExchangeMode(getGlobalDataExchangeMode(tableConfig)); } - private static boolean isShuffleModeAllBlocking(TableConfig tableConfig) { - return getGlobalDataExchangeMode(tableConfig) == GlobalDataExchangeMode.ALL_EDGES_BLOCKING; - } - private static GlobalDataExchangeMode getGlobalDataExchangeMode(TableConfig tableConfig) { return ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode( tableConfig.getConfiguration()); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index 4d90c944e36..059c9e38ad7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -125,7 +125,7 @@ class BatchPlanner( val transformations = translateToPlan(execNodes) val execEnv = getExecEnv - ExecutorUtils.setBatchProperties(execEnv, getTableConfig) + ExecutorUtils.setBatchProperties(execEnv) val streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations) ExecutorUtils.setBatchProperties(streamGraph, getTableConfig) -- GitLab