[FLINK-20846] Move checkpoint service shut down out of CheckpointCoordinator

By moving the shutdown of checkpoint services out of the CheckpointCoordinator,
it is now possible to reuse these services across different CheckpointCoordinators.

This closes #14553.
上级 224822e8
......@@ -423,14 +423,6 @@ public class CheckpointCoordinator {
CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
// clear queued requests and in-flight checkpoints
abortPendingAndQueuedCheckpoints(reason);
completedCheckpointStore.shutdown(
jobStatus,
checkpointsCleaner,
() -> {
// don't schedule anything on shutdown
});
checkpointIdCounter.shutdown(jobStatus);
}
}
}
......
......@@ -416,7 +416,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
CheckpointStatsTracker statsTracker,
CheckpointsCleaner checkpointsCleaner) {
checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
checkState(checkpointCoordinator == null, "checkpointing already enabled");
......@@ -470,7 +471,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
checkpointStore,
checkpointStateBackend,
ioExecutor,
new CheckpointsCleaner(),
checkpointsCleaner,
new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
SharedStateRegistry.DEFAULT_FACTORY,
failureManager);
......
......@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
......@@ -80,6 +81,7 @@ public class ExecutionGraphBuilder {
SlotProvider slotProvider,
ClassLoader classLoader,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
Time rpcTimeout,
MetricGroup metrics,
......@@ -289,7 +291,8 @@ public class ExecutionGraphBuilder {
checkpointIdCounter,
completedCheckpointStore,
rootBackend,
checkpointStatsTracker);
checkpointStatsTracker,
checkpointsCleaner);
}
// create all the metrics for the Execution Graph
......
......@@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
......@@ -157,6 +158,12 @@ public abstract class SchedulerBase implements SchedulerNG {
private final CheckpointRecoveryFactory checkpointRecoveryFactory;
private final CompletedCheckpointStore completedCheckpointStore;
private final CheckpointsCleaner checkpointsCleaner;
private final CheckpointIDCounter checkpointIdCounter;
private final Time rpcTimeout;
private final BlobWriter blobWriter;
......@@ -211,14 +218,23 @@ public abstract class SchedulerBase implements SchedulerNG {
this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
this.checkpointsCleaner = new CheckpointsCleaner();
this.completedCheckpointStore = createCompletedCheckpointStore();
this.checkpointIdCounter = createCheckpointIdCounter();
this.executionGraph =
createAndRestoreExecutionGraph(
jobManagerJobMetricGroup,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
checkNotNull(shuffleMaster),
checkNotNull(partitionTracker),
checkNotNull(executionDeploymentTracker),
initializationTimestamp);
registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph);
this.schedulingTopology = executionGraph.getSchedulingTopology();
stateLocationRetriever =
......@@ -230,8 +246,78 @@ public abstract class SchedulerBase implements SchedulerNG {
this.coordinatorMap = createCoordinatorMap();
}
/**
 * Chains {@link #shutDownCheckpointServices(JobStatus)} onto the termination future of the
 * given execution graph, so the checkpoint services are shut down with the status the graph
 * terminated in.
 *
 * <p>The chained future is handed to {@code FutureUtils.assertNoException}; presumably this
 * escalates any unexpected failure of the callback instead of dropping it silently (confirm
 * against {@code FutureUtils}).
 *
 * @param executionGraph the execution graph whose termination triggers the shutdown
 */
private void registerShutDownCheckpointServicesOnExecutionGraphTermination(
        ExecutionGraph executionGraph) {
    FutureUtils.assertNoException(
            executionGraph
                    .getTerminationFuture()
                    .thenAccept(terminalStatus -> shutDownCheckpointServices(terminalStatus)));
}
/**
 * Shuts down the completed checkpoint store and then the checkpoint ID counter, passing the
 * given job status to both.
 *
 * <p>The counter shutdown is attempted even when the store shutdown throws. Failures are
 * merged via {@code ExceptionUtils.firstOrSuppressed} and logged; nothing is rethrown to the
 * caller.
 *
 * @param jobStatus the job status forwarded to both shutdown calls
 */
private void shutDownCheckpointServices(JobStatus jobStatus) {
    Exception shutDownFailure = null;

    // Step 1: the completed checkpoint store.
    try {
        completedCheckpointStore.shutdown(
                jobStatus,
                checkpointsCleaner,
                () -> {
                    // don't schedule anything on shutdown
                });
    } catch (Exception storeFailure) {
        shutDownFailure = storeFailure;
    }

    // Step 2: the checkpoint ID counter, regardless of the outcome of step 1.
    try {
        checkpointIdCounter.shutdown(jobStatus);
    } catch (Exception counterFailure) {
        shutDownFailure = ExceptionUtils.firstOrSuppressed(counterFailure, shutDownFailure);
    }

    if (shutDownFailure == null) {
        return;
    }
    log.error("Error while shutting down checkpoint services.", shutDownFailure);
}
/**
 * Creates the {@link CompletedCheckpointStore} for the job of this scheduler.
 *
 * <p>When {@code ExecutionGraphBuilder.isCheckpointingEnabled} reports that checkpointing is
 * not enabled for the job graph, the shared {@link
 * DeactivatedCheckpointCompletedCheckpointStore#INSTANCE} is returned instead of a new store.
 *
 * @return the store created through {@code ExecutionGraphBuilder}, or the deactivated
 *     singleton when checkpointing is disabled
 * @throws JobExecutionException if creating the store fails; the original exception is kept as
 *     the cause
 */
private CompletedCheckpointStore createCompletedCheckpointStore() throws JobExecutionException {
    final JobID id = jobGraph.getJobID();

    if (!ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
        return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
    }

    try {
        return ExecutionGraphBuilder.createCompletedCheckpointStore(
                jobMasterConfiguration, userCodeLoader, checkpointRecoveryFactory, log, id);
    } catch (Exception cause) {
        throw new JobExecutionException(
                id, "Failed to initialize high-availability completed checkpoint store", cause);
    }
}
/**
 * Creates the {@link CheckpointIDCounter} for the job of this scheduler.
 *
 * <p>When {@code ExecutionGraphBuilder.isCheckpointingEnabled} reports that checkpointing is
 * not enabled for the job graph, the shared {@link DeactivatedCheckpointIDCounter#INSTANCE} is
 * returned instead of a new counter.
 *
 * @return the counter created through {@code ExecutionGraphBuilder}, or the deactivated
 *     singleton when checkpointing is disabled
 * @throws JobExecutionException if creating the counter fails; the original exception is kept
 *     as the cause
 */
private CheckpointIDCounter createCheckpointIdCounter() throws JobExecutionException {
    final JobID id = jobGraph.getJobID();

    if (!ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
        return DeactivatedCheckpointIDCounter.INSTANCE;
    }

    try {
        return ExecutionGraphBuilder.createCheckpointIdCounter(checkpointRecoveryFactory, id);
    } catch (Exception cause) {
        throw new JobExecutionException(
                id, "Failed to initialize high-availability checkpoint id counter", cause);
    }
}
private ExecutionGraph createAndRestoreExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
......@@ -241,6 +327,9 @@ public abstract class SchedulerBase implements SchedulerNG {
ExecutionGraph newExecutionGraph =
createExecutionGraph(
currentJobManagerJobMetricGroup,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
shuffleMaster,
partitionTracker,
executionDeploymentTracker,
......@@ -265,6 +354,9 @@ public abstract class SchedulerBase implements SchedulerNG {
private ExecutionGraph createExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
CompletedCheckpointStore completedCheckpointStore,
CheckpointsCleaner checkpointsCleaner,
CheckpointIDCounter checkpointIdCounter,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
ExecutionDeploymentTracker executionDeploymentTracker,
......@@ -280,31 +372,6 @@ public abstract class SchedulerBase implements SchedulerNG {
}
};
final JobID jobId = jobGraph.getJobID();
final CheckpointIDCounter checkpointIdCounter;
final CompletedCheckpointStore completedCheckpointStore;
if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
try {
checkpointIdCounter =
ExecutionGraphBuilder.createCheckpointIdCounter(
checkpointRecoveryFactory, jobId);
completedCheckpointStore =
ExecutionGraphBuilder.createCompletedCheckpointStore(
jobMasterConfiguration,
userCodeLoader,
checkpointRecoveryFactory,
log,
jobId);
} catch (Exception e) {
throw new JobExecutionException(
jobId, "Failed to initialize high-availability checkpoint handler", e);
}
} else {
checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE;
completedCheckpointStore = DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
}
return ExecutionGraphBuilder.buildGraph(
jobGraph,
jobMasterConfiguration,
......@@ -313,6 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG {
slotProvider,
userCodeLoader,
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
rpcTimeout,
currentJobManagerJobMetricGroup,
......
......@@ -983,6 +983,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
// set up the coordinator and validate the initial state
final StandaloneCompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(10);
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setJobId(jobId)
......@@ -995,7 +997,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
.setTasksToWaitFor(
new ExecutionVertex[] {ackVertex1, ackVertex2, ackVertex3})
.setTasksToCommitTo(new ExecutionVertex[] {commitVertex})
.setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(10))
.setCompletedCheckpointStore(completedCheckpointStore)
.setTimer(manuallyTriggeredScheduledExecutor)
.build();
......@@ -1168,6 +1170,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
verify(subtaskState13, times(1)).discardState();
checkpointCoordinator.shutdown(JobStatus.FINISHED);
completedCheckpointStore.shutdown(
JobStatus.FINISHED, new CheckpointsCleaner(), () -> {});
// validate that the states in the second checkpoint have been discarded
verify(subtaskState21, times(1)).discardState();
......
......@@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
......@@ -175,6 +176,7 @@ public class TestingExecutionGraphBuilder {
slotProvider,
userClassLoader,
completedCheckpointStore,
new CheckpointsCleaner(),
checkpointIdCounter,
rpcTimeout,
metricGroup,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册