Commit 082061da authored by Robert Metzger

[FLINK-11086][yarn] Use YARN_APPLICATION_CLASSPATH instead of flink-shaded-hadoop fat jar in tests
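
Instead of shipping the flink-shaded-hadoop fat jar into the YARN containers of the test cluster, the container classpath is now derived from Maven: a dependency:build-classpath execution writes the non-Flink dependency classpath to target/yarn.classpath during the package phase, and YarnTestBase reads this file back and sets it as YARN_APPLICATION_CLASSPATH on the MiniYARNCluster configuration. The "-t"/"-yt" ship options and addShipFiles() calls for the shaded Hadoop directory thereby become unnecessary and are removed.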

Parent 90fbb944
......@@ -379,39 +379,53 @@ under the License.
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WordCount</classifier>
<overWrite>true</overWrite>
<destFileName>BatchWordCount.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WordCount</classifier>
<overWrite>true</overWrite>
<destFileName>StreamingWordCount.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WindowJoin</classifier>
<overWrite>true</overWrite>
<destFileName>WindowJoin.jar</destFileName>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.directory}/programs</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</execution>
<!-- Write classpath of flink-yarn to a file, so that the yarn tests can use it as their classpath
for the YARN "containers".
-->
<execution>
<id>store-classpath-in-target-for-tests</id>
<phase>package</phase>
<goals>
<goal>build-classpath</goal>
</goals>
<configuration>
<outputFile>${project.build.directory}/yarn.classpath</outputFile>
<excludeGroupIds>org.apache.flink</excludeGroupIds>
</configuration>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WordCount</classifier>
<overWrite>true</overWrite>
<destFileName>BatchWordCount.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WordCount</classifier>
<overWrite>true</overWrite>
<destFileName>StreamingWordCount.jar</destFileName>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
<type>jar</type>
<classifier>WindowJoin</classifier>
<overWrite>true</overWrite>
<destFileName>WindowJoin.jar</destFileName>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.directory}/programs</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</plugin>
</plugins>
</build>
......
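(Editor's note: the build-classpath execution above writes the module's dependency classpath, minus all org.apache.flink artifacts, to target/yarn.classpath as a single list joined by the platform's path separator; the entries are typically absolute paths into the local Maven repository. YarnTestBase.getYarnClasspath(), shown further down in this commit, reads that file back and installs it under yarn.application.classpath for the test cluster.)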
......@@ -83,7 +83,6 @@ public class YARNFileReplicationITCase extends YarnTestBase {
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
......
......@@ -74,7 +74,6 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
......@@ -160,8 +159,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
......@@ -186,8 +183,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
public void testJobRecoversAfterKillingTaskManager() throws Exception {
runTest(() -> {
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
......
......@@ -155,9 +155,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
runTest(() -> runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m",
"-tm", "1024m", "-qu", "qa-team"},
"JobManager Web Interface:", null, RunTypes.YARN_SESSION, 0));
}
......@@ -177,7 +175,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768m",
"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
......@@ -213,7 +210,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768m",
"-ytm", taskManagerMemoryMB + "m",
......@@ -251,7 +247,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the vCores are set properly!
......@@ -393,7 +388,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
try {
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
......@@ -420,7 +414,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2",
"-yjm", "768m",
"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
......@@ -495,7 +488,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-yjm", "768m",
"-yD", YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag",
"-ytm", "1024m",
......
......@@ -107,9 +107,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
args.add("-t");
args.add(flinkLibFolder.getAbsolutePath());
args.add("-t");
args.add(flinkShadedHadoopDir.getAbsolutePath());
args.add("-jm");
args.add("768m");
......@@ -247,7 +244,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "256m",
"-tm", "1585m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testResourceComputation()");
......@@ -280,7 +276,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "256m",
"-tm", "3840m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testfullAlloc()");
......
......@@ -102,7 +102,6 @@ public class YarnConfigurationITCase extends YarnTestBase {
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
......
......@@ -52,7 +52,6 @@ public class YarnPrioritySchedulingITCase extends YarnTestBase {
final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-Dyarn.application.priority=" + priority},
......
......@@ -74,7 +74,6 @@ import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -172,7 +171,6 @@ public abstract class YarnTestBase extends TestLogger {
* Temporary folder where Flink configurations will be kept for secure run.
*/
protected static File tempConfPathForSecureRun = null;
protected static File flinkShadedHadoopDir;
protected static File yarnSiteXML = null;
protected static File hdfsSiteXML = null;
......@@ -197,6 +195,24 @@ public abstract class YarnTestBase extends TestLogger {
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
YARN_CONFIGURATION.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F);
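// Use the classpath generated at build time (target/yarn.classpath) for all JVMs spawned
// by the test cluster, instead of shipping a flink-shaded-hadoop fat jar.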
YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath());
}
/**
* Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven plugin in
* "flink-yarn".
* @return a classpath suitable for running all YARN-launched JVMs
*/
private static String getYarnClasspath() {
final String start = "../flink-yarn-tests";
try {
File classPathFile = findFile(start, (dir, name) -> name.equals("yarn.classpath"));
return FileUtils.readFileToString(classPathFile); // potential NPE is supposed to be fatal
} catch (Throwable t) {
LOG.error("Error while getting YARN classpath in {}", new File(start).getAbsoluteFile(), t);
throw new RuntimeException("Error while getting YARN classpath", t);
}
}
public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
......@@ -431,7 +447,7 @@ public abstract class YarnTestBase extends TestLogger {
if (!whitelistedFound) {
// logging in FATAL to see the actual message in TRAVIS tests.
Marker fatal = MarkerFactory.getMarker("FATAL");
LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, f.getAbsolutePath(), lineFromFile);
StringBuilder logExcerpt = new StringBuilder();
......@@ -637,8 +653,6 @@ public abstract class YarnTestBase extends TestLogger {
Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
// the hadoop jar was copied into the target/shaded-hadoop directory during the build
flinkShadedHadoopDir = Paths.get("target/shaded-hadoop").toFile();
Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
......
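(Editor's note: if the package phase has not run, findFile returns null and the readFileToString call in getYarnClasspath fails with a NullPointerException, which the inline comment marks as intentionally fatal: a build that skipped packaging should fail fast. findFile is a pre-existing YarnTestBase helper; a minimal sketch of such a recursive lookup, assuming only the (String, FilenameFilter) shape used above and not Flink's actual implementation, might look like this:

import java.io.File;
import java.io.FilenameFilter;

public final class FindFileSketch {

	// Depth-first search below 'start' for the first file accepted by 'filter';
	// returns null when nothing matches (callers above treat that as fatal).
	public static File findFile(String start, FilenameFilter filter) {
		File dir = new File(start);
		File[] entries = dir.listFiles();
		if (entries == null) {
			return null; // 'start' is not a readable directory
		}
		for (File entry : entries) {
			if (entry.isDirectory()) {
				File match = findFile(entry.getAbsolutePath(), filter);
				if (match != null) {
					return match;
				}
			} else if (filter.accept(dir, entry.getName())) {
				return entry;
			}
		}
		return null;
	}
}
)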
......@@ -29,7 +29,9 @@ import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.testutils.s3.S3TestCredentials;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.util.VersionUtil;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
......@@ -174,6 +176,9 @@ public class YarnFileStageTestS3ITCase extends TestLogger {
@Test
@RetryOnFailure(times = 3)
public void testRecursiveUploadForYarnS3n() throws Exception {
// skip test on Hadoop 3: https://issues.apache.org/jira/browse/HADOOP-14738
Assume.assumeTrue("This test is skipped for Hadoop versions 3.0.0 and above", VersionUtil.compareVersions(System.getProperty("hadoop.version"), "3.0.0") < 0);
try {
Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
} catch (ClassNotFoundException e) {
......
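(Editor's note: the s3n NativeS3FileSystem was removed from Hadoop 3 as part of HADOOP-14738, so the version guard above skips the test instead of failing on the missing class.)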