提交 d889e7ae 编写于 作者: M Matthias Pohl 提交者: Robert Metzger

[FLINK-20619][runtime] Remove setup of InputDependencyConstraint

ExeucutorUtils.setBatchProperties(StreamExecutionEnvironment) does not set
InputDependencyContraint anymore as it's not used anywhere.

FLINK-20589
上级 7404c951
......@@ -47,7 +47,7 @@ public class BatchExecutor extends ExecutorBase {
public Pipeline createPipeline(
List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
ExecutorUtils.setBatchProperties(execEnv, tableConfig);
ExecutorUtils.setBatchProperties(execEnv);
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations);
streamGraph.setJobName(getNonEmptyJobName(jobName));
ExecutorUtils.setBatchProperties(streamGraph, tableConfig);
......
......@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
......@@ -53,16 +52,12 @@ public class ExecutorUtils {
}
/** Sets batch properties for {@link StreamExecutionEnvironment}. */
public static void setBatchProperties(
StreamExecutionEnvironment execEnv, TableConfig tableConfig) {
public static void setBatchProperties(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
execEnv.getConfig().setAutoWatermarkInterval(0);
execEnv.setBufferTimeout(-1);
if (isShuffleModeAllBlocking(tableConfig)) {
executionConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
}
}
/** Sets batch properties for {@link StreamGraph}. */
......@@ -80,10 +75,6 @@ public class ExecutorUtils {
streamGraph.setGlobalDataExchangeMode(getGlobalDataExchangeMode(tableConfig));
}
private static boolean isShuffleModeAllBlocking(TableConfig tableConfig) {
return getGlobalDataExchangeMode(tableConfig) == GlobalDataExchangeMode.ALL_EDGES_BLOCKING;
}
private static GlobalDataExchangeMode getGlobalDataExchangeMode(TableConfig tableConfig) {
return ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(
tableConfig.getConfiguration());
......
......@@ -125,7 +125,7 @@ class BatchPlanner(
val transformations = translateToPlan(execNodes)
val execEnv = getExecEnv
ExecutorUtils.setBatchProperties(execEnv, getTableConfig)
ExecutorUtils.setBatchProperties(execEnv)
val streamGraph = ExecutorUtils.generateStreamGraph(execEnv, transformations)
ExecutorUtils.setBatchProperties(streamGraph, getTableConfig)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册