diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 9a05c16b2af66193e5b1a2d6c19c4cfa2b808002..673341e8d40c90147964ab7709f5f28572f3c7d6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -40,6 +40,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -650,48 +651,22 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService, + yarnApplicationStatusMonitor, yarnApplicationId), + getClass().getSimpleName(), + LOG); try { runInteractiveCli( clusterClient, yarnApplicationStatusMonitor, acceptInteractiveInput); } finally { - try { - yarnApplicationStatusMonitor.close(); - } catch (Exception e) { - LOG.info("Could not properly close the Yarn application status monitor.", e); - } - - clusterClient.shutDownCluster(); - - try { - clusterClient.shutdown(); - } catch (Exception e) { - LOG.info("Could not properly shutdown cluster client.", e); - } - - // shut down the scheduled executor service - ExecutorUtils.gracefulShutdown( - 1000L, - TimeUnit.MILLISECONDS, - scheduledExecutorService); - - deleteYarnPropertiesFile(); - - ApplicationReport applicationReport; - - try { - applicationReport = yarnClusterDescriptor - .getYarnClient() - .getApplicationReport(yarnApplicationId); - } catch (YarnException | IOException e) { - LOG.info("Could not log the final application report.", e); - applicationReport = null; - } - - if (applicationReport != null) { - logFinalApplicationReport(applicationReport); + shutdownCluster(yarnClusterDescriptor, clusterClient, scheduledExecutorService, + yarnApplicationStatusMonitor, yarnApplicationId); + if (shutdownHook != null) { + // we do not need the hook anymore as we have just tried to shutdown the cluster. + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); } } } @@ -707,6 +682,47 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine