未验证 提交 0da62e8d 编写于 作者: W WangTaoTheTonic 提交者: Till Rohrmann

[FLINK-5904] Make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode

This closes #3414.
上级 9fb074c9
......@@ -113,7 +113,7 @@ public final class ConfigConstants {
public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";
// -------------------------------- Runtime -------------------------------
/**
* The config parameter defining the network address to connect to
* for communication with the job manager.
......
......@@ -57,6 +57,11 @@ public class JobManagerOptions {
key("jobmanager.rpc.port")
.defaultValue(6123);
/** JVM heap size (in megabytes) for the JobManager */
public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
key("jobmanager.heap.mb")
.defaultValue(1024);
/**
* The maximum number of prior execution attempts kept in history.
*/
......
......@@ -39,6 +39,11 @@ public class TaskManagerOptions {
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false);
/** JVM heap size (in megabytes) for the TaskManagers */
public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
key("taskmanager.heap.mb")
.defaultValue(1024);
/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
......
......@@ -26,10 +26,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
......@@ -113,9 +115,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
*/
private int slots = -1;
private int jobManagerMemoryMb = 1024;
private int jobManagerMemoryMb = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
private int taskManagerMemoryMb = 1024;
private int taskManagerMemoryMb = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue();
private int taskManagerCount = 1;
......@@ -166,6 +168,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfigurationPath = new Path(confFile.getAbsolutePath());
slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
if (flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
}
if (flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) {
taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
}
} catch (Exception e) {
LOG.debug("Config couldn't be loaded from environment variable.");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册