From 8411195b08bac461bc76073de71c425c219638de Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 18 Sep 2014 17:07:09 +0200 Subject: [PATCH] Improve shutdown of JobManager (graceful exit of runing jobs and RPC service) --- .../flink/runtime/jobmanager/JobManager.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java index 1c02127c7f1..90bfdc652b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java @@ -40,7 +40,6 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; - import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -61,6 +60,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.instance.Hardware; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -92,7 +92,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.SerializableArrayList; import org.apache.flink.util.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +111,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide /** Executor service for asynchronous commands (to relieve the RPC threads of work) */ - private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE); + private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE); /** The RPC end point through which the JobManager gets its calls */ @@ -230,18 +229,13 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide if (!this.isShutdownInProgress.compareAndSet(false, true)) { return; } - - // Stop instance manager - if (this.instanceManager != null) { - this.instanceManager.shutdown(); - } - - // Stop RPC server - if (this.jobManagerServer != null) { - this.jobManagerServer.stop(); + + for (ExecutionGraph e : this.currentJobs.values()) { + e.fail(new Exception("The JobManager is shutting down.")); } - + // Stop the executor service + // this waits for any pending calls to be done if (this.executorService != null) { this.executorService.shutdown(); try { @@ -251,6 +245,16 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide } } + // Stop instance manager + if (this.instanceManager != null) { + this.instanceManager.shutdown(); + } + + // Stop RPC server + if (this.jobManagerServer != null) { + this.jobManagerServer.stop(); + } + // Stop and clean up the job progress collector if (this.eventCollector != null) { this.eventCollector.shutdown(); -- GitLab