diff --git a/docs/setup/config.md b/docs/setup/config.md index 79a9527788beb93bdee1cd89c686a67597928b77..70963bbaa7f07d3a5e1b43e773ae76b297a4e000 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -240,6 +240,8 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise. + - `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes. - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index ccf90b5ac0c6626c76dc35c4db0c39c03a4ce695..f4e13f6b6e43ef139dc53694f5825696df6c0c70 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -242,6 +242,11 @@ public final class ConfigConstants { // ------------------------ YARN Configuration ------------------------ + /** + * The vcores exposed by YYARN. + */ + public static final String YARN_VCORES = "yarn.containers.vcores"; + /** * Percentage of heap space to remove from containers started by YARN. */ diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index bf47dde64ef8d1f1da1d6995c5e73f84480358d9..ca3a38b8826ff33553e8e1ed90ca0c249841ae1f 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -17,16 +17,53 @@ */ package org.apache.flink.yarn; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.base.Joiner; +import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.log4j.Level; import org.junit.After; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Collections; +import java.util.Comparator; +import java.util.Arrays; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import static org.apache.flink.yarn.UtilsTest.addTestAppender; import static org.apache.flink.yarn.UtilsTest.checkForLogString; @@ -62,6 +99,220 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Finished testClientStartup()"); } + /** + * Test per-job yarn cluster + * + * This also tests the prefixed CliFrontend options for the YARN case + * We also test if the requested parallelism of 2 is passed through. + * The parallelism is requested at the YARN client (-ys). + */ + @Test + public void perJobYarnCluster() { + LOG.info("Starting perJobYarnCluster()"); + addTestAppender(JobClient.class, Level.INFO); + File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here. + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + runWithArgs(new String[]{"run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-ys", "2", //test that the job is executed with a DOP of 2 + "-yjm", "768", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, + /* test succeeded after this string */ + "Job execution complete", + /* prohibited strings: (we want to see (2/2)) */ + new String[]{"System.out)(1/1) switched to FINISHED "}, + RunTypes.CLI_FRONTEND, 0, true); + LOG.info("Finished perJobYarnCluster()"); + } + + + /** + * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). + */ + @Test(timeout=100000) // timeout after 100 seconds + public void testTaskManagerFailure() { + LOG.info("Starting testTaskManagerFailure()"); + Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), + "-n", "1", + "-jm", "768", + "-tm", "1024", + "-s", "3", // set the slots 3 to check if the vCores are set properly! + "-nm", "customName", + "-Dfancy-configuration-value=veryFancy", + "-Dyarn.maximum-failed-containers=3", + "-D" + ConfigConstants.YARN_VCORES + "=2"}, + "Number of connected TaskManagers changed to 1. Slots available: 3", + RunTypes.YARN_SESSION); + + Assert.assertEquals(2, getRunningContainers()); + + // ------------------------ Test if JobManager web interface is accessible ------- + + YarnClient yc = null; + try { + yc = YarnClient.createYarnClient(); + yc.init(yarnConfiguration); + yc.start(); + + List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); + Assert.assertEquals(1, apps.size()); // Only one running + ApplicationReport app = apps.get(0); + Assert.assertEquals("customName", app.getName()); + String url = app.getTrackingUrl(); + if(!url.endsWith("/")) { + url += "/"; + } + if(!url.startsWith("http://")) { + url = "http://" + url; + } + LOG.info("Got application URL from YARN {}", url); + + String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + + JsonNode parsedTMs = new ObjectMapper().readTree(response); + ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); + Assert.assertNotNull(taskManagers); + Assert.assertEquals(1, taskManagers.size()); + Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt()); + + // get the configuration from webinterface & check if the dynamic properties from YARN show up there. + String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config"); + Map parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig); + + Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); + Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); + Assert.assertEquals("2", parsedConfig.get(ConfigConstants.YARN_VCORES)); + + // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface + // first, get the hostname/port + String oC = outContent.toString(); + Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); + Matcher matches = p.matcher(oC); + String hostname = null; + String port = null; + while(matches.find()) { + hostname = matches.group(1).toLowerCase(); + port = matches.group(2); + } + LOG.info("Extracted hostname:port: {} {}", hostname, port); + + Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname, + parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)); + Assert.assertEquals("unable to find port in " + jsonConfig, port, + parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY)); + + // test logfile access + String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); + Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version")); + } catch(Throwable e) { + LOG.warn("Error while running test",e); + Assert.fail(e.getMessage()); + } + + // ------------------------ Kill container with TaskManager and check if vcores are set correctly ------- + + // find container id of taskManager: + ContainerId taskManagerContainer = null; + NodeManager nodeManager = null; + UserGroupInformation remoteUgi = null; + NMTokenIdentifier nmIdent = null; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + LOG.warn("Unable to get curr user", e); + Assert.fail(); + } + for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { + NodeManager nm = yarnCluster.getNodeManager(nmId); + ConcurrentMap containers = nm.getNMContext().getContainers(); + for(Map.Entry entry : containers.entrySet()) { + String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); + if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) { + taskManagerContainer = entry.getKey(); + nodeManager = nm; + nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); + // allow myself to do stuff with the container + // remoteUgi.addCredentials(entry.getValue().getCredentials()); + remoteUgi.addTokenIdentifier(nmIdent); + } + } + sleep(500); + } + + Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer); + Assert.assertNotNull("Illegal state", nodeManager); + + try { + List nodeReports = yc.getNodeReports(NodeState.RUNNING); + + // we asked for one node with 2 vcores so we expect 2 vcores + int userVcores = 0; + for (NodeReport rep: nodeReports) { + userVcores += rep.getUsed().getVirtualCores(); + } + Assert.assertEquals(2, userVcores); + } catch (Exception e) { + Assert.fail("Test failed: " + e.getMessage()); + } + + yc.stop(); + + List toStop = new LinkedList(); + toStop.add(taskManagerContainer); + StopContainersRequest scr = StopContainersRequest.newInstance(toStop); + + try { + nodeManager.getNMContext().getContainerManager().stopContainers(scr); + } catch (Throwable e) { + LOG.warn("Error stopping container", e); + Assert.fail("Error stopping container: "+e.getMessage()); + } + + // stateful termination check: + // wait until we saw a container being killed and AFTERWARDS a new one launched + boolean ok = false; + do { + LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString()); + + String o = errContent.toString(); + int killedOff = o.indexOf("Container killed by the ApplicationMaster"); + if (killedOff != -1) { + o = o.substring(killedOff); + ok = o.indexOf("Launching container") > 0; + } + sleep(1000); + } while(!ok); + + + // send "stop" command to command line interface + runner.sendStop(); + // wait for the thread to stop + try { + runner.join(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while stopping runner", e); + } + LOG.warn("stopped"); + + // ----------- Send output to logger + System.setOut(originalStdout); + System.setErr(originalStderr); + String oC = outContent.toString(); + String eC = errContent.toString(); + LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC); + LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); + + // ------ Check if everything happened correctly + Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster")); + Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster")); + Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host")); + + // cleanup auth for the subsequent tests. + remoteUgi.getTokenIdentifiers().remove(nmIdent); + + LOG.info("Finished testTaskManagerFailure()"); + } /** * Test deployment to non-existing queue. (user-reported error) @@ -81,6 +332,199 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { LOG.info("Finished testNonexistingQueue()"); } + /** + * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client. + */ + @Test + public void perJobYarnClusterWithParallelism() { + LOG.info("Starting perJobYarnClusterWithParallelism()"); + // write log messages to stdout as well, so that the runWithArgs() method + // is catching the log output + addTestAppender(JobClient.class, Level.INFO); + File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here. + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + runWithArgs(new String[]{"run", + "-p", "2", //test that the job is executed with a DOP of 2 + "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), + "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-yjm", "768", + "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, + /* test succeeded after this string */ + "Job execution complete", + /* prohibited strings: (we want to see (2/2)) */ + new String[]{"System.out)(1/1) switched to FINISHED "}, + RunTypes.CLI_FRONTEND, 0, true); + LOG.info("Finished perJobYarnClusterWithParallelism()"); + } + + /** + * Test a fire-and-forget job submission to a YARN cluster. + */ + @Test(timeout=60000) + public void testDetachedPerJobYarnCluster() { + LOG.info("Starting testDetachedPerJobYarnCluster()"); + + File exampleJarLocation = YarnTestBase.findFile( + ".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch", + new ContainsName(new String[] {"-WordCount.jar"})); + + Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation); + + testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); + + LOG.info("Finished testDetachedPerJobYarnCluster()"); + } + + /** + * Test a fire-and-forget job submission to a YARN cluster. + */ + @Test(timeout=60000) + public void testDetachedPerJobYarnClusterWithStreamingJob() { + LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); + + File exampleJarLocation = YarnTestBase.findFile( + ".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming", + new ContainsName(new String[] {"-WordCount.jar"})); + Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation); + + testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); + + LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); + } + + private void testDetachedPerJobYarnClusterInternal(String job) { + YarnClient yc = YarnClient.createYarnClient(); + yc.init(yarnConfiguration); + yc.start(); + + // get temporary folder for writing output of wordcount example + File tmpOutFolder = null; + try{ + tmpOutFolder = tmp.newFolder(); + } + catch(IOException e) { + throw new RuntimeException(e); + } + + // get temporary file for reading input data for wordcount example + File tmpInFile; + try{ + tmpInFile = tmp.newFile(); + FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT); + } + catch(IOException e) { + throw new RuntimeException(e); + } + + Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), + "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-yjm", "768", + "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly + "-ytm", "1024", + "-ys", "2", // test requesting slots from YARN. + "--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()}, + "Job has been submitted with JobID", + RunTypes.CLI_FRONTEND); + + // it should usually be 2, but on slow machines, the number varies + Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2); + // give the runner some time to detach + for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + Assert.assertFalse("The runner should detach.", runner.isAlive()); + LOG.info("CLI Frontend has returned, so the job is running"); + + // find out the application id and wait until it has finished. + try { + List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); + + ApplicationId tmpAppId; + if (apps.size() == 1) { + // Better method to find the right appId. But sometimes the app is shutting down very fast + // Only one running + tmpAppId = apps.get(0).getApplicationId(); + + LOG.info("waiting for the job with appId {} to finish", tmpAppId); + // wait until the app has finished + while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) { + sleep(500); + } + } else { + // get appId by finding the latest finished appid + apps = yc.getApplications(); + Collections.sort(apps, new Comparator() { + @Override + public int compare(ApplicationReport o1, ApplicationReport o2) { + return o1.getApplicationId().compareTo(o2.getApplicationId())*-1; + } + }); + tmpAppId = apps.get(0).getApplicationId(); + LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray())); + } + final ApplicationId id = tmpAppId; + + // now it has finished. + // check the output files. + File[] listOfOutputFiles = tmpOutFolder.listFiles(); + + + Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles); + LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder ); + + // read all output files in output folder to one output string + String content = ""; + for(File f:listOfOutputFiles) + { + if(f.isFile()) + { + content += FileUtils.readFileToString(f) + "\n"; + } + } + //String content = FileUtils.readFileToString(taskmanagerOut); + // check for some of the wordcount outputs. + Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)")); + Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)")); + + // check if the heap size for the TaskManager was set correctly + File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString()); + } + }); + Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); + content = FileUtils.readFileToString(jobmanagerLog); + // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) + String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"; + Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'", + content.contains(expected)); + expected = " (2/2) (attempt #0) to "; + Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." + + "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", + content.contains(expected)); + + // make sure the detached app is really finished. + LOG.info("Checking again that app has finished"); + ApplicationReport rep; + do { + sleep(500); + rep = yc.getApplicationReport(id); + LOG.info("Got report {}", rep); + } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); + + } catch(Throwable t) { + LOG.warn("Error while detached yarn session was running", t); + Assert.fail(t.getMessage()); + } + } + @After public void checkForProhibitedLogContents() { ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS); diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 98dc85f24bc888dfd92830ebc0cb73bc6d04016c..db9af8c96885513a47aab819be27daa2fd62dcfc 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -18,36 +18,18 @@ package org.apache.flink.yarn; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.google.common.base.Joiner; - -import org.apache.commons.io.FileUtils; - import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.client.JobClient; -import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.TestBaseUtils; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -63,18 +45,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.EnumSet; -import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.apache.flink.yarn.UtilsTest.addTestAppender; import static org.apache.flink.yarn.UtilsTest.checkForLogString; @@ -104,22 +77,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS); } - /** - * Test regular operation, including command line parameter parsing. - */ - @Test - public void testClientStartup() { - LOG.info("Starting testClientStartup()"); - runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "-s", "2" // Test that 2 slots are started on the TaskManager. - }, - "Number of connected TaskManagers changed to 1. Slots available: 2", null, RunTypes.YARN_SESSION, 0); - LOG.info("Finished testClientStartup()"); - } - /** * Test regular operation, including command line parameter parsing. */ @@ -171,173 +128,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Finished testDetachedMode()"); } - /** - * Test TaskManager failure - */ - @Test(timeout=100000) // timeout after 100 seconds - public void testTaskManagerFailure() { - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", - "-jm", "768", - "-tm", "1024", - "-nm", "customName", - "-Dfancy-configuration-value=veryFancy", - "-Dyarn.maximum-failed-containers=3"}, - "Number of connected TaskManagers changed to 1. Slots available: 1", - RunTypes.YARN_SESSION); - - Assert.assertEquals(2, getRunningContainers()); - - // ------------------------ Test if JobManager web interface is accessible ------- - try { - YarnClient yc = YarnClient.createYarnClient(); - yc.init(yarnConfiguration); - yc.start(); - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if(!url.endsWith("/")) { - url += "/"; - } - if(!url.startsWith("http://")) { - url = "http://" + url; - } - LOG.info("Got application URL from YARN {}", url); - - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); - - - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(1, taskManagers.get(0).get("slotsNumber").asInt()); - - // get the configuration from webinterface & check if the dynamic properties from YARN show up there. - String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config"); - Map parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig); - - Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); - Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); - - // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface - // first, get the hostname/port - String oC = outContent.toString(); - Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); - Matcher matches = p.matcher(oC); - String hostname = null; - String port = null; - while(matches.find()) { - hostname = matches.group(1).toLowerCase(); - port = matches.group(2); - } - LOG.info("Extracted hostname:port: {} {}", hostname, port); - - Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname, - parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)); - Assert.assertEquals("unable to find port in " + jsonConfig, port, - parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY)); - - // test logfile access - String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); - Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version")); - } catch(Throwable e) { - LOG.warn("Error while running test",e); - Assert.fail(e.getMessage()); - } - - // ------------------------ Kill container with TaskManager ------- - - // find container id of taskManager: - ContainerId taskManagerContainer = null; - NodeManager nodeManager = null; - UserGroupInformation remoteUgi = null; - NMTokenIdentifier nmIdent = null; - try { - remoteUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - LOG.warn("Unable to get curr user", e); - Assert.fail(); - } - for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { - NodeManager nm = yarnCluster.getNodeManager(nmId); - ConcurrentMap containers = nm.getNMContext().getContainers(); - for(Map.Entry entry : containers.entrySet()) { - String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); - if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) { - taskManagerContainer = entry.getKey(); - nodeManager = nm; - nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); - // allow myself to do stuff with the container - // remoteUgi.addCredentials(entry.getValue().getCredentials()); - remoteUgi.addTokenIdentifier(nmIdent); - } - } - sleep(500); - } - - Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer); - Assert.assertNotNull("Illegal state", nodeManager); - - List toStop = new LinkedList(); - toStop.add(taskManagerContainer); - StopContainersRequest scr = StopContainersRequest.newInstance(toStop); - - try { - nodeManager.getNMContext().getContainerManager().stopContainers(scr); - } catch (Throwable e) { - LOG.warn("Error stopping container", e); - Assert.fail("Error stopping container: "+e.getMessage()); - } - - // stateful termination check: - // wait until we saw a container being killed and AFTERWARDS a new one launched - boolean ok = false; - do { - LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString()); - - String o = errContent.toString(); - int killedOff = o.indexOf("Container killed by the ApplicationMaster"); - if (killedOff != -1) { - o = o.substring(killedOff); - ok = o.indexOf("Launching container") > 0; - } - sleep(1000); - } while(!ok); - - - // send "stop" command to command line interface - runner.sendStop(); - // wait for the thread to stop - try { - runner.join(1000); - } catch (InterruptedException e) { - LOG.warn("Interrupted while stopping runner", e); - } - LOG.warn("stopped"); - - // ----------- Send output to logger - System.setOut(originalStdout); - System.setErr(originalStderr); - String oC = outContent.toString(); - String eC = errContent.toString(); - LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC); - LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); - - // ------ Check if everything happened correctly - Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster")); - Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster")); - Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host")); - - // cleanup auth for the subsequent tests. - remoteUgi.getTokenIdentifiers().remove(nmIdent); - - LOG.info("Finished testTaskManagerFailure()"); - } - /** * Test querying the YARN cluster. * @@ -420,227 +210,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]"); } - /** - * Test per-job yarn cluster - * - * This also tests the prefixed CliFrontend options for the YARN case - * We also test if the requested parallelism of 2 is passed through. - * The parallelism is requested at the YARN client (-ys). - */ - @Test - public void perJobYarnCluster() { - LOG.info("Starting perJobYarnCluster()"); - addTestAppender(JobClient.class, Level.INFO); - File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here. - Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); - runWithArgs(new String[]{"run", "-m", "yarn-cluster", - "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-ys", "2", //test that the job is executed with a DOP of 2 - "-yjm", "768", - "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, - /* test succeeded after this string */ - "Job execution complete", - /* prohibited strings: (we want to see (2/2)) */ - new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0, true); - LOG.info("Finished perJobYarnCluster()"); - } - - /** - * Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client. - */ - @Test - public void perJobYarnClusterWithParallelism() { - LOG.info("Starting perJobYarnClusterWithParallelism()"); - // write log messages to stdout as well, so that the runWithArgs() method - // is catching the log output - addTestAppender(JobClient.class, Level.INFO); - File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude streaming wordcount here. - Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); - runWithArgs(new String[]{"run", - "-p", "2", //test that the job is executed with a DOP of 2 - "-m", "yarn-cluster", - "-yj", flinkUberjar.getAbsolutePath(), - "-yt", flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-yjm", "768", - "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, - /* test succeeded after this string */ - "Job execution complete", - /* prohibited strings: (we want to see (2/2)) */ - new String[]{"System.out)(1/1) switched to FINISHED "}, - RunTypes.CLI_FRONTEND, 0, true); - LOG.info("Finished perJobYarnClusterWithParallelism()"); - } - - private void testDetachedPerJobYarnClusterInternal(String job) { - YarnClient yc = YarnClient.createYarnClient(); - yc.init(yarnConfiguration); - yc.start(); - - // get temporary folder for writing output of wordcount example - File tmpOutFolder = null; - try{ - tmpOutFolder = tmp.newFolder(); - } - catch(IOException e) { - throw new RuntimeException(e); - } - - // get temporary file for reading input data for wordcount example - File tmpInFile; - try{ - tmpInFile = tmp.newFile(); - FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT); - } - catch(IOException e) { - throw new RuntimeException(e); - } - - Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), - "-yt", flinkLibFolder.getAbsolutePath(), - "-yn", "1", - "-yjm", "768", - "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly - "-ytm", "1024", - "-ys", "2", // test requesting slots from YARN. - "--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()}, - "Job has been submitted with JobID", - RunTypes.CLI_FRONTEND); - - // it should usually be 2, but on slow machines, the number varies - Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2); - // give the runner some time to detach - for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - Assert.assertFalse("The runner should detach.", runner.isAlive()); - LOG.info("CLI Frontend has returned, so the job is running"); - - // find out the application id and wait until it has finished. - try { - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - - ApplicationId tmpAppId; - if (apps.size() == 1) { - // Better method to find the right appId. But sometimes the app is shutting down very fast - // Only one running - tmpAppId = apps.get(0).getApplicationId(); - - LOG.info("waiting for the job with appId {} to finish", tmpAppId); - // wait until the app has finished - while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) { - sleep(500); - } - } else { - // get appId by finding the latest finished appid - apps = yc.getApplications(); - Collections.sort(apps, new Comparator() { - @Override - public int compare(ApplicationReport o1, ApplicationReport o2) { - return o1.getApplicationId().compareTo(o2.getApplicationId())*-1; - } - }); - tmpAppId = apps.get(0).getApplicationId(); - LOG.info("Selected {} as the last appId from {}", tmpAppId, Arrays.toString(apps.toArray())); - } - final ApplicationId id = tmpAppId; - - // now it has finished. - // check the output files. - File[] listOfOutputFiles = tmpOutFolder.listFiles(); - - - Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles); - LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder ); - - // read all output files in output folder to one output string - String content = ""; - for(File f:listOfOutputFiles) - { - if(f.isFile()) - { - content += FileUtils.readFileToString(f) + "\n"; - } - } - //String content = FileUtils.readFileToString(taskmanagerOut); - // check for some of the wordcount outputs. - Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)")); - Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)")); - - // check if the heap size for the TaskManager was set correctly - File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.contains("jobmanager.log") && dir.getAbsolutePath().contains(id.toString()); - } - }); - Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); - content = FileUtils.readFileToString(jobmanagerLog); - // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) - String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"; - Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'", - content.contains(expected)); - expected = " (2/2) (attempt #0) to "; - Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." + - "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", - content.contains(expected)); - - // make sure the detached app is really finished. - LOG.info("Checking again that app has finished"); - ApplicationReport rep; - do { - sleep(500); - rep = yc.getApplicationReport(id); - LOG.info("Got report {}", rep); - } while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING); - - } catch(Throwable t) { - LOG.warn("Error while detached yarn session was running", t); - Assert.fail(t.getMessage()); - } - } - - /** - * Test a fire-and-forget job submission to a YARN cluster. - */ - @Test(timeout=60000) - public void testDetachedPerJobYarnCluster() { - LOG.info("Starting testDetachedPerJobYarnCluster()"); - - File exampleJarLocation = YarnTestBase.findFile( - ".." + File.separator + "flink-examples" + File.separator + "flink-examples-batch", - new ContainsName(new String[] {"-WordCount.jar"})); - - Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation); - - testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); - - LOG.info("Finished testDetachedPerJobYarnCluster()"); - } - - /** - * Test a fire-and-forget job submission to a YARN cluster. - */ - @Test(timeout=60000) - public void testDetachedPerJobYarnClusterWithStreamingJob() { - LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()"); - - File exampleJarLocation = YarnTestBase.findFile( - ".." + File.separator + "flink-examples" + File.separator + "flink-examples-streaming", - new ContainsName(new String[] {"-WordCount.jar"})); - Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation); - - testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath()); - - LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()"); - } - - /** * Test the YARN Java API */ diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java index d3132d7717b375191cacde499a16a3e9e38254e6..a0a517c9eaea67cd08c9cab6c71ea8c8524c0e03 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java @@ -121,6 +121,7 @@ public abstract class YarnTestBase extends TestLogger { yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); + yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4); yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. @@ -371,6 +372,11 @@ public abstract class YarnTestBase extends TestLogger { TestBaseUtils.setEnv(map); Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); + + // wait for the nodeManagers to connect + while(!yarnCluster.waitForNodeManagersToConnect(500)) { + LOG.info("Waiting for Nodemanagers to connect"); + } } catch (Exception ex) { ex.printStackTrace(); LOG.error("setup failure", ex); diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 266345154e474f714083ed0143f75b013cdf29d9..314c5bd351830467e55cb7c7c532179175eee177 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -518,10 +518,14 @@ class YarnJobManager( val priority = Records.newRecord(classOf[Priority]) priority.setPriority(0) + val taskManagerSlots = env.get(FlinkYarnClientBase.ENV_SLOTS).toInt + val vcores: Int = flinkConfiguration + .getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1)) + // Resource requirements for worker containers val capability = Records.newRecord(classOf[Resource]) capability.setMemory(memoryPerTaskManager) - capability.setVirtualCores(1) // hard-code that number (YARN is not accounting for CPUs) + capability.setVirtualCores(vcores) new ContainerRequest(capability, null, null, priority) }