diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 6ca8c4d08da555a2939946054a5620f5c4a27fdd..3e61a3b3b76449a0b6142de6669866743534593c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -723,13 +723,7 @@ public class CliFrontend {
 
 	/**
-	 *
-	 * @param options
-	 * @param classLoader
-	 * @param programName
 	 * @param userParallelism The parallelism requested by the user in the CLI frontend.
-	 * @return
-	 * @throws Exception
 	 */
 	protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
 		InetSocketAddress jobManagerAddress;
diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink
index d28c04fdfefc828011ddc82abff4dd279e92a714..842dd9817ed82b60c140b2c653a30bae0b81ac0c 100644
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -36,4 +36,4 @@
 export FLINK_ROOT_DIR
 export FLINK_CONF_DIR
 # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
-$JAVA_RUN $JVM_ARGS "$log_setting" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend $*
+$JAVA_RUN $JVM_ARGS "$log_setting" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@"
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 9bf3d0ccae414ff05fa85067bf5e0916864004e6..aa87c89e39e087d7c970c8839961b633ac3080db 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 76775ac1d8d9121c4aa419bc4de2535bf3faaa5c..7b7bdcca1ed071191fb1e0a70543d9103f38d564 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -68,6 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -908,13 +909,13 @@ public class KafkaITCase {
 		while (!(t instanceof SuccessException)) {
 			if(t == null) {
 				LOG.warn("Test failed with exception", good);
-				Assert.fail("Test failed with: " + good.getMessage());
+				Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
 			}
 			t = t.getCause();
 			if (limit++ == 20) {
 				LOG.warn("Test failed with exception", good);
-				Assert.fail("Test failed with: " + good.getMessage());
+				Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
 			}
 		}
 	}
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 9a38b4d1a22ef90166ac7488ecbd7cf06b3b2c2c..b0e4ae9495f9f936387f4c8d55535d3477639a5c 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -46,6 +46,7 @@ import java.io.FileReader;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -57,6 +58,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -401,6 +403,41 @@ public class TestBaseUtils {
 		return configs;
 	}
+	// This code is taken from: http://stackoverflow.com/a/7201825/568695
+	// it changes the environment variables of this JVM. Use only for testing purposes!
+	@SuppressWarnings("unchecked")
+	public static void setEnv(Map<String, String> newenv) {
+		try {
+			Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
+			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
+			theEnvironmentField.setAccessible(true);
+			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
+			env.putAll(newenv);
+			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
+			theCaseInsensitiveEnvironmentField.setAccessible(true);
+			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
+			cienv.putAll(newenv);
+		} catch (NoSuchFieldException e) {
+			try {
+				Class[] classes = Collections.class.getDeclaredClasses();
+				Map<String, String> env = System.getenv();
+				for (Class cl : classes) {
+					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+						Field field = cl.getDeclaredField("m");
+						field.setAccessible(true);
+						Object obj = field.get(env);
+						Map<String, String> map = (Map<String, String>) obj;
+						map.clear();
+						map.putAll(newenv);
+					}
+				}
+			} catch (Exception e2) {
+				throw new RuntimeException(e2);
+			}
+		} catch (Exception e1) {
+			throw new RuntimeException(e1);
+		}
+	}
 
 	// --------------------------------------------------------------------------------------------
 	//  File helper methods
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 2a6397830ebaf27d7d8b902f82f47c3078e381d7..fcc5b87af71db7439fe75dc701759ef87053958b 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -63,6 +63,7 @@ under the License.
 			<artifactId>flink-yarn</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 79507921589026e5ee21968214401e88facea428..b002e8fe5fb39c52a796d0f938a01df63a5b1dec 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -51,9 +52,7 @@ import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.lang.reflect.Field;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -111,41 +110,7 @@ public abstract class YarnTestBase {
 		yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
 	}
 
-	// This code is taken from: http://stackoverflow.com/a/7201825/568695
-	// it changes the environment variables of this JVM. Use only for testing purposes!
-	@SuppressWarnings("unchecked")
-	private static void setEnv(Map<String, String> newenv) {
-		try {
-			Class processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
-			Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
-			theEnvironmentField.setAccessible(true);
-			Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
-			env.putAll(newenv);
-			Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
-			theCaseInsensitiveEnvironmentField.setAccessible(true);
-			Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
-			cienv.putAll(newenv);
-		} catch (NoSuchFieldException e) {
-			try {
-				Class[] classes = Collections.class.getDeclaredClasses();
-				Map<String, String> env = System.getenv();
-				for (Class cl : classes) {
-					if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
-						Field field = cl.getDeclaredField("m");
-						field.setAccessible(true);
-						Object obj = field.get(env);
-						Map<String, String> map = (Map<String, String>) obj;
-						map.clear();
-						map.putAll(newenv);
-					}
-				}
-			} catch (Exception e2) {
-				throw new RuntimeException(e2);
-			}
-		} catch (Exception e1) {
-			throw new RuntimeException(e1);
-		}
-	}
+
 
 	/**
 	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
@@ -366,7 +331,7 @@ public abstract class YarnTestBase {
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
 			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
-			setEnv(map);
+			TestBaseUtils.setEnv(map);
 			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
 		} catch (Exception ex) {
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 4f10f814ccb38526b0c86e776b2f7fba880ce17b..103c06e36af7db2d026ac1ddfb4236dd526b33c6 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -56,6 +56,13 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+
 		<dependency>
 			<groupId>com.typesafe.akka</groupId>
 			<artifactId>akka-actor_${scala.binary.version}</artifactId>
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..7b22e16dc48744e2312bd306d4cafdde8d9e519a
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlinkYarnSessionCliTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testDynamicProperties() throws IOException {
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		File tmpFolder = tmp.newFolder();
+		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+		fakeConf.createNewFile();
+		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+		Options options = new Options();
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+		cli.getYARNSessionCLIOptions(options);
+
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Parsing failed with "+e.getMessage());
+		}
+
+		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+
+		Assert.assertNotNull(flinkYarnClient);
+
+		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+		Assert.assertEquals(1, dynProperties.size());
+		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
+		Assert.assertEquals("5 min", dynProperties.get(0).f1);
+	}
+
+}