diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 0da6f008a209d3c032413dc8b0d56921e0c9f929..558f16ab6a8e4adabc25b3a2353bf005feaf6d81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -423,14 +423,6 @@ public class CheckpointCoordinator { CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); // clear queued requests and in-flight checkpoints abortPendingAndQueuedCheckpoints(reason); - - completedCheckpointStore.shutdown( - jobStatus, - checkpointsCleaner, - () -> { - // don't schedule anything on shutdown - }); - checkpointIdCounter.shutdown(jobStatus); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 3b938007a3483b929eaaaeedd63a3372fa30aa24..13a4c03adaa3b5e8b60a36db9783a15227ede5d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -416,7 +416,8 @@ public class ExecutionGraph implements AccessExecutionGraph { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker) { + CheckpointStatsTracker statsTracker, + CheckpointsCleaner checkpointsCleaner) { checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); checkState(checkpointCoordinator == null, "checkpointing already enabled"); @@ -470,7 +471,7 @@ public class ExecutionGraph implements AccessExecutionGraph { checkpointStore, checkpointStateBackend, ioExecutor, - new CheckpointsCleaner(), + checkpointsCleaner, new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer), SharedStateRegistry.DEFAULT_FACTORY, failureManager); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index d63b045a5db7c81da2c3a1af1139c3483d2242d3..86c26e0fdefd98e3daddacf493d7b32a023ab11f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; @@ -80,6 +81,7 @@ public class ExecutionGraphBuilder { SlotProvider slotProvider, ClassLoader classLoader, CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, MetricGroup metrics, @@ -289,7 +291,8 @@ public class ExecutionGraphBuilder { checkpointIdCounter, completedCheckpointStore, rootBackend, - checkpointStatsTracker); + checkpointStatsTracker, + checkpointsCleaner); } // create all the metrics for the Execution Graph diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 15ac47af8e73db791ee17417b2f25f5e90f5ae5e..cf93cdc36a3af1ffdbb18b0c0811f87c29027507 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore; @@ -157,6 +158,12 @@ public abstract class SchedulerBase implements SchedulerNG { private final CheckpointRecoveryFactory checkpointRecoveryFactory; + private final CompletedCheckpointStore completedCheckpointStore; + + private final CheckpointsCleaner checkpointsCleaner; + + private final CheckpointIDCounter checkpointIdCounter; + private final Time rpcTimeout; private final BlobWriter blobWriter; @@ -211,14 +218,23 @@ public abstract class SchedulerBase implements SchedulerNG { this.slotRequestTimeout = checkNotNull(slotRequestTimeout); this.executionVertexVersioner = checkNotNull(executionVertexVersioner); + this.checkpointsCleaner = new CheckpointsCleaner(); + this.completedCheckpointStore = createCompletedCheckpointStore(); + this.checkpointIdCounter = createCheckpointIdCounter(); + this.executionGraph = createAndRestoreExecutionGraph( jobManagerJobMetricGroup, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, checkNotNull(shuffleMaster), checkNotNull(partitionTracker), checkNotNull(executionDeploymentTracker), initializationTimestamp); + registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph); + this.schedulingTopology = executionGraph.getSchedulingTopology(); stateLocationRetriever = @@ -230,8 +246,78 @@ public abstract class SchedulerBase implements SchedulerNG { this.coordinatorMap = createCoordinatorMap(); } + private void registerShutDownCheckpointServicesOnExecutionGraphTermination( + ExecutionGraph executionGraph) { + FutureUtils.assertNoException( + executionGraph.getTerminationFuture().thenAccept(this::shutDownCheckpointServices)); + } + + private void shutDownCheckpointServices(JobStatus jobStatus) { + Exception exception = null; + + try { + completedCheckpointStore.shutdown( + jobStatus, + checkpointsCleaner, + () -> { + // don't schedule anything on shutdown + }); + } catch (Exception e) { + exception = e; + } + + try { + checkpointIdCounter.shutdown(jobStatus); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + log.error("Error while shutting down checkpoint services.", exception); + } + } + + private CompletedCheckpointStore createCompletedCheckpointStore() throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return ExecutionGraphBuilder.createCompletedCheckpointStore( + jobMasterConfiguration, + userCodeLoader, + checkpointRecoveryFactory, + log, + jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, + "Failed to initialize high-availability completed checkpoint store", + e); + } + } else { + return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; + } + } + + private CheckpointIDCounter createCheckpointIdCounter() throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return ExecutionGraphBuilder.createCheckpointIdCounter( + checkpointRecoveryFactory, jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, "Failed to initialize high-availability checkpoint id counter", e); + } + } else { + return DeactivatedCheckpointIDCounter.INSTANCE; + } + } + private ExecutionGraph createAndRestoreExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, ShuffleMaster shuffleMaster, JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, @@ -241,6 +327,9 @@ public abstract class SchedulerBase implements SchedulerNG { ExecutionGraph newExecutionGraph = createExecutionGraph( currentJobManagerJobMetricGroup, + completedCheckpointStore, + checkpointsCleaner, + checkpointIdCounter, shuffleMaster, partitionTracker, executionDeploymentTracker, @@ -265,6 +354,9 @@ public abstract class SchedulerBase implements SchedulerNG { private ExecutionGraph createExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, + CompletedCheckpointStore completedCheckpointStore, + CheckpointsCleaner checkpointsCleaner, + CheckpointIDCounter checkpointIdCounter, ShuffleMaster shuffleMaster, final JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker, @@ -280,31 +372,6 @@ public abstract class SchedulerBase implements SchedulerNG { } }; - final JobID jobId = jobGraph.getJobID(); - final CheckpointIDCounter checkpointIdCounter; - final CompletedCheckpointStore completedCheckpointStore; - - if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { - try { - checkpointIdCounter = - ExecutionGraphBuilder.createCheckpointIdCounter( - checkpointRecoveryFactory, jobId); - completedCheckpointStore = - ExecutionGraphBuilder.createCompletedCheckpointStore( - jobMasterConfiguration, - userCodeLoader, - checkpointRecoveryFactory, - log, - jobId); - } catch (Exception e) { - throw new JobExecutionException( - jobId, "Failed to initialize high-availability checkpoint handler", e); - } - } else { - checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE; - completedCheckpointStore = DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; - } - return ExecutionGraphBuilder.buildGraph( jobGraph, jobMasterConfiguration, @@ -313,6 +380,7 @@ public abstract class SchedulerBase implements SchedulerNG { slotProvider, userCodeLoader, completedCheckpointStore, + checkpointsCleaner, checkpointIdCounter, rpcTimeout, currentJobManagerJobMetricGroup, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index dd26126f841a29a72d6cb77b47c200e612c1aeef..c7d66fe8c3ab2be28dfcfa5eee9a321885df23b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -983,6 +983,8 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); // set up the coordinator and validate the initial state + final StandaloneCompletedCheckpointStore completedCheckpointStore = + new StandaloneCompletedCheckpointStore(10); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder() .setJobId(jobId) @@ -995,7 +997,7 @@ public class CheckpointCoordinatorTest extends TestLogger { .setTasksToWaitFor( new ExecutionVertex[] {ackVertex1, ackVertex2, ackVertex3}) .setTasksToCommitTo(new ExecutionVertex[] {commitVertex}) - .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(10)) + .setCompletedCheckpointStore(completedCheckpointStore) .setTimer(manuallyTriggeredScheduledExecutor) .build(); @@ -1168,6 +1170,8 @@ public class CheckpointCoordinatorTest extends TestLogger { verify(subtaskState13, times(1)).discardState(); checkpointCoordinator.shutdown(JobStatus.FINISHED); + completedCheckpointStore.shutdown( + JobStatus.FINISHED, new CheckpointsCleaner(), () -> {}); // validate that the states in the second checkpoint have been discarded verify(subtaskState21, times(1)).discardState(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java index 0add2f689e6ac3a9611bbe047b0bdd3a91f8f575..59360edd803e5e41a44deb1803ddda587db9f65c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; @@ -175,6 +176,7 @@ public class TestingExecutionGraphBuilder { slotProvider, userClassLoader, completedCheckpointStore, + new CheckpointsCleaner(), checkpointIdCounter, rpcTimeout, metricGroup,