提交 3ef4e68b 编写于 作者: R Robert Metzger

[FLINK-1920] Properly pass command line arguments with spaces to Flink

This closes #689
上级 e0d1fd50
......@@ -723,13 +723,7 @@ public class CliFrontend {
/**
*
* @param options
* @param classLoader
* @param programName
* @param userParallelism The parallelism requested by the user in the CLI frontend.
* @return
* @throws Exception
*/
protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
InetSocketAddress jobManagerAddress;
......
......@@ -36,4 +36,4 @@ export FLINK_ROOT_DIR
export FLINK_CONF_DIR
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
$JAVA_RUN $JVM_ARGS "$log_setting" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend $*
$JAVA_RUN $JVM_ARGS "$log_setting" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.CliFrontend "$@"
......@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
export FLINK_CONF_DIR
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar $*
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
......@@ -68,6 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
......@@ -908,13 +909,13 @@ public class KafkaITCase {
while (!(t instanceof SuccessException)) {
if(t == null) {
LOG.warn("Test failed with exception", good);
Assert.fail("Test failed with: " + good.getMessage());
Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
}
t = t.getCause();
if (limit++ == 20) {
LOG.warn("Test failed with exception", good);
Assert.fail("Test failed with: " + good.getMessage());
Assert.fail("Test failed with: " + StringUtils.stringifyException(good));
}
}
}
......
......@@ -46,6 +46,7 @@ import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
......@@ -57,6 +58,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
......@@ -401,6 +403,41 @@ public class TestBaseUtils {
return configs;
}
// This code is taken from: http://stackoverflow.com/a/7201825/568695
// it changes the environment variables of this JVM. Use only for testing purposes!
/**
 * Overwrites the environment variables of the current JVM process via reflection
 * into JDK-internal classes.
 *
 * <p>NOTE(review): this relies on private fields of {@code java.lang.ProcessEnvironment}
 * and {@code java.util.Collections$UnmodifiableMap}; it likely fails on Java 9+
 * without {@code --add-opens} — confirm the target JDK before reuse.
 *
 * @param newenv variables to merge into (fast path) or replace (fallback path)
 *               the environment seen by {@link System#getenv()}
 * @throws RuntimeException wrapping any reflective failure
 */
@SuppressWarnings("unchecked")
public static void setEnv(Map<String, String> newenv) {
	try {
		// Fast path: merge into the writable backing maps of ProcessEnvironment.
		Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
		Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
		theEnvironmentField.setAccessible(true);
		Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
		env.putAll(newenv);
		// presumably this field only exists on Windows JDKs; on other platforms the
		// NoSuchFieldException below triggers the fallback — TODO confirm.
		Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
		theCaseInsensitiveEnvironmentField.setAccessible(true);
		Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
		cienv.putAll(newenv);
	} catch (NoSuchFieldException e) {
		try {
			// Fallback: unwrap the unmodifiable view returned by System.getenv() and
			// mutate the underlying map directly. Note this CLEARS the existing
			// environment and replaces it with newenv, unlike the merge above.
			Class[] classes = Collections.class.getDeclaredClasses();
			Map<String, String> env = System.getenv();
			for (Class cl : classes) {
				if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
					Field field = cl.getDeclaredField("m");
					field.setAccessible(true);
					Object obj = field.get(env);
					Map<String, String> map = (Map<String, String>) obj;
					map.clear();
					map.putAll(newenv);
				}
			}
		} catch (Exception e2) {
			throw new RuntimeException(e2);
		}
	} catch (Exception e1) {
		throw new RuntimeException(e1);
	}
}
// --------------------------------------------------------------------------------------------
// File helper methods
// --------------------------------------------------------------------------------------------
......
......@@ -63,6 +63,7 @@ under the License.
<artifactId>flink-yarn</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
......
......@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
......@@ -51,9 +52,7 @@ import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -111,41 +110,7 @@ public abstract class YarnTestBase {
yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
// This code is taken from: http://stackoverflow.com/a/7201825/568695
// it changes the environment variables of this JVM. Use only for testing purposes!
/**
 * Overwrites the environment variables of the current JVM process by reflecting
 * into JDK-internal classes. Test-only utility.
 *
 * <p>NOTE(review): depends on private fields of {@code java.lang.ProcessEnvironment}
 * and {@code java.util.Collections$UnmodifiableMap}; likely breaks on Java 9+
 * without {@code --add-opens} — confirm the target JDK.
 *
 * @param newenv variables to merge into (first path) or replace (fallback path)
 *               the environment returned by {@link System#getenv()}
 * @throws RuntimeException wrapping any reflective failure
 */
@SuppressWarnings("unchecked")
private static void setEnv(Map<String, String> newenv) {
	try {
		// First attempt: merge directly into ProcessEnvironment's backing maps.
		Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
		Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
		theEnvironmentField.setAccessible(true);
		Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
		env.putAll(newenv);
		// presumably Windows-only; absence triggers the NoSuchFieldException fallback — TODO confirm.
		Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
		theCaseInsensitiveEnvironmentField.setAccessible(true);
		Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
		cienv.putAll(newenv);
	} catch (NoSuchFieldException e) {
		try {
			// Fallback: reach through the unmodifiable wrapper around System.getenv().
			// Unlike the path above, this clears the environment before inserting newenv.
			Class[] classes = Collections.class.getDeclaredClasses();
			Map<String, String> env = System.getenv();
			for (Class cl : classes) {
				if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
					Field field = cl.getDeclaredField("m");
					field.setAccessible(true);
					Object obj = field.get(env);
					Map<String, String> map = (Map<String, String>) obj;
					map.clear();
					map.putAll(newenv);
				}
			}
		} catch (Exception e2) {
			throw new RuntimeException(e2);
		}
	} catch (Exception e1) {
		throw new RuntimeException(e1);
	}
}
/**
* Sleep a bit between the tests (we are re-using the YARN cluster for the tests)
......@@ -366,7 +331,7 @@ public abstract class YarnTestBase {
File yarnConfFile = writeYarnSiteConfigXML(conf);
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
setEnv(map);
TestBaseUtils.setEnv(map);
Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
} catch (Exception ex) {
......
......@@ -56,6 +56,13 @@ under the License.
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Tests for {@link FlinkYarnSessionCli}, in particular that dynamic properties
 * passed with {@code -D} on the command line — including values containing
 * spaces — survive the encode/decode round trip to the YARN client.
 */
