提交 0e69dd5c 编写于 作者: Z zentol

[FLINK-6699] Activate strict checkstyle for flink-yarn-tests

This closes #3985.
上级 20e4b994
......@@ -18,8 +18,6 @@
package org.apache.flink.yarn;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
......@@ -33,6 +31,8 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
......@@ -40,14 +40,12 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
......@@ -73,8 +71,8 @@ public class CliFrontendYarnAddressConfigurationTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private final static PrintStream OUT = System.out;
private final static PrintStream ERR = System.err;
private static final PrintStream OUT = System.out;
private static final PrintStream ERR = System.err;
@BeforeClass
public static void disableStdOutErr() {
......@@ -106,7 +104,6 @@ public class CliFrontendYarnAddressConfigurationTest {
private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
private static final int TEST_JOB_MANAGER_PORT = 55443;
......@@ -114,11 +111,9 @@ public class CliFrontendYarnAddressConfigurationTest {
"jobmanager.rpc.address: " + TEST_JOB_MANAGER_ADDRESS + "\n" +
"jobmanager.rpc.port: " + TEST_JOB_MANAGER_PORT;
private static final String invalidPropertiesFile =
"jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
/**
* Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
*/
......@@ -175,7 +170,6 @@ public class CliFrontendYarnAddressConfigurationTest {
TEST_JOB_MANAGER_PORT);
}
@Test
public void testResumeFromYarnID() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
......@@ -257,7 +251,6 @@ public class CliFrontendYarnAddressConfigurationTest {
TEST_YARN_JOB_MANAGER_PORT);
}
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
......@@ -276,7 +269,6 @@ public class CliFrontendYarnAddressConfigurationTest {
TEST_YARN_JOB_MANAGER_PORT);
}
@Test
public void testManualOptionsOverridesYarn() throws Exception {
......@@ -307,7 +299,7 @@ public class CliFrontendYarnAddressConfigurationTest {
String currentUser = System.getProperty("user.name");
// copy .yarn-properties-<username>
File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
File testPropertiesFile = new File(tmpFolder, ".yarn-properties-" + currentUser);
Files.write(testPropertiesFile.toPath(), contents.getBytes(), StandardOpenOption.CREATE);
// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
......@@ -336,9 +328,8 @@ public class CliFrontendYarnAddressConfigurationTest {
}
}
/**
* Injects an extended FlinkYarnSessionCli that deals with mocking Yarn communication
* Injects an extended FlinkYarnSessionCli that deals with mocking Yarn communication.
*/
private static class CustomYarnTestCLI extends TestCLI {
......@@ -396,7 +387,6 @@ public class CliFrontendYarnAddressConfigurationTest {
return Mockito.mock(YarnClusterClient.class);
}
private class TestYarnClient extends YarnClientImpl {
private final List<ApplicationReport> reports = new LinkedList<>();
......@@ -439,7 +429,6 @@ public class CliFrontendYarnAddressConfigurationTest {
}
}
private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
......
......@@ -18,19 +18,19 @@
package org.apache.flink.yarn;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
......@@ -41,18 +41,20 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
/**
* Tests for the FlinkYarnSessionCli.
*/
public class FlinkYarnSessionCliTest {
@Rule
public TemporaryFolder tmp = new TemporaryFolder();
@Test
public void testDynamicProperties() throws IOException {
public void testDynamicProperties() throws Exception {
Map<String, String> map = new HashMap<String, String>(System.getenv());
File tmpFolder = tmp.newFolder();
......@@ -66,14 +68,8 @@ public class FlinkYarnSessionCliTest {
cli.addRunOptions(options);
CommandLineParser parser = new DefaultParser();
CommandLine cmd = null;
try {
cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15",
CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15",
"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"});
} catch(Exception e) {
e.printStackTrace();
Assert.fail("Parsing failed with " + e.getMessage());
}
AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
......
......@@ -27,7 +27,7 @@ import java.util.List;
/**
* Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
* flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
* flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which
* are shipped to the yarn cluster. This is necessary to load the testing classes.
*/
public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
......@@ -59,7 +59,7 @@ public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor
return TestingApplicationMaster.class;
}
public static class TestJarFinder implements FilenameFilter {
private static class TestJarFinder implements FilenameFilter {
private final String jarName;
......
......@@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
......@@ -33,6 +35,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Tests for various utilities.
*/
public class UtilsTest {
private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
......@@ -60,8 +65,8 @@ public class UtilsTest {
conf.setDouble(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, 384);
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
// test different configuration
Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
......@@ -81,8 +86,8 @@ public class UtilsTest {
conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf));
Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf));
}
@Test(expected = IllegalArgumentException.class)
......@@ -145,15 +150,15 @@ public class UtilsTest {
public static void checkForLogString(String expected) {
LoggingEvent found = getEventContainingString(expected);
if(found != null) {
LOG.info("Found expected string '"+expected+"' in log message "+found);
if (found != null) {
LOG.info("Found expected string '" + expected + "' in log message " + found);
return;
}
Assert.fail("Unable to find expected string '" + expected + "' in log messages");
}
public static LoggingEvent getEventContainingString(String expected) {
if(testAppender == null) {
if (testAppender == null) {
throw new NullPointerException("Initialize test appender first");
}
LoggingEvent found = null;
......@@ -169,10 +174,16 @@ public class UtilsTest {
return found;
}
public static class TestAppender extends AppenderSkeleton {
private static class TestAppender extends AppenderSkeleton {
public final List<LoggingEvent> events = new ArrayList<>();
public void close() {}
public boolean requiresLayout() {return false;}
public void close() {
}
public boolean requiresLayout() {
return false;
}
@Override
protected void append(LoggingEvent event) {
synchronized (events){
......
......@@ -18,10 +18,6 @@
package org.apache.flink.yarn;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
......@@ -37,6 +33,11 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
......@@ -45,19 +46,23 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
/**
* Tests that verify correct HA behavior.
*/
public class YARNHighAvailabilityITCase extends YarnTestBase {
protected static TestingServer zkServer;
private static TestingServer zkServer;
protected static ActorSystem actorSystem;
private static ActorSystem actorSystem;
protected static final int numberApplicationAttempts = 3;
private static final int numberApplicationAttempts = 3;
@Rule
public TemporaryFolder temp = new TemporaryFolder();
......@@ -74,15 +79,15 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
Assert.fail("Could not start ZooKeeper testing cluster.");
}
yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
startYARNWithConfig(yarnConfiguration);
startYARNWithConfig(YARN_CONFIGURATION);
}
@AfterClass
public static void teardown() throws Exception {
if(zkServer != null) {
if (zkServer != null) {
zkServer.stop();
}
......
......@@ -15,14 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobClient;
......@@ -30,6 +25,13 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
......@@ -56,7 +58,14 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -64,7 +73,6 @@ import java.util.regex.Pattern;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
/**
* This test starts a MiniYARNCluster with a CapacityScheduler.
 * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team".
......@@ -74,12 +82,12 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
@BeforeClass
public static void setup() {
yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
startYARNWithConfig(yarnConfiguration);
YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
YARN_CONFIGURATION.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
YARN_CONFIGURATION.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
startYARNWithConfig(YARN_CONFIGURATION);
}
/**
......@@ -99,7 +107,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
/**
* Test per-job yarn cluster
*
* This also tests the prefixed CliFrontend options for the YARN case
* <p>This also tests the prefixed CliFrontend options for the YARN case
* We also test if the requested parallelism of 2 is passed through.
* The parallelism is requested at the YARN client (-ys).
*/
......@@ -123,11 +131,10 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Finished perJobYarnCluster()");
}
/**
* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
*/
@Test(timeout=100000) // timeout after 100 seconds
@Test(timeout = 100000) // timeout after 100 seconds
public void testTaskManagerFailure() {
LOG.info("Starting testTaskManagerFailure()");
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
......@@ -149,7 +156,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
YarnClient yc = null;
try {
yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.init(YARN_CONFIGURATION);
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
......@@ -157,10 +164,10 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
ApplicationReport app = apps.get(0);
Assert.assertEquals("customName", app.getName());
String url = app.getTrackingUrl();
if(!url.endsWith("/")) {
if (!url.endsWith("/")) {
url += "/";
}
if(!url.startsWith("http://")) {
if (!url.startsWith("http://")) {
url = "http://" + url;
}
LOG.info("Got application URL from YARN {}", url);
......@@ -188,7 +195,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
Matcher matches = p.matcher(oC);
String hostname = null;
String port = null;
while(matches.find()) {
while (matches.find()) {
hostname = matches.group(1).toLowerCase();
port = matches.group(2);
}
......@@ -204,8 +211,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster"));
Assert.assertTrue(logs.contains("Starting JobManager"));
Assert.assertTrue(logs.contains("Starting JobManager Web Frontend"));
} catch(Throwable e) {
LOG.warn("Error while running test",e);
} catch (Throwable e) {
LOG.warn("Error while running test", e);
Assert.fail(e.getMessage());
}
......@@ -222,15 +229,15 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.warn("Unable to get curr user", e);
Assert.fail();
}
for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
NodeManager nm = yarnCluster.getNodeManager(nmId);
ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
if(command.contains(YarnTaskManager.class.getSimpleName())) {
if (command.contains(YarnTaskManager.class.getSimpleName())) {
taskManagerContainer = entry.getKey();
nodeManager = nm;
nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
// allow myself to do stuff with the container
// remoteUgi.addCredentials(entry.getValue().getCredentials());
remoteUgi.addTokenIdentifier(nmIdent);
......@@ -252,7 +259,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
} catch (Throwable e) {
LOG.warn("Error stopping container", e);
Assert.fail("Error stopping container: "+e.getMessage());
Assert.fail("Error stopping container: " + e.getMessage());
}
// stateful termination check:
......@@ -270,7 +277,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
sleep(1000);
} while(!ok);
// send "stop" command to command line interface
runner.sendStop();
// wait for the thread to stop
......@@ -282,8 +288,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.warn("stopped");
// ----------- Send output to logger
System.setOut(originalStdout);
System.setErr(originalStderr);
System.setOut(ORIGINAL_STDOUT);
System.setErr(ORIGINAL_STDERR);
String oC = outContent.toString();
String eC = errContent.toString();
LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
......@@ -354,7 +360,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
@Test(timeout = 60000)
public void testDetachedPerJobYarnCluster() {
LOG.info("Starting testDetachedPerJobYarnCluster()");
......@@ -372,7 +378,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
/**
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout=60000)
@Test(timeout = 60000)
public void testDetachedPerJobYarnClusterWithStreamingJob() {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
......@@ -388,25 +394,25 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
private void testDetachedPerJobYarnClusterInternal(String job) {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.init(YARN_CONFIGURATION);
yc.start();
// get temporary folder for writing output of wordcount example
File tmpOutFolder = null;
try{
try {
tmpOutFolder = tmp.newFolder();
}
catch(IOException e) {
catch (IOException e) {
throw new RuntimeException(e);
}
// get temporary file for reading input data for wordcount example
File tmpInFile;
try{
try {
tmpInFile = tmp.newFile();
FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
}
catch(IOException e) {
catch (IOException e) {
throw new RuntimeException(e);
}
......@@ -450,7 +456,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("waiting for the job with appId {} to finish", tmpAppId);
// wait until the app has finished
while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
while (yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
sleep(500);
}
} else {
......@@ -459,7 +465,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
Collections.sort(apps, new Comparator<ApplicationReport>() {
@Override
public int compare(ApplicationReport o1, ApplicationReport o2) {
return o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
return o1.getApplicationId().compareTo(o2.getApplicationId()) * -1;
}
});
tmpAppId = apps.get(0).getApplicationId();
......@@ -471,23 +477,20 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
// check the output files.
File[] listOfOutputFiles = tmpOutFolder.listFiles();
Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder);
// read all output files in output folder to one output string
String content = "";
for(File f:listOfOutputFiles)
{
if(f.isFile())
{
for (File f:listOfOutputFiles) {
if (f.isFile()) {
content += FileUtils.readFileToString(f) + "\n";
}
}
//String content = FileUtils.readFileToString(taskmanagerOut);
// check for some of the wordcount outputs.
Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '" + content + "'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'" + content + "'", content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
// check if the heap size for the TaskManager was set correctly
File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
......@@ -500,11 +503,11 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
content = FileUtils.readFileToString(jobmanagerLog);
// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
"This string checks that the job has been started with a parallelism of 2. Log contents: '" + jobmanagerLog + "'",
content.contains(expected));
// make sure the detached app is really finished.
......@@ -514,10 +517,10 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
sleep(500);
rep = yc.getApplicationReport(id);
LOG.info("Got report {}", rep);
} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
} while (rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
verifyApplicationTags(rep);
} catch(Throwable t) {
} catch (Throwable t) {
LOG.warn("Error while detached yarn session was running", t);
Assert.fail(t.getMessage());
} finally {
......@@ -533,7 +536,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
try {
File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
if(yarnPropertiesFile.exists()) {
if (yarnPropertiesFile.exists()) {
LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
yarnPropertiesFile.delete();
}
......@@ -547,7 +550,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
/**
* Ensures that the YARN application tags were set properly.
*
* Since YARN application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, reflection is
* <p>Since YARN application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, reflection is
* required to invoke the methods. If the method does not exist, this test passes.
*/
private void verifyApplicationTags(final ApplicationReport report) throws InvocationTargetException,
......
......@@ -20,9 +20,9 @@ package org.apache.flink.yarn;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
......@@ -32,15 +32,12 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,7 +49,6 @@ import java.util.List;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
/**
* This test starts a MiniYARNCluster with a FIFO scheduler.
* There are no queues for that scheduler.
......@@ -65,11 +61,11 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
*/
@BeforeClass
public static void setup() {
yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
startYARNWithConfig(yarnConfiguration);
YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 768);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
startYARNWithConfig(YARN_CONFIGURATION);
}
@After
......@@ -80,7 +76,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
/**
* Test regular operation, including command line parameter parsing.
*/
@Test(timeout=60000) // timeout after a minute.
@Test(timeout = 60000) // timeout after a minute.
public void testDetachedMode() throws InterruptedException {
LOG.info("Starting testDetachedMode()");
addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
......@@ -100,7 +96,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Waiting until two containers are running");
// wait until two containers are running
while(getRunningContainers() < 2) {
while (getRunningContainers() < 2) {
sleep(500);
}
......@@ -111,7 +107,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
// kill application "externally".
try {
YarnClient yc = YarnClient.createYarnClient();
yc.init(yarnConfiguration);
yc.init(YARN_CONFIGURATION);
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
......@@ -121,10 +117,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
ApplicationId id = app.getApplicationId();
yc.killApplication(id);
while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
while (yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
sleep(500);
}
} catch(Throwable t) {
} catch (Throwable t) {
LOG.warn("Killing failed", t);
Assert.fail();
} finally {
......@@ -140,7 +136,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
try {
File yarnPropertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(GlobalConfiguration.loadConfiguration());
if(yarnPropertiesFile.exists()) {
if (yarnPropertiesFile.exists()) {
LOG.info("testDetachedPerJobYarnClusterInternal: Cleaning up temporary Yarn address reference: {}", yarnPropertiesFile.getAbsolutePath());
yarnPropertiesFile.delete();
}
......@@ -156,26 +152,25 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
/**
* Test querying the YARN cluster.
*
* This test validates through 666*2 cores in the "cluster".
* <p>This test validates through 666*2 cores in the "cluster".
*/
@Test
public void testQueryCluster() {
LOG.info("Starting testQueryCluster()");
runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332",null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
LOG.info("Finished testQueryCluster()");
}
/**
* The test cluster has the following resources:
* - 2 Nodes with 4096 MB each.
* - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
*
* We allocate:
* <p>We allocate:
* 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
* 5 TaskManagers with 1585 MB
*
* user sees a total request of: 8181 MB (fits)
* <p>user sees a total request of: 8181 MB (fits)
* system sees a total request of: 8437 (doesn't fit due to min alloc mb)
*/
@Ignore("The test is too resource consuming (8.5 GB of memory)")
......@@ -196,15 +191,15 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
* - 2 Nodes with 4096 MB each.
* - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
*
* We allocate:
* <p>We allocate:
* 1 JobManager with 256 MB (will be automatically upgraded to 512 due to min alloc mb)
* 2 TaskManagers with 3840 MB
*
* the user sees a total request of: 7936 MB (fits)
* <p>the user sees a total request of: 7936 MB (fits)
* the system sees a request of: 8192 MB (fits)
* HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which doesn't fit.
*
* --> check if the system properly rejects allocating this session.
* <p>--> check if the system properly rejects allocating this session.
*/
@Ignore("The test is too resource consuming (8 GB of memory)")
@Test
......@@ -221,11 +216,11 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
}
/**
* Test the YARN Java API
* Test the YARN Java API.
*/
@Test
public void testJavaAPI() throws Exception {
final int WAIT_TIME = 15;
final int waitTime = 15;
LOG.info("Starting testJavaAPI()");
AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
......@@ -246,23 +241,23 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
yarnCluster = flinkYarnClient.deploy();
} catch (Exception e) {
LOG.warn("Failing test", e);
Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
}
GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
for (int second = 0; second < waitTime * 2; second++) { // run "forever"
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
}
GetClusterStatusResponse status = yarnCluster.getClusterStatus();
if(status != null && status.equals(expectedStatus)) {
if (status != null && status.equals(expectedStatus)) {
LOG.info("ClusterClient reached status " + status);
break; // all good, cluster started
}
if(second > WAIT_TIME) {
if (second > waitTime) {
// we waited for 15 seconds. cluster didn't come up correctly
Assert.fail("The custer didn't start after " + WAIT_TIME + " seconds");
Assert.fail("The custer didn't start after " + waitTime + " seconds");
}
}
......
......@@ -23,6 +23,7 @@ import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestingSecurityContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
......@@ -33,6 +34,9 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
/**
* An extension of the {@link YARNSessionFIFOITCase} that runs the tests in a secured YARN cluster.
*/
public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
......@@ -42,14 +46,14 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
LOG.info("starting secure cluster environment for testing");
yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 768);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
SecureTestEnvironment.prepare(tmp);
populateYarnSecureConfigurations(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
populateYarnSecureConfigurations(YARN_CONFIGURATION, SecureTestEnvironment.getHadoopServicePrincipal(),
SecureTestEnvironment.getTestKeytab());
Configuration flinkConfig = new Configuration();
......@@ -59,20 +63,20 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
SecureTestEnvironment.getHadoopServicePrincipal());
SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig,
yarnConfiguration);
YARN_CONFIGURATION);
try {
TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
@Override
public Integer call() {
startYARNSecureMode(yarnConfiguration, SecureTestEnvironment.getHadoopServicePrincipal(),
startYARNSecureMode(YARN_CONFIGURATION, SecureTestEnvironment.getHadoopServicePrincipal(),
SecureTestEnvironment.getTestKeytab());
return null;
}
});
} catch(Exception e) {
} catch (Exception e) {
throw new RuntimeException("Exception occurred while setting up secure test context. Reason: {}", e);
}
......
......@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
......@@ -34,13 +36,16 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Tests for the YarnClusterDescriptor.
*/
public class YarnClusterDescriptorTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}
* Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}.
*/
@Test
public void testExplicitLibShipping() throws Exception {
......@@ -77,7 +82,7 @@ public class YarnClusterDescriptorTest {
}
/**
* Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}
* Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}.
*/
@Test
public void testEnvironmentLibShipping() throws Exception {
......
......@@ -22,7 +22,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
......@@ -79,4 +78,3 @@ class TestingYarnTaskManager(
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册