提交 2b8db40a 编写于 作者: S Stephan Ewen

[FLINK-1918] [client] Fix misleading NullPointerException in case of unresolvable host names

上级 b7043123
......@@ -124,18 +124,26 @@ public class RemoteExecutor extends PlanExecutor {
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
public static InetSocketAddress getInetFromHostport(String hostport) {
/**
* Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
* The returned InetSocketAddress may be unresolved!
*
* @param hostport The "host:port" string.
* @return The converted InetSocketAddress.
*/
private static InetSocketAddress getInetFromHostport(String hostport) {
// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port", e);
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port");
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
}
return new InetSocketAddress(host, port);
}
......
......@@ -22,11 +22,14 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
......@@ -43,7 +46,6 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
......@@ -65,12 +67,20 @@ import com.google.common.base.Preconditions;
public class Client {
private static final Logger LOG = LoggerFactory.getLogger(Client.class);
/** The configuration to use for the client (optimizer, timeouts, ...) */
private final Configuration configuration;
/** The address of the JobManager to send the program to */
private final InetSocketAddress jobManagerAddress;
/** The optimizer used in the optimization of batch programs */
private final Optimizer compiler;
/** The class loader to use for classes from the user program (e.g., functions and data types) */
private final ClassLoader userCodeClassLoader;
private final Configuration configuration; // the configuration describing the job manager address
private final Optimizer compiler; // the compiler to compile the jobs
/** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;
/**
......@@ -79,12 +89,9 @@ public class Client {
*/
private int maxSlots = -1;
/**
* ID of the last job submitted with this client.
*/
/** ID of the last job submitted with this client. */
private JobID lastJobId = null;
private ClassLoader userCodeClassLoader;
// ------------------------------------------------------------------------
// Construction
......@@ -96,13 +103,30 @@ public class Client {
*
* @param jobManagerAddress Address and port of the job-manager.
*/
public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
public Client(InetSocketAddress jobManagerAddress, Configuration config,
ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException
{
Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
Preconditions.checkNotNull(config, "Configuration is null");
Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
this.configuration = config;
// using the host string instead of the host name saves a reverse name lookup
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
if (jobManagerAddress.isUnresolved()) {
// address is unresolved, resolve it
String host = jobManagerAddress.getHostString();
try {
InetAddress address = InetAddress.getByName(host);
this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
}
catch (UnknownHostException e) {
throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
}
}
else {
// address is already resolved, use it as is
this.jobManagerAddress = jobManagerAddress;
}
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
this.userCodeClassLoader = userCodeClassLoader;
......@@ -110,42 +134,55 @@ public class Client {
}
/**
* Creates a instance that submits the programs to the job-manager defined in the
* configuration.
* Creates a instance that submits the programs to the JobManager defined in the
* configuration. This method will try to resolve the JobManager hostname and throw an exception
* if that is not possible.
*
* @param config The config used to obtain the job-manager's address.
* @param userCodeClassLoader The class loader to use for loading user code classes.
*/
public Client(Configuration config, ClassLoader userCodeClassLoader) {
public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
Preconditions.checkNotNull(config, "Configuration is null");
Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
this.configuration = config;
this.userCodeClassLoader = userCodeClassLoader;
// instantiate the address to the job manager
final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (address == null) {
throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
throw new IllegalConfigurationException(
"Cannot find address to job manager's RPC service in the global configuration.");
}
final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
final int port = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if (port < 0) {
throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
}
try {
InetAddress inetAddress = InetAddress.getByName(address);
this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
}
catch (UnknownHostException e) {
throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address
+ "' specified in the configuration");
}
this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
this.userCodeClassLoader = userCodeClassLoader;
}
/**
* Configures whether the client should print progress updates during the execution to {@code System.out}.
* All updates are logged via the SLF4J loggers regardless of this setting.
*
* @param print True to print updates to standard out during execution, false to not print them.
*/
public void setPrintStatusDuringExecution(boolean print) {
this.printStatusDuringExecution = print;
}
public String getJobManagerAddress() {
return this.configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
}
public int getJobManagerPort() {
return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
}
/**
* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
* connected to this client.
......@@ -316,14 +353,7 @@ public class Client {
public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
this.lastJobId = jobGraph.getJobID();
InetSocketAddress jobManagerAddress;
try {
jobManagerAddress = JobClient.getJobManagerAddress(configuration);
}
catch (IOException e) {
throw new ProgramInvocationException(e.getMessage(), e);
}
LOG.info("JobManager actor system address is " + jobManagerAddress);
LOG.info("Starting client actor system");
......
......@@ -294,25 +294,23 @@ public class CliFrontendPackageProgramTest {
assertArrayEquals(progArgs, prog.getArguments());
Configuration c = new Configuration();
c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
Client cli = new Client(c, getClass().getClassLoader());
// we expect this to fail with a "ClassNotFoundException"
cli.getOptimizedPlanAsJson(prog, 666);
fail("Should have failed with a ClassNotFoundException");
}
catch (ProgramInvocationException pie) {
assertTrue("Classloader was not called", callme[0]);
// class not found exception is expected as some point
if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
System.err.println(pie.getMessage());
pie.printStackTrace();
fail("Program caused an exception: " + pie.getMessage());
catch (ProgramInvocationException e) {
if (!(e.getCause() instanceof ClassNotFoundException)) {
e.printStackTrace();
fail("Program didn't throw ClassNotFoundException");
}
assertTrue("Classloader was not called", callme[0]);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
assertTrue("Classloader was not called", callme[0]);
fail("Program caused an exception: " + e.getMessage());
fail("Program failed with the wrong exception: " + e.getClass().getName());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import static org.junit.Assert.fail;
public class RemoteExecutorHostnameResolutionTest {
private static final String nonExistingHostname = "foo.bar.com.invalid";
private static final int port = 14451;
@Test
public void testUnresolvableHostname1() {
try {
RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testUnresolvableHostname2() {
try {
InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
RemoteExecutor exec = new RemoteExecutor(add, Collections.<String>emptyList());
exec.executePlan(getProgram());
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}
private static Plan getProgram() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
return env.createProgramPlan();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import static org.junit.Assert.*;
/**
* Tests that verify that the client correctly handles non-resolvable host names and does not
* fail with another exception
*/
public class ClientHostnameResolutionTest {
private static final String nonExistingHostname = "foo.bar.com.invalid";
@Test
public void testUnresolvableHostname1() {
try {
InetSocketAddress addr = new InetSocketAddress(nonExistingHostname, 17234);
new Client(addr, new Configuration(), getClass().getClassLoader(), 1);
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testUnresolvableHostname2() {
try {
Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
new Client(config, getClass().getClassLoader());
fail("This should fail with an UnknownHostException");
}
catch (UnknownHostException e) {
// that is what we want!
}
catch (Exception e) {
System.err.println("Wrong exception!");
e.printStackTrace();
fail(e.getMessage());
}
}
}
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
......@@ -37,9 +38,9 @@ import org.slf4j.LoggerFactory;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
private String host;
private int port;
private List<File> jarFiles;
private final String host;
private final int port;
private final List<File> jarFiles;
/**
* Creates a new RemoteStreamEnvironment that points to the master
......@@ -82,14 +83,14 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
public JobExecutionResult execute() {
public JobExecutionResult execute() throws ProgramInvocationException {
JobGraph jobGraph = streamGraph.getJobGraph();
return executeRemotely(jobGraph);
}
@Override
public JobExecutionResult execute(String jobName) {
public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
JobGraph jobGraph = streamGraph.getJobGraph(jobName);
return executeRemotely(jobGraph);
......@@ -102,7 +103,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
* jobGraph to execute
* @return The result of the job execution, containing elapsed time and accumulators.
*/
private JobExecutionResult executeRemotely(JobGraph jobGraph) {
private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
......@@ -112,20 +113,29 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
Configuration configuration = jobGraph.getJobConfiguration();
Client client = new Client(new InetSocketAddress(host, port), configuration,
JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, getClass().getClassLoader());
try {
Client client = new Client(new InetSocketAddress(host, port), configuration, usercodeClassLoader, -1);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
JobSubmissionResult result = client.run(jobGraph, true);
if(result instanceof JobExecutionResult) {
if (result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
return new JobExecutionResult(result.getJobID(), -1, null);
}
} catch (ProgramInvocationException e) {
throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}
catch (ProgramInvocationException e) {
throw e;
}
catch (UnknownHostException e) {
throw new ProgramInvocationException(e.getMessage(), e);
}
catch (Exception e) {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
}
}
......
......@@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals;
/**
* Test ExecutionEnvironment from user perspective
*/
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
private static final int PARALLELISM = 5;
......@@ -66,8 +67,10 @@ public class ExecutionEnvironmentITCase extends MultipleProgramsTestBase {
public void testLocalEnvironmentWithConfig() throws Exception {
Configuration conf = new Configuration();
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
env.getConfig().disableSysoutLogging();
DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
.rebalance()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册