public class FlinkYarnSessionCliTest {

	@Rule
	public TemporaryFolder tmp = new TemporaryFolder();

	@Test
	public void testDynamicProperties() throws IOException {
		// Point FLINK_CONF_DIR at a fresh directory holding an empty flink-conf.yaml
		// so the CLI does not pick up configuration from the build environment.
		Map<String, String> map = new HashMap<String, String>(System.getenv());
		File tmpFolder = tmp.newFolder();
		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
		// Verify creation succeeded instead of silently ignoring the return value.
		Assert.assertTrue("Could not create fake flink-conf.yaml", fakeConf.createNewFile());
		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
		TestBaseUtils.setEnv(map);

		Options options = new Options();
		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
		cli.getYARNSessionCLIOptions(options);

		CommandLineParser parser = new PosixParser();
		CommandLine cmd;
		try {
			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
		} catch (Exception e) {
			// Rethrow with the original exception as cause so the full stack trace
			// appears in the test report (Assert.fail(e.getMessage()) would lose it).
			throw new RuntimeException("Parsing the command line arguments failed", e);
		}

		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
		Assert.assertNotNull(flinkYarnClient);

		// "5 min" contains a space and must come back intact (FLINK-1920).
		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
		Assert.assertEquals(1, dynProperties.size());
		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
		Assert.assertEquals("5 min", dynProperties.get(0).f1);
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册