提交 de216306 编写于 作者: K Kostas Kloudas 提交者: Robert Metzger

FLINK-2213: Makes the number of vcores per YARN container configurable.

This closes #1588
上级 1c48e346
......@@ -240,6 +240,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
- `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes.
- `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting.
......
......@@ -242,6 +242,11 @@ public final class ConfigConstants {
// ------------------------ YARN Configuration ------------------------
/**
* The vcores exposed by YYARN.
*/
public static final String YARN_VCORES = "yarn.containers.vcores";
/**
* Percentage of heap space to remove from containers started by YARN.
*/
......
......@@ -17,16 +17,53 @@
*/
package org.apache.flink.yarn;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Comparator;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
......@@ -62,6 +99,220 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Finished testClientStartup()");
}
/**
* Test per-job yarn cluster
*
* This also tests the prefixed CliFrontend options for the YARN case
* We also test if the requested parallelism of 2 is passed through.
* The parallelism is requested at the YARN client (-ys).
*/
@Test
public void perJobYarnCluster() {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnCluster()");
}
/**
* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
*/
@Test(timeout=100000) // timeout after 100 seconds
public void testTaskManagerFailure() {
LOG.info("Starting testTaskManagerFailure()");
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
"-s", "3", // set the slots 3 to check if the vCores are set properly!
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + ConfigConstants.YARN_VCORES + "=2"},
"Number of connected TaskManagers changed to 1. Slots available: 3",
RunTypes.YARN_SESSION);
Assert.assertEquals(2, getRunningContainers());
// ------------------------ Test if JobManager web interface is accessible -------
YarnClient yc = null;
try {
yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
ApplicationReport app = apps.get(0);
Assert.assertEquals("customName", app.getName());
String url = app.getTrackingUrl();
if(!url.endsWith("/")) {
url += "/";
}
if(!url.startsWith("http://")) {
url = "http://" + url;
}
LOG.info("Got application URL from YARN {}", url);
String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
JsonNode parsedTMs = new ObjectMapper().readTree(response);
ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
Assert.assertNotNull(taskManagers);
Assert.assertEquals(1, taskManagers.size());
Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt());
// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES));
// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
// first, get the hostname/port
String oC = outContent.toString();
Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
Matcher matches = p.matcher(oC);
String hostname = null;
String port = null;
while(matches.find()) {
hostname = matches.group(1).toLowerCase();
port = matches.group(2);
}
LOG.info("Extracted hostname:port: {} {}", hostname, port);
Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
Assert.assertEquals("unable to find port in " + jsonConfig, port,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
// test logfile access
String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
} catch(Throwable e) {
LOG.warn("Error while running test",e);
Assert.fail(e.getMessage());
}
// ------------------------ Kill container with TaskManager and check if vcores are set correctly -------
// find container id of taskManager:
ContainerId taskManagerContainer = null;
NodeManager nodeManager = null;
UserGroupInformation remoteUgi = null;
NMTokenIdentifier nmIdent = null;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
LOG.warn("Unable to get curr user", e);
Assert.fail();
}
for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
NodeManager nm = yarnCluster.getNodeManager(nmId);
ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
taskManagerContainer = entry.getKey();
nodeManager = nm;
nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
// allow myself to do stuff with the container
// remoteUgi.addCredentials(entry.getValue().getCredentials());
remoteUgi.addTokenIdentifier(nmIdent);
}
}
sleep(500);
}
Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
Assert.assertNotNull("Illegal state", nodeManager);
try {
List<NodeReport> nodeReports = yc.getNodeReports(NodeState.RUNNING);
// we asked for one node with 2 vcores so we expect 2 vcores
int userVcores = 0;
for (NodeReport rep: nodeReports) {
userVcores += rep.getUsed().getVirtualCores();
}
Assert.assertEquals(2, userVcores);
} catch (Exception e) {
Assert.fail("Test failed: " + e.getMessage());
}
yc.stop();
List<ContainerId> toStop = new LinkedList<ContainerId>();
toStop.add(taskManagerContainer);
StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
try {
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
} catch (Throwable e) {
LOG.warn("Error stopping container", e);
Assert.fail("Error stopping container: "+e.getMessage());
}
// stateful termination check:
// wait until we saw a container being killed and AFTERWARDS a new one launched
boolean ok = false;
do {
LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
String o = errContent.toString();
int killedOff = o.indexOf("Container killed by the ApplicationMaster");
if (killedOff != -1) {
o = o.substring(killedOff);
ok = o.indexOf("Launching container") > 0;
}
sleep(1000);
} while(!ok);
// send "stop" command to command line interface
runner.sendStop();
// wait for the thread to stop
try {
runner.join(1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while stopping runner", e);
}
LOG.warn("stopped");
// ----------- Send output to logger
System.setOut(originalStdout);
System.setErr(originalStderr);
String oC = outContent.toString();
String eC = errContent.toString();
LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
// ------ Check if everything happened correctly
Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster"));
Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster"));
Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host"));
// cleanup auth for the subsequent tests.
remoteUgi.getTokenIdentifiers().remove(nmIdent);
LOG.info("Finished testTaskManagerFailure()");
}
/**
* Test deployment to non-existing queue. (user-reported error)
......@@ -81,6 +332,199 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Finished testNonexistingQueue()");
}
/**
* Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
*/
@Test
public void perJobYarnClusterWithParallelism() {
LOG.info("Starting perJobYarnClusterWithParallelism()");
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run",
"-p", "2", //test that the job is executed with a DOP of 2
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnClusterWithParallelism()");
}
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
public void testDetachedPerJobYarnCluster() {
LOG.info("Starting testDetachedPerJobYarnCluster()");
File exampleJarLocation = YarnTestBase.findFile(
".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
new ContainsName(new String[] {"-WordCount.jar"}));
Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnCluster()");
}
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
public void testDetachedPerJobYarnClusterWithStreamingJob() {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
File exampleJarLocation = YarnTestBase.findFile(
".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
new ContainsName(new String[] {"-WordCount.jar"}));
Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
}
private void testDetachedPerJobYarnClusterInternal(String job) {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.start();
// get temporary folder for writing output of wordcount example
File tmpOutFolder = null;
try{
tmpOutFolder = tmp.newFolder();
}
catch(IOException e) {
throw new RuntimeException(e);
}
// get temporary file for reading input data for wordcount example
File tmpInFile;
try{
tmpInFile = tmp.newFile();
FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
}
catch(IOException e) {
throw new RuntimeException(e);
}
Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
"Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
// it should usually be 2, but on slow machines, the number varies
Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
// give the runner some time to detach
for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
Assert.assertFalse("The runner should detach.", runner.isAlive());
LOG.info("CLI Frontend has returned, so the job is running");
// find out the application id and wait until it has finished.
try {
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
ApplicationId tmpAppId;
if (apps.size() == 1) {
// Better method to find the right appId. But sometimes the app is shutting down very fast
// Only one running
tmpAppId = apps.get(0).getApplicationId();
LOG.info("waiting for the job with appId {} to finish", tmpAppId);
// wait until the app has finished
while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
sleep(500);
}
} else {
// get appId by finding the latest finished appid
apps = yc.getApplications();
Collections.sort(apps, new Comparator<ApplicationReport>() {
@Override
public int compare(ApplicationReport o1, ApplicationReport o2) {
return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
}
});
tmpAppId = apps.get(0).getApplicationId();
LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
}
final ApplicationId id = tmpAppId;
// now it has finished.
// check the output files.
File[] listOfOutputFiles = tmpOutFolder.listFiles();
Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
// read all output files in output folder to one output string
String content = "";
for(File f:listOfOutputFiles)
{
if(f.isFile())
{
content += FileUtils.readFileToString(f) + "\n";
}
}
//String content = FileUtils.readFileToString(taskmanagerOut);
// check for some of the wordcount outputs.
Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
// check if the heap size for the TaskManager was set correctly
File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
}
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(expected));
// make sure the detached app is really finished.
LOG.info("Checking again that app has finished");
ApplicationReport rep;
do {
sleep(500);
rep = yc.getApplicationReport(id);
LOG.info("Got report {}", rep);
} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
} catch(Throwable t) {
LOG.warn("Error while detached yarn session was running", t);
Assert.fail(t.getMessage());
}
}
@After
public void checkForProhibitedLogContents() {
ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
......
......@@ -18,36 +18,18 @@
package org.apache.flink.yarn;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
......@@ -63,18 +45,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
......@@ -104,22 +77,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
}
/**
* Test regular operation, including command line parameter parsing.
*/
@Test
public void testClientStartup() {
LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
"-s", "2" // Test that 2 slots are started on the TaskManager.
},
"Number of connected TaskManagers changed to 1. Slots available: 2", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testClientStartup()");
}
/**
* Test regular operation, including command line parameter parsing.
*/
......@@ -171,173 +128,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Finished testDetachedMode()");
}
/**
* Test TaskManager failure
*/
@Test(timeout=100000) // timeout after 100 seconds
public void testTaskManagerFailure() {
LOG.info("Starting testTaskManagerFailure()");
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3"},
"Number of connected TaskManagers changed to 1. Slots available: 1",
RunTypes.YARN_SESSION);
Assert.assertEquals(2, getRunningContainers());
// ------------------------ Test if JobManager web interface is accessible -------
try {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
ApplicationReport app = apps.get(0);
Assert.assertEquals("customName", app.getName());
String url = app.getTrackingUrl();
if(!url.endsWith("/")) {
url += "/";
}
if(!url.startsWith("http://")) {
url = "http://" + url;
}
LOG.info("Got application URL from YARN {}", url);
String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
JsonNode parsedTMs = new ObjectMapper().readTree(response);
ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
Assert.assertNotNull(taskManagers);
Assert.assertEquals(1, taskManagers.size());
Assert.assertEquals(1, taskManagers.get(0).get("slotsNumber").asInt());
// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
// -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface
// first, get the hostname/port
String oC = outContent.toString();
Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)");
Matcher matches = p.matcher(oC);
String hostname = null;
String port = null;
while(matches.find()) {
hostname = matches.group(1).toLowerCase();
port = matches.group(2);
}
LOG.info("Extracted hostname:port: {} {}", hostname, port);
Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
Assert.assertEquals("unable to find port in " + jsonConfig, port,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
// test logfile access
String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
} catch(Throwable e) {
LOG.warn("Error while running test",e);
Assert.fail(e.getMessage());
}
// ------------------------ Kill container with TaskManager -------
// find container id of taskManager:
ContainerId taskManagerContainer = null;
NodeManager nodeManager = null;
UserGroupInformation remoteUgi = null;
NMTokenIdentifier nmIdent = null;
try {
remoteUgi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
LOG.warn("Unable to get curr user", e);
Assert.fail();
}
for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
NodeManager nm = yarnCluster.getNodeManager(nmId);
ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
taskManagerContainer = entry.getKey();
nodeManager = nm;
nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
// allow myself to do stuff with the container
// remoteUgi.addCredentials(entry.getValue().getCredentials());
remoteUgi.addTokenIdentifier(nmIdent);
}
}
sleep(500);
}
Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
Assert.assertNotNull("Illegal state", nodeManager);
List<ContainerId> toStop = new LinkedList<ContainerId>();
toStop.add(taskManagerContainer);
StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
try {
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
} catch (Throwable e) {
LOG.warn("Error stopping container", e);
Assert.fail("Error stopping container: "+e.getMessage());
}
// stateful termination check:
// wait until we saw a container being killed and AFTERWARDS a new one launched
boolean ok = false;
do {
LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
String o = errContent.toString();
int killedOff = o.indexOf("Container killed by the ApplicationMaster");
if (killedOff != -1) {
o = o.substring(killedOff);
ok = o.indexOf("Launching container") > 0;
}
sleep(1000);
} while(!ok);
// send "stop" command to command line interface
runner.sendStop();
// wait for the thread to stop
try {
runner.join(1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while stopping runner", e);
}
LOG.warn("stopped");
// ----------- Send output to logger
System.setOut(originalStdout);
System.setErr(originalStderr);
String oC = outContent.toString();
String eC = errContent.toString();
LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
// ------ Check if everything happened correctly
Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster"));
Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster"));
Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host"));
// cleanup auth for the subsequent tests.
remoteUgi.getTokenIdentifiers().remove(nmIdent);
LOG.info("Finished testTaskManagerFailure()");
}
/**
* Test querying the YARN cluster.
*
......@@ -420,227 +210,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
}
/**
* Test per-job yarn cluster
*
* This also tests the prefixed CliFrontend options for the YARN case
* We also test if the requested parallelism of 2 is passed through.
* The parallelism is requested at the YARN client (-ys).
*/
@Test
public void perJobYarnCluster() {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnCluster()");
}
/**
* Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
*/
@Test
public void perJobYarnClusterWithParallelism() {
LOG.info("Starting perJobYarnClusterWithParallelism()");
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run",
"-p", "2", //test that the job is executed with a DOP of 2
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
/* test succeeded after this string */
"Job execution complete",
/* prohibited strings: (we want to see (2/2)) */
new String[]{"System.out)(1/1) switched to FINISHED "},
RunTypes.CLI_FRONTEND, 0, true);
LOG.info("Finished perJobYarnClusterWithParallelism()");
}
private void testDetachedPerJobYarnClusterInternal(String job) {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.start();
// get temporary folder for writing output of wordcount example
File tmpOutFolder = null;
try{
tmpOutFolder = tmp.newFolder();
}
catch(IOException e) {
throw new RuntimeException(e);
}
// get temporary file for reading input data for wordcount example
File tmpInFile;
try{
tmpInFile = tmp.newFile();
FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
}
catch(IOException e) {
throw new RuntimeException(e);
}
Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
"Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
// it should usually be 2, but on slow machines, the number varies
Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
// give the runner some time to detach
for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
Assert.assertFalse("The runner should detach.", runner.isAlive());
LOG.info("CLI Frontend has returned, so the job is running");
// find out the application id and wait until it has finished.
try {
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
ApplicationId tmpAppId;
if (apps.size() == 1) {
// Better method to find the right appId. But sometimes the app is shutting down very fast
// Only one running
tmpAppId = apps.get(0).getApplicationId();
LOG.info("waiting for the job with appId {} to finish", tmpAppId);
// wait until the app has finished
while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
sleep(500);
}
} else {
// get appId by finding the latest finished appid
apps = yc.getApplications();
Collections.sort(apps, new Comparator<ApplicationReport>() {
@Override
public int compare(ApplicationReport o1, ApplicationReport o2) {
return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
}
});
tmpAppId = apps.get(0).getApplicationId();
LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray()));
}
final ApplicationId id = tmpAppId;
// now it has finished.
// check the output files.
File[] listOfOutputFiles = tmpOutFolder.listFiles();
Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
// read all output files in output folder to one output string
String content = "";
for(File f:listOfOutputFiles)
{
if(f.isFile())
{
content += FileUtils.readFileToString(f) + "\n";
}
}
//String content = FileUtils.readFileToString(taskmanagerOut);
// check for some of the wordcount outputs.
Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
// check if the heap size for the TaskManager was set correctly
File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString());
}
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(expected));
// make sure the detached app is really finished.
LOG.info("Checking again that app has finished");
ApplicationReport rep;
do {
sleep(500);
rep = yc.getApplicationReport(id);
LOG.info("Got report {}", rep);
} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
} catch(Throwable t) {
LOG.warn("Error while detached yarn session was running", t);
Assert.fail(t.getMessage());
}
}
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
public void testDetachedPerJobYarnCluster() {
LOG.info("Starting testDetachedPerJobYarnCluster()");
File exampleJarLocation = YarnTestBase.findFile(
".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch",
new ContainsName(new String[] {"-WordCount.jar"}));
Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnCluster()");
}
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
public void testDetachedPerJobYarnClusterWithStreamingJob() {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
File exampleJarLocation = YarnTestBase.findFile(
".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming",
new ContainsName(new String[] {"-WordCount.jar"}));
Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
}
/**
* Test the YARN Java API
*/
......
......@@ -121,6 +121,7 @@ public abstract class YarnTestBase extends TestLogger {
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
......@@ -371,6 +372,11 @@ public abstract class YarnTestBase extends TestLogger {
TestBaseUtils.setEnv(map);
Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
// wait for the nodeManagers to connect
while(!yarnCluster.waitForNodeManagersToConnect(500)) {
LOG.info("Waiting for Nodemanagers to connect");
}
} catch (Exception ex) {
ex.printStackTrace();
LOG.error("setup failure", ex);
......
......@@ -518,10 +518,14 @@ class YarnJobManager(
val priority = Records.newRecord(classOf[Priority])
priority.setPriority(0)
val taskManagerSlots = env.get(FlinkYarnClientBase.ENV_SLOTS).toInt
val vcores: Int = flinkConfiguration
.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1))
// Resource requirements for worker containers
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(memoryPerTaskManager)
capability.setVirtualCores(1) // hard-code that number (YARN is not accounting for CPUs)
capability.setVirtualCores(vcores)
new ContainerRequest(capability, null, null, priority)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册