提交 8d589623 编写于 作者: M Maximilian Michels

[FLINK-3667] delay connection to JobManager until job execution

- lazily initialize ActorSystem
- make sure it is not created before job execution
- print connection information on the CLI

This closes #2189
上级 b674fd57
......@@ -154,7 +154,6 @@ public class CliFrontend {
LOG.info("Trying to load configuration file");
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
try {
......@@ -234,7 +233,7 @@ public class CliFrontend {
ClusterClient client = null;
try {
client = getClient(options, program.getMainClassName());
client = createClient(options, program.getMainClassName());
client.setPrintStatusDuringExecution(options.getStdoutLogging());
client.setDetached(options.getDetachedMode());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
......@@ -810,7 +809,7 @@ public class CliFrontend {
CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
try {
ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig());
logAndSysout("Using address " + client.getJobManagerAddressFromConfig() + " to connect to JobManager.");
return client;
} catch (Exception e) {
LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
......@@ -827,6 +826,7 @@ public class CliFrontend {
* @throws Exception
*/
protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
// Report the lookup through logAndSysout before the client is retrieved.
logAndSysout("Retrieving JobManager.");
// Retrieve the client for the given options and return the JobManager gateway it exposes.
return retrieveClient(options).getJobManagerGateway();
}
......@@ -836,7 +836,7 @@ public class CliFrontend {
* @param programName Program name
* @throws Exception
*/
protected ClusterClient getClient(
protected ClusterClient createClient(
CommandLineOptions options,
String programName) throws Exception {
......@@ -846,12 +846,12 @@ public class CliFrontend {
ClusterClient client;
try {
client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
logAndSysout("Cluster retrieved");
logAndSysout("Cluster retrieved: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e) {
try {
String applicationName = "Flink Application: " + programName;
client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), config);
logAndSysout("Cluster started");
logAndSysout("Cluster started: " + client.getClusterIdentifier());
} catch (UnsupportedOperationException e2) {
throw new IllegalConfigurationException(
"The JobManager address is neither provided at the command-line, " +
......@@ -859,7 +859,9 @@ public class CliFrontend {
}
}
logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
final InetSocketAddress jobManagerAddress = client.getJobManagerAddressFromConfig();
logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
return client;
}
......@@ -1054,7 +1056,7 @@ public class CliFrontend {
* @param config The config to write to
*/
public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}
......
......@@ -50,7 +50,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
}
@Override
public StandaloneClusterClient deploy() {
public StandaloneClusterClient deploy() throws UnsupportedOperationException {
throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
}
}
......@@ -81,8 +81,8 @@ public abstract class ClusterClient {
/** The optimizer used in the optimization of batch programs */
final Optimizer compiler;
/** The actor system used to communicate with the JobManager */
protected final ActorSystem actorSystem;
/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
protected final LazyActorSystemLoader actorSystemLoader;
/** Configuration of the client */
protected final Configuration flinkConfig;
......@@ -127,39 +127,74 @@ public abstract class ClusterClient {
this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
this.actorSystem = createActorSystem();
this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);
}
// ------------------------------------------------------------------------
// Startup & Shutdown
// ------------------------------------------------------------------------
/**
* Method to create the ActorSystem of the Client. May be overridden in subclasses.
* @return ActorSystem
* @throws IOException
*/
protected ActorSystem createActorSystem() throws IOException {
protected static class LazyActorSystemLoader {
private final Logger LOG;
private final Configuration flinkConfig;
private ActorSystem actorSystem;
if (actorSystem != null) {
throw new RuntimeException("This method may only be called once.");
private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) {
this.flinkConfig = flinkConfig;
this.LOG = LOG;
}
// start actor system
LOG.info("Starting client actor system.");
/**
* Indicates whether the ActorSystem has already been instantiated.
* @return boolean True if it exists, False otherwise
*/
public boolean isLoaded() {
return actorSystem != null;
}
String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
if (hostName == null || port == -1) {
throw new IOException("The initial JobManager address has not been set correctly.");
public void shutdown() {
if (isLoaded()) {
actorSystem.shutdown();
actorSystem.awaitTermination();
actorSystem = null;
}
}
/**
* Creates a new ActorSystem or returns an existing one.
* @return ActorSystem
*/
public ActorSystem get() {
if (!isLoaded()) {
// start actor system
LOG.info("Starting client actor system.");
String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
if (hostName == null || port == -1) {
throw new RuntimeException("The initial JobManager address has not been set correctly.");
}
InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
// find name of own public interface, able to connect to the JM
// try to find address for 2 seconds. log after 400 ms.
InetAddress ownHostname;
try {
ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
} catch (IOException e) {
throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress, e);
}
actorSystem = AkkaUtils.createActorSystem(flinkConfig,
new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
}
return actorSystem;
}
InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
// find name of own public interface, able to connect to the JM
// try to find address for 2 seconds. log after 400 ms.
InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
return AkkaUtils.createActorSystem(flinkConfig,
new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
}
/**
......@@ -170,10 +205,7 @@ public abstract class ClusterClient {
try {
finalizeCluster();
} finally {
if (!this.actorSystem.isTerminated()) {
this.actorSystem.shutdown();
this.actorSystem.awaitTermination();
}
this.actorSystemLoader.shutdown();
}
}
}
......@@ -201,7 +233,7 @@ public abstract class ClusterClient {
/**
* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
* @return The address (host and port) of the leading JobManager
* @return The address (host and port) of the leading JobManager when it was last retrieved (may be outdated)
*/
public InetSocketAddress getJobManagerAddressFromConfig() {
try {
......@@ -375,7 +407,7 @@ public abstract class ClusterClient {
try {
logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
this.lastJobID = jobGraph.getJobID();
return JobClient.submitJobAndWait(actorSystem,
return JobClient.submitJobAndWait(actorSystemLoader.get(),
leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
......@@ -614,7 +646,7 @@ public abstract class ClusterClient {
return LeaderRetrievalUtils.retrieveLeaderGateway(
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
actorSystem,
actorSystemLoader.get(),
lookupTimeout);
}
......@@ -652,7 +684,7 @@ public abstract class ClusterClient {
/**
* Returns a string representation of the cluster.
*/
protected abstract String getClusterIdentifier();
public abstract String getClusterIdentifier();
/**
* Request the cluster to shut down or disconnect.
......
......@@ -44,7 +44,7 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getWebInterfaceURL() {
String host = this.getJobManagerAddress().getHostName();
String host = this.getJobManagerAddressFromConfig().getHostString();
int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
return "http://" + host + ":" + port;
......@@ -74,7 +74,8 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public String getClusterIdentifier() {
return "Standalone cluster with JobManager running at " + this.getJobManagerAddress();
// Avoid blocking here by getting the address from the config without resolving the address
return "Standalone cluster with JobManager at " + this.getJobManagerAddressFromConfig();
}
@Override
......
......@@ -118,13 +118,13 @@ public class CliFrontendRunTest {
}
// --------------------------------------------------------------------------------------------
public static final class RunTestingCliFrontend extends CliFrontend {
private final int expectedParallelism;
private final boolean sysoutLogging;
private final boolean isDetached;
public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception {
super(CliFrontendTestUtils.getConfigDir());
this.expectedParallelism = expectedParallelism;
......@@ -139,10 +139,5 @@ public class CliFrontendRunTest {
assertEquals(expectedParallelism, parallelism);
return 0;
}
@Override
protected ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
return TestingClusterClientWithoutActorSystem.create();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;
import akka.actor.ActorSystem;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.mockito.Mockito;
import java.io.IOException;
/**
* A client to use in tests which does not instantiate an ActorSystem.
*/
public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient {

	/**
	 * Factory for the testing client, which is built on an empty {@link Configuration}.
	 *
	 * @return a new testing client
	 * @throws RuntimeException wrapping the IOException if construction fails
	 */
	public static ClusterClient create() {
		final ClusterClient testingClient;
		try {
			testingClient = new TestingClusterClientWithoutActorSystem();
		} catch (IOException cause) {
			throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", cause);
		}
		return testingClient;
	}

	/** Instances are only handed out through {@link #create()}. */
	private TestingClusterClientWithoutActorSystem() throws IOException {
		super(new Configuration());
	}

	/**
	 * Returns a Mockito mock in place of a real ActorSystem to save resources.
	 *
	 * @return mocked ActorSystem
	 * @throws IOException declared by the overridden signature; not thrown by this implementation
	 */
	@Override
	protected ActorSystem createActorSystem() throws IOException {
		final ActorSystem mockedSystem = Mockito.mock(ActorSystem.class);
		return mockedSystem;
	}
}
......@@ -303,8 +303,8 @@ public class CliFrontendYarnAddressConfigurationTest {
@Override
// make method public
public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
return super.getClient(options, programName);
public ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
return super.createClient(options, programName);
}
@Override
......
......@@ -134,6 +134,7 @@ public class FlinkYarnSessionCliTest {
Assert.assertEquals(6, client.getMaxSlots());
}
private static class TestCLI extends FlinkYarnSessionCli {
public TestCLI(String shortPrefix, String longPrefix) {
......@@ -143,7 +144,7 @@ public class FlinkYarnSessionCliTest {
// Cluster descriptor used by this test; its setLocalJarPath override ignores the given path.
private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
@Override
public void setLocalJarPath(Path localJarPath) {
// Intentionally empty: the supplied jar path is discarded and nothing is set.
}
}
......@@ -160,12 +161,7 @@ public class FlinkYarnSessionCliTest {
Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
new Path("/tmp"), true);
}
@Override
protected ActorSystem createActorSystem() throws IOException {
return Mockito.mock(ActorSystem.class);
new Path("/tmp"), false);
}
}
}
......@@ -81,14 +81,14 @@ public class YarnClusterClient extends ClusterClient {
//---------- Class internal fields -------------------
private final AbstractYarnClusterDescriptor clusterDescriptor;
private final ActorRef applicationClient;
private final LazApplicationClientLoader applicationClient;
private final FiniteDuration akkaDuration;
private final Timeout akkaTimeout;
private final ApplicationReport applicationId;
private final ApplicationReport appReport;
private final ApplicationId appId;
private final String trackingURL;
private boolean isConnected = false;
private boolean isConnected = true;
private final boolean perJobCluster;
......@@ -120,63 +120,18 @@ public class YarnClusterClient extends ClusterClient {
this.yarnClient = yarnClient;
this.hadoopConfig = yarnClient.getConfig();
this.sessionFilesDir = sessionFilesDir;
this.applicationId = appReport;
this.appReport = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
this.perJobCluster = perJobCluster;
/* The leader retrieval service for connecting to the cluster and finding the active leader. */
LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
} catch (Exception e) {
throw new IOException("Could not create the leader retrieval service.", e);
}
// start application client
LOG.info("Start application client.");
applicationClient = actorSystem.actorOf(
Props.create(
ApplicationClient.class,
flinkConfig,
leaderRetrievalService),
"applicationClient");
this.applicationClient = new LazApplicationClientLoader();
pollingRunner = new PollingThread(yarnClient, appId);
pollingRunner.setDaemon(true);
pollingRunner.start();
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
isConnected = true;
if (perJobCluster) {
logAndSysout("Waiting until all TaskManagers have connected");
for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
currentStatus = getClusterStatus();
if (currentStatus != null && !currentStatus.equals(lastStatus)) {
logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+ clusterDescriptor.getTaskManagerCount() + ")");
if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
logAndSysout("All TaskManagers are connected");
break;
}
} else if (lastStatus == null) {
logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for TaskManagers");
System.err.println("Thread is interrupted");
throw new IOException("Interrupted while waiting for TaskManagers", e);
}
}
}
}
/**
......@@ -219,7 +174,10 @@ public class YarnClusterClient extends ClusterClient {
*/
private void stopAfterJob(JobID jobID) {
Preconditions.checkNotNull(jobID, "The job id must not be null");
Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
Future<Object> messageReceived =
ask(
applicationClient.get(),
new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
try {
Await.result(messageReceived, akkaDuration);
} catch (Exception e) {
......@@ -263,7 +221,7 @@ public class YarnClusterClient extends ClusterClient {
@Override
public String getClusterIdentifier() {
return applicationId.getApplicationId().toString();
return "Yarn cluster with application id " + appReport.getApplicationId();
}
/**
......@@ -278,7 +236,11 @@ public class YarnClusterClient extends ClusterClient {
return null;
}
Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
Future<Object> clusterStatusOption =
ask(
applicationClient.get(),
YarnMessages.getLocalGetyarnClusterStatus(),
akkaTimeout);
Object clusterStatus;
try {
clusterStatus = Await.result(clusterStatusOption, akkaDuration);
......@@ -338,9 +300,11 @@ public class YarnClusterClient extends ClusterClient {
while(true) {
Object result;
try {
Future<Object> response = Patterns.ask(applicationClient,
YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
Future<Object> response =
Patterns.ask(
applicationClient.get(),
YarnMessages.getLocalGetYarnMessage(),
new Timeout(akkaDuration));
result = Await.result(response, akkaDuration);
} catch(Exception ioe) {
LOG.warn("Error retrieving the YARN messages locally", ioe);
......@@ -406,11 +370,12 @@ public class YarnClusterClient extends ClusterClient {
// we are already in the shutdown hook
}
if(actorSystem != null){
if(actorSystemLoader.isLoaded()){
LOG.info("Sending shutdown request to the Application Master");
if(applicationClient != ActorRef.noSender()) {
if(applicationClient.get() != ActorRef.noSender()) {
try {
Future<Object> response = Patterns.ask(applicationClient,
Future<Object> response =
Patterns.ask(applicationClient.get(),
new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
"Flink YARN Client requested shutdown"),
new Timeout(akkaDuration));
......@@ -467,7 +432,7 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+ "the full application log using this command:\n"
+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+ "\tyarn logs -appReport " + appReport.getApplicationId() + "\n"
+ "(It sometimes takes a few seconds until the logs are aggregated)");
}
} catch (Exception e) {
......@@ -553,4 +518,68 @@ public class YarnClusterClient extends ClusterClient {
public boolean isDetached() {
	// Detached when the superclass reports it; otherwise defer to the cluster descriptor.
	if (super.isDetached()) {
		return true;
	}
	return clusterDescriptor.isDetachedMode();
}
/**
 * Returns the application id held by this client.
 *
 * @return the value of the {@code appId} field
 */
public ApplicationId getApplicationId() {
	return this.appId;
}
/**
 * Holder that creates the ApplicationClient actor on the first call to {@link #get()}
 * and returns the same reference on every later call.
 *
 * <p>No synchronization is performed here.
 */
protected class LazApplicationClientLoader {

	/** The ApplicationClient actor; {@code null} until {@link #get()} has created it. */
	private ActorRef applicationClient;

	/**
	 * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
	 *
	 * <p>For a per-job cluster, the call that creates the actor also blocks until the expected
	 * number of TaskManagers has registered.
	 *
	 * @return the ApplicationClient actor reference
	 * @throws RuntimeException if the leader retrieval service cannot be created, or if the
	 *         thread is interrupted while waiting for the TaskManagers
	 */
	public ActorRef get() {
		if (applicationClient == null) {
			/* The leader retrieval service for connecting to the cluster and finding the active leader. */
			LeaderRetrievalService leaderRetrievalService;
			try {
				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
			} catch (Exception e) {
				throw new RuntimeException("Could not create the leader retrieval service.", e);
			}

			// start application client
			LOG.info("Start application client.");
			// The reference is assigned before waiting below.
			// NOTE(review): getClusterStatus() presumably goes through this loader again and
			// relies on the actor already being set - confirm before reordering.
			applicationClient = actorSystemLoader.get().actorOf(
				Props.create(
					ApplicationClient.class,
					flinkConfig,
					leaderRetrievalService),
				"applicationClient");

			if (perJobCluster) {
				waitForTaskManagers();
			}
		}
		return applicationClient;
	}

	/** Polls the cluster status every 250 ms until all expected TaskManagers have registered. */
	private void waitForTaskManagers() {
		logAndSysout("Waiting until all TaskManagers have connected");

		for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
			currentStatus = getClusterStatus();
			if (currentStatus != null && !currentStatus.equals(lastStatus)) {
				// Only print when the status changed since the previous poll.
				logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
					+ clusterDescriptor.getTaskManagerCount() + ")");
				if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
					logAndSysout("All TaskManagers are connected");
					break;
				}
			} else if (lastStatus == null) {
				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
			}

			try {
				Thread.sleep(250);
			} catch (InterruptedException e) {
				// Restore the interrupt status so callers further up the stack can still observe it.
				Thread.currentThread().interrupt();
				LOG.error("Interrupted while waiting for TaskManagers");
				System.err.println("Thread is interrupted");
				throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
			}
		}
	}
}
}
......@@ -30,7 +30,6 @@ import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
......@@ -113,7 +112,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
private final boolean acceptInteractiveInput;
//------------------------------------ Internal fields -------------------------
private YarnClusterClient yarnCluster = null;
private YarnClusterClient yarnCluster;
private boolean detachedMode = false;
public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
......@@ -555,7 +554,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
if (detachedMode) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getClusterIdentifier());
"yarn application -kill " + APPLICATION_ID.getOpt());
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster, true);
......@@ -608,7 +607,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
yarnDescriptor.getSessionFilesDir());
yarnCluster.disconnect();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册