提交 3c037be7 编写于 作者: R Robert Metzger

[yarn] Adjust default values for YARN heap memory cutoff

This closes #717
上级 11b021b0
...@@ -553,9 +553,17 @@ public final class ConfigConstants { ...@@ -553,9 +553,17 @@ public final class ConfigConstants {
// ------------------------ YARN Configuration ------------------------ // ------------------------ YARN Configuration ------------------------
public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 384; /**
* Minimum amount of Heap memory to subtract from the requested TaskManager size.
* We came up with these values experimentally.
* Flink fails when the cutoff is set only to 500 mb.
*/
public static final int DEFAULT_YARN_MIN_HEAP_CUTOFF = 600;
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.15f; /**
* Relative amount of memory to subtract from the requested memory.
*/
public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
// ------------------------ File System Behavior ------------------------ // ------------------------ File System Behavior ------------------------
......
...@@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { ...@@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Starting testClientStartup()"); LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-qu", "qa-team"}, "-tm", "1024", "-qu", "qa-team"},
"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testClientStartup()"); LOG.info("Finished testClientStartup()");
...@@ -73,7 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { ...@@ -73,7 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
addTestAppender(FlinkYarnClient.class, Level.WARN); addTestAppender(FlinkYarnClient.class, Level.WARN);
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1); "-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team"); checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
......
...@@ -102,7 +102,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -102,7 +102,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testClientStartup()"); LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"-s", "2" // Test that 2 slots are started on the TaskManager. "-s", "2" // Test that 2 slots are started on the TaskManager.
}, },
...@@ -119,7 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -119,7 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
addTestAppender(FlinkYarnSessionCli.class, Level.INFO); addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"--detached"}, "--detached"},
"Flink JobManager is now running on", RunTypes.YARN_SESSION); "Flink JobManager is now running on", RunTypes.YARN_SESSION);
...@@ -164,7 +164,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -164,7 +164,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testTaskManagerFailure()"); LOG.info("Starting testTaskManagerFailure()");
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"-Dfancy-configuration-value=veryFancy", "-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3"}, "-Dyarn.maximum-failed-containers=3"},
...@@ -332,7 +332,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -332,7 +332,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testNonexistingQueue()"); LOG.info("Starting testNonexistingQueue()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-n", "1", "-n", "1",
"-jm", "512", "-jm", "768",
"-tm", "1024", "-tm", "1024",
"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0); "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testNonexistingQueue()"); LOG.info("Finished testNonexistingQueue()");
...@@ -408,7 +408,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -408,7 +408,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-yj", flinkUberjar.getAbsolutePath(), "-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1", "-yn", "1",
"-ys", "2", //test that the job is executed with a DOP of 2 "-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "512", "-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */ /* test succeeded after this string */
"Job execution switched to status FINISHED.", "Job execution switched to status FINISHED.",
...@@ -431,7 +431,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -431,7 +431,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-m", "yarn-cluster", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(), "-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1", "-yn", "1",
"-yjm", "512", "-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */ /* test succeeded after this string */
"Job execution switched to status FINISHED.", "Job execution switched to status FINISHED.",
...@@ -467,7 +467,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -467,7 +467,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1", "-yn", "1",
"-yjm", "512", "-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
"-ytm", "1024", "-ytm", "1024",
"-ys", "2", // test requesting slots from YARN. "-ys", "2", // test requesting slots from YARN.
...@@ -541,8 +541,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -541,8 +541,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog); content = FileUtils.readFileToString(jobmanagerLog);
// expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE). // expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'", Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m")); content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." + Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(" (2/2) (attempt #0) to ")); content.contains(" (2/2) (attempt #0) to "));
...@@ -558,7 +558,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -558,7 +558,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
} catch(Throwable t) { } catch(Throwable t) {
LOG.warn("Error while detached yarn session was running", t); LOG.warn("Error while detached yarn session was running", t);
Assert.fail(); Assert.fail(t.getMessage());
} }
} }
...@@ -604,8 +604,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -604,8 +604,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
Assert.assertNotNull("unable to get yarn client", flinkYarnClient); Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
flinkYarnClient.setTaskManagerCount(1); flinkYarnClient.setTaskManagerCount(1);
flinkYarnClient.setJobManagerMemory(512); flinkYarnClient.setJobManagerMemory(768);
flinkYarnClient.setTaskManagerMemory(512); flinkYarnClient.setTaskManagerMemory(768);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath())); flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
String confDirPath = System.getenv("FLINK_CONF_DIR"); String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.setConfigurationDirectory(confDirPath); flinkYarnClient.setConfigurationDirectory(confDirPath);
......
...@@ -99,8 +99,8 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { ...@@ -99,8 +99,8 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
/** /**
* Minimum memory requirements, checked by the Client. * Minimum memory requirements, checked by the Client.
*/ */
private static final int MIN_JM_MEMORY = 128; private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
private static final int MIN_TM_MEMORY = 128; private static final int MIN_TM_MEMORY = 768;
private Configuration conf; private Configuration conf;
private YarnClient yarnClient; private YarnClient yarnClient;
...@@ -164,7 +164,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { ...@@ -164,7 +164,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
@Override @Override
public void setJobManagerMemory(int memoryMb) { public void setJobManagerMemory(int memoryMb) {
if(memoryMb < MIN_JM_MEMORY) { if(memoryMb < MIN_JM_MEMORY) {
throw new IllegalArgumentException("The JobManager memory is below the minimum required memory amount " throw new IllegalArgumentException("The JobManager memory ("+memoryMb+") is below the minimum required memory amount "
+ "of "+MIN_JM_MEMORY+" MB"); + "of "+MIN_JM_MEMORY+" MB");
} }
this.jobManagerMemoryMb = memoryMb; this.jobManagerMemoryMb = memoryMb;
...@@ -173,7 +173,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { ...@@ -173,7 +173,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
@Override @Override
public void setTaskManagerMemory(int memoryMb) { public void setTaskManagerMemory(int memoryMb) {
if(memoryMb < MIN_TM_MEMORY) { if(memoryMb < MIN_TM_MEMORY) {
throw new IllegalArgumentException("The TaskManager memory is below the minimum required memory amount " throw new IllegalArgumentException("The TaskManager memory ("+memoryMb+") is below the minimum required memory amount "
+ "of "+MIN_TM_MEMORY+" MB"); + "of "+MIN_TM_MEMORY+" MB");
} }
this.taskManagerMemoryMb = memoryMb; this.taskManagerMemoryMb = memoryMb;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册