提交 8411195b 编写于 作者: S Stephan Ewen

Improve shutdown of JobManager (graceful exit of running jobs and RPC service)

上级 3e644000
...@@ -40,7 +40,6 @@ import org.apache.commons.cli.Option; ...@@ -40,7 +40,6 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
...@@ -61,6 +60,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; ...@@ -61,6 +60,7 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.Hardware;
import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceConnectionInfo;
...@@ -92,7 +92,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation; ...@@ -92,7 +92,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.SerializableArrayList; import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -112,7 +111,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide ...@@ -112,7 +111,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
/** Executor service for asynchronous commands (to relieve the RPC threads of work) */ /** Executor service for asynchronous commands (to relieve the RPC threads of work) */
private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE); private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
/** The RPC end point through which the JobManager gets its calls */ /** The RPC end point through which the JobManager gets its calls */
...@@ -231,17 +230,12 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide ...@@ -231,17 +230,12 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
return; return;
} }
// Stop instance manager for (ExecutionGraph e : this.currentJobs.values()) {
if (this.instanceManager != null) { e.fail(new Exception("The JobManager is shutting down."));
this.instanceManager.shutdown();
}
// Stop RPC server
if (this.jobManagerServer != null) {
this.jobManagerServer.stop();
} }
// Stop the executor service // Stop the executor service
// this waits for any pending calls to be done
if (this.executorService != null) { if (this.executorService != null) {
this.executorService.shutdown(); this.executorService.shutdown();
try { try {
...@@ -251,6 +245,16 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide ...@@ -251,6 +245,16 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
} }
} }
// Stop instance manager
if (this.instanceManager != null) {
this.instanceManager.shutdown();
}
// Stop RPC server
if (this.jobManagerServer != null) {
this.jobManagerServer.stop();
}
// Stop and clean up the job progress collector // Stop and clean up the job progress collector
if (this.eventCollector != null) { if (this.eventCollector != null) {
this.eventCollector.shutdown(); this.eventCollector.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册