提交 3b593632 编写于 作者: M Maximilian Michels

[FLINK-3667] additional cleanups in YarnClusterClient

- remove ActorRunner thread, print status in finalizeCluster instead
- prevent premature shutdown of actor system in shutdown method
- prevent timeout exceptions due to poisoning the ApplicationClient
上级 6420c1c2
......@@ -153,7 +153,7 @@ public class CliFrontend {
// load the configuration
LOG.info("Trying to load configuration file");
System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
this.config = GlobalConfiguration.getConfiguration();
......@@ -166,12 +166,14 @@ public abstract class ClusterClient {
* Shuts down the client. This stops the internal actor system and actors.
public void shutdown() {
try {
} finally {
if (!this.actorSystem.isTerminated()) {
synchronized (this) {
try {
} finally {
if (!this.actorSystem.isTerminated()) {
......@@ -21,7 +21,6 @@ import akka.actor.ActorRef;
import static akka.pattern.Patterns.ask;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
......@@ -72,7 +71,7 @@ public class YarnClusterClient extends ClusterClient {
private static final int POLLING_THREAD_INTERVAL_MS = 1000;
private YarnClient yarnClient;
private Thread actorRunner;
private Thread clientShutdownHook = new ClientShutdownHook();
private PollingThread pollingRunner;
private final Configuration hadoopConfig;
......@@ -144,36 +143,6 @@ public class YarnClusterClient extends ClusterClient {
actorRunner = new Thread(new Runnable() {
public void run() {
// blocks until ApplicationClient has been stopped
// get final application report
try {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
LOG.info("Application " + appId + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
== YarnApplicationState.KILLED) {
LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+ "the full application log using this command:\n"
+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+ "(It sometimes takes a few seconds until the logs are aggregated)");
} catch (Exception e) {
LOG.warn("Error while getting final application report", e);
pollingRunner = new PollingThread(yarnClient, appId);
......@@ -211,10 +180,19 @@ public class YarnClusterClient extends ClusterClient {
* Disconnect from the Yarn cluster
public void disconnect() {
if (hasBeenShutDown.getAndSet(true)) {
if(!isConnected) {
throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
try {
......@@ -223,15 +201,6 @@ public class YarnClusterClient extends ClusterClient {
// we are already in the shutdown hook
// tell the actor to shut down.
applicationClient.tell(PoisonPill.getInstance(), applicationClient);
try {
actorRunner.join(1000); // wait for 1 second
} catch (InterruptedException e) {
LOG.warn("Shutdown of the actor runner was interrupted", e);
try {
......@@ -239,6 +208,7 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Shutdown of the polling runner was interrupted", e);
isConnected = false;
......@@ -278,23 +248,7 @@ public class YarnClusterClient extends ClusterClient {
if (isDetached()) {
return super.runDetached(jobGraph, classLoader);
} else {
try {
return super.run(jobGraph, classLoader);
} finally {
// show cluster status
List<String> msgs = getNewMessages();
if (msgs != null && msgs.size() > 1) {
logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
logAndSysout("YARN Diagnostics: " + getDiagnostics());
return super.run(jobGraph, classLoader);
......@@ -369,35 +323,18 @@ public class YarnClusterClient extends ClusterClient {
public List<String> getNewMessages() {
private String getDiagnostics() {
if(!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
if (getApplicationStatus() == ApplicationStatus.SUCCEEDED) {
LOG.warn("getDiagnostics() called for cluster which is not in failed state");
ApplicationReport lastReport = pollingRunner.getLastReport();
if (lastReport == null) {
LOG.warn("Last report is null");
return null;
} else {
return lastReport.getDiagnostics();
if(hasBeenShutdown()) {
throw new RuntimeException("The YarnClusterClient has already been stopped");
public List<String> getNewMessages() {
if(!isConnected) {
throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
if(hasBeenShutdown()) {
throw new RuntimeException("The YarnClusterClient has already been stopped");
List<String> ret = new ArrayList<String>();
// get messages from ApplicationClient (locally)
while(true) {
Object result;
......@@ -443,7 +380,6 @@ public class YarnClusterClient extends ClusterClient {
public void finalizeCluster() {
if (isDetached() || !perJobCluster) {
// only disconnect if we are not running a per job cluster
......@@ -452,14 +388,17 @@ public class YarnClusterClient extends ClusterClient {
* Shuts down the Yarn application
public void shutdownCluster() {
if (!isConnected) {
throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
if (hasBeenShutDown.getAndSet(true)) {
if(hasBeenShutDown.getAndSet(true)) {
if (!isConnected) {
throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
try {
......@@ -481,9 +420,6 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Error while stopping YARN Application Client", e);
try {
......@@ -512,12 +448,6 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Session file directory not set. Not deleting session files");
try {
actorRunner.join(1000); // wait for 1 second
} catch (InterruptedException e) {
LOG.warn("Shutdown of the actor runner was interrupted", e);
try {
......@@ -526,6 +456,25 @@ public class YarnClusterClient extends ClusterClient {
try {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
LOG.info("Application " + appId + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
== YarnApplicationState.KILLED) {
LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+ "the full application log using this command:\n"
+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+ "(It sometimes takes a few seconds until the logs are aggregated)");
} catch (Exception e) {
LOG.warn("Couldn't get final report", e);
LOG.info("YARN Client is shutting down");
yarnClient.stop(); // actorRunner is using the yarnClient.
yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册