diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 35cace779b15f605ca5dd377e8eb2ec3df41a210..fa18a143429470d403a7db0276fb8efe52dc1e3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1370,6 +1370,7 @@ public class CheckpointCoordinator {
 		return this.pendingCheckpoints.size();
 	}
 
+	@VisibleForTesting
 	public int getNumberOfRetainedSuccessfulCheckpoints() {
 		synchronized (lock) {
 			return completedCheckpointStore.getNumberOfRetainedCheckpoints();
@@ -1382,6 +1383,7 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	@VisibleForTesting
 	public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
 		synchronized (lock) {
 			return completedCheckpointStore.getAllCheckpoints();
@@ -1392,6 +1394,7 @@ public class CheckpointCoordinator {
 		return checkpointStorage;
 	}
 
+	@VisibleForTesting
 	public CompletedCheckpointStore getCheckpointStore() {
 		return completedCheckpointStore;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 42f608b82366b5adf57226a114545ffe20478eff..80ca9fbfb01fa0597c7f8c464e5a37f58609ebcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -23,12 +23,16 @@ import org.apache.flink.api.common.JobStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.List;
 import java.util.ListIterator;
 
 /**
  * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
+ * Note that this store may be accessed by multiple threads concurrently, so implementations must be thread-safe.
  */
+@ThreadSafe
 public interface CompletedCheckpointStore {
 
 	Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);
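Note on the annotations above: `@VisibleForTesting` merely documents that these accessors exist for tests, and JSR-305's `@ThreadSafe` is likewise purely informational. Neither adds any synchronization by itself, so each `CompletedCheckpointStore` implementation below still has to supply its own locking. A minimal sketch of the contract the annotation now advertises (the `ThreadSafeContractSketch` class is illustrative, not part of the patch):

    import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;

    import java.util.concurrent.CompletableFuture;

    class ThreadSafeContractSketch {
        // After this patch, concurrent calls like these must neither corrupt the
        // store's internal state nor race with a concurrent shutdown().
        static void exercise(CompletedCheckpointStore store, CompletedCheckpoint checkpoint) throws Exception {
            CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
                try {
                    store.addCheckpoint(checkpoint);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            // A reader may run at the same time as the writer above.
            System.out.println(store.getNumberOfRetainedCheckpoints());
            writer.join();
        }
    }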
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index ddaf9095f801cdda60f7d127e14d4aeecf2d041c..4cc9931ee3fc36a3fef48a58e225a96cc788bc30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -24,15 +24,19 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
  */
+@ThreadSafe
 public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
@@ -43,6 +47,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 	/** The completed checkpoints. */
 	private final ArrayDeque<CompletedCheckpoint> checkpoints;
 
+	private boolean shutdown = false;
+
 	/**
 	 * Creates {@link StandaloneCompletedCheckpointStore}.
 	 *
@@ -64,26 +70,34 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	@Override
 	public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-		checkpoints.addLast(checkpoint);
+		synchronized (checkpoints) {
+			checkState(!shutdown, "StandaloneCompletedCheckpointStore has been shut down");
+
+			checkpoints.addLast(checkpoint);
 
-		if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-			try {
-				CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-				checkpointToSubsume.discardOnSubsume();
-			} catch (Exception e) {
-				LOG.warn("Fail to subsume the old checkpoint.", e);
+			if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
+				try {
+					CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+					checkpointToSubsume.discardOnSubsume();
+				} catch (Exception e) {
+					LOG.warn("Failed to subsume the old checkpoint.", e);
+				}
 			}
 		}
 	}
 
 	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() {
-		return new ArrayList<>(checkpoints);
+		synchronized (checkpoints) {
+			return new ArrayList<>(checkpoints);
+		}
 	}
 
 	@Override
 	public int getNumberOfRetainedCheckpoints() {
-		return checkpoints.size();
+		synchronized (checkpoints) {
+			return checkpoints.size();
+		}
 	}
 
 	@Override
@@ -93,14 +107,19 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
-		try {
-			LOG.info("Shutting down");
-
-			for (CompletedCheckpoint checkpoint : checkpoints) {
-				checkpoint.discardOnShutdown(jobStatus);
+		synchronized (checkpoints) {
+			if (!shutdown) {
+				shutdown = true;
+				try {
+					LOG.info("Shutting down");
+
+					for (CompletedCheckpoint checkpoint : checkpoints) {
+						checkpoint.discardOnShutdown(jobStatus);
+					}
+				} finally {
+					checkpoints.clear();
+				}
 			}
-		} finally {
-			checkpoints.clear();
 		}
 	}
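The standalone store uses the `checkpoints` deque itself as the monitor, and the new `shutdown` flag makes `shutdown()` idempotent while letting `addCheckpoint()` fail fast afterwards. A self-contained sketch of that pattern, with all names invented for illustration:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: one monitor guards both the deque and the shutdown flag.
    final class BoundedStoreSketch<T> {
        private final ArrayDeque<T> items = new ArrayDeque<>();
        private final int maxToRetain;
        private boolean shutdown = false;

        BoundedStoreSketch(int maxToRetain) {
            this.maxToRetain = maxToRetain;
        }

        void add(T item) {
            synchronized (items) { // same idiom: the collection is its own lock
                if (shutdown) {
                    throw new IllegalStateException("store has been shut down");
                }
                items.addLast(item);
                if (items.size() > maxToRetain) {
                    items.removeFirst(); // subsume the oldest entry, as in addCheckpoint()
                }
            }
        }

        List<T> snapshot() {
            synchronized (items) { // copy under the lock, as in getAllCheckpoints()
                return new ArrayList<>(items);
            }
        }

        void shutdown() {
            synchronized (items) {
                if (!shutdown) { // idempotent, as in the patched shutdown()
                    shutdown = true;
                    items.clear();
                }
            }
        }
    }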
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index b4f72b7436d6154f7476a21399a934005baaa5ac..80b6d500ce86b03bb2e0c79142ee1e5f46b4cedb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -40,6 +42,7 @@ import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
@@ -64,6 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * checkpoints is consistent. Currently, after recovery we start out with only a single
  * checkpoint to circumvent those situations.
  */
+@ThreadSafe
 public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
@@ -85,6 +89,10 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	private final Executor executor;
 
+	private final Object lock = new Object();
+
+	private boolean shutdown = false;
+
 	/**
 	 * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
 	 *
@@ -127,77 +135,79 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 	public void recover() throws Exception {
 		LOG.info("Recovering checkpoints from ZooKeeper.");
 
-		// Get all there is first
-		List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
-		while (true) {
-			try {
-				initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
-				break;
-			}
-			catch (ConcurrentModificationException e) {
-				LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+		synchronized (lock) {
+			checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
+			// Get all there is first
+			List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
+			while (true) {
+				try {
+					initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
+					break;
+				} catch (ConcurrentModificationException e) {
+					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+				}
 			}
-		}
 
-		Collections.sort(initialCheckpoints, STRING_COMPARATOR);
+			Collections.sort(initialCheckpoints, STRING_COMPARATOR);
 
-		int numberOfInitialCheckpoints = initialCheckpoints.size();
+			int numberOfInitialCheckpoints = initialCheckpoints.size();
 
-		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
+			LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
 
-		// Try and read the state handles from storage. We try until we either successfully read
-		// all of them or when we reach a stable state, i.e. when we successfully read the same set
-		// of checkpoints in two tries. We do it like this to protect against transient outages
-		// of the checkpoint store (for example a DFS): if the DFS comes online midway through
-		// reading a set of checkpoints we would run the risk of reading only a partial set
-		// of checkpoints while we could in fact read the other checkpoints as well if we retried.
-		// Waiting until a stable state protects against this while also being resilient against
-		// checkpoints being actually unreadable.
-		//
-		// These considerations are also important in the scope of incremental checkpoints, where
-		// we use ref-counting for shared state handles and might accidentally delete shared state
-		// of checkpoints that we don't read due to transient storage outages.
-		List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
-		List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
-		do {
-			LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
+			// Try and read the state handles from storage. We try until we either successfully read
+			// all of them or when we reach a stable state, i.e. when we successfully read the same set
+			// of checkpoints in two tries. We do it like this to protect against transient outages
+			// of the checkpoint store (for example a DFS): if the DFS comes online midway through
+			// reading a set of checkpoints we would run the risk of reading only a partial set
+			// of checkpoints while we could in fact read the other checkpoints as well if we retried.
+			// Waiting until a stable state protects against this while also being resilient against
+			// checkpoints being actually unreadable.
+			//
+			// These considerations are also important in the scope of incremental checkpoints, where
+			// we use ref-counting for shared state handles and might accidentally delete shared state
+			// of checkpoints that we don't read due to transient storage outages.
+			List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
+			List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
+			do {
+				LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
 
-			lastTryRetrievedCheckpoints.clear();
-			lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
+				lastTryRetrievedCheckpoints.clear();
+				lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
 
-			retrievedCheckpoints.clear();
+				retrievedCheckpoints.clear();
 
-			for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
+				for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
 
-				CompletedCheckpoint completedCheckpoint;
+					CompletedCheckpoint completedCheckpoint;
 
-				try {
-					completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-					if (completedCheckpoint != null) {
-						retrievedCheckpoints.add(completedCheckpoint);
+					try {
+						completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
+						if (completedCheckpoint != null) {
+							retrievedCheckpoints.add(completedCheckpoint);
+						}
+					} catch (Exception e) {
+						LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
 					}
-				} catch (Exception e) {
-					LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
 				}
-			}
 
-		} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints &&
-			!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));
-
-		// Clear local handles in order to prevent duplicates on
-		// recovery. The local handles should reflect the state
-		// of ZooKeeper.
-		completedCheckpoints.clear();
-		completedCheckpoints.addAll(retrievedCheckpoints);
-
-		if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
-			throw new FlinkException(
-				"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
-		} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
-			LOG.warn(
-				"Could only fetch {} of {} checkpoints from storage.",
-				completedCheckpoints.size(),
-				numberOfInitialCheckpoints);
+			} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints &&
+				!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));
+
+			// Clear local handles in order to prevent duplicates on
+			// recovery. The local handles should reflect the state
+			// of ZooKeeper.
+			completedCheckpoints.clear();
+			completedCheckpoints.addAll(retrievedCheckpoints);
+
+			if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
+				throw new FlinkException(
+					"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
+			} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
+				LOG.warn(
+					"Could only fetch {} of {} checkpoints from storage.",
+					completedCheckpoints.size(),
+					numberOfInitialCheckpoints);
+			}
 		}
 	}
@@ -210,20 +220,24 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 	public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception {
 		checkNotNull(checkpoint, "Checkpoint");
 
-		final String path = checkpointIdToPath(checkpoint.getCheckpointID());
+		synchronized (lock) {
+			checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
 
-		// Now add the new one. If it fails, we don't want to loose existing data.
-		checkpointsInZooKeeper.addAndLock(path, checkpoint);
+			final String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
-		completedCheckpoints.addLast(checkpoint);
+			// Now add the new one. If it fails, we don't want to lose existing data.
+			checkpointsInZooKeeper.addAndLock(path, checkpoint);
 
-		// Everything worked, let's remove a previous checkpoint if necessary.
-		while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-			final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
-			tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
-		}
+			completedCheckpoints.addLast(checkpoint);
+
+			// Everything worked, let's remove a previous checkpoint if necessary.
+			while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
+				final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
+				tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
+			}
 
-		LOG.debug("Added {} to {}.", checkpoint, path);
+			LOG.debug("Added {} to {}.", checkpoint, path);
+		}
 	}
 
 	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
@@ -245,12 +259,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	@Override
 	public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
-		return new ArrayList<>(completedCheckpoints);
+		synchronized (lock) {
+			return new ArrayList<>(completedCheckpoints);
+		}
 	}
 
 	@Override
 	public int getNumberOfRetainedCheckpoints() {
-		return completedCheckpoints.size();
+		synchronized (lock) {
+			return completedCheckpoints.size();
+		}
 	}
 
 	@Override
@@ -260,25 +278,30 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
 
 	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
-		if (jobStatus.isGloballyTerminalState()) {
-			LOG.info("Shutting down");
-
-			for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-				tryRemoveCompletedCheckpoint(
-					checkpoint,
-					completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
-			}
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+				if (jobStatus.isGloballyTerminalState()) {
+					LOG.info("Shutting down");
+
+					for (CompletedCheckpoint checkpoint : completedCheckpoints) {
+						tryRemoveCompletedCheckpoint(
+							checkpoint,
+							completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
+					}
 
-			completedCheckpoints.clear();
-			checkpointsInZooKeeper.deleteChildren();
-		} else {
-			LOG.info("Suspending");
+					completedCheckpoints.clear();
+					checkpointsInZooKeeper.deleteChildren();
+				} else {
+					LOG.info("Suspending");
 
-			// Clear the local handles, but don't remove any state
-			completedCheckpoints.clear();
+					// Clear the local handles, but don't remove any state
+					completedCheckpoints.clear();
 
-			// Release the state handle locks in ZooKeeper such that they can be deleted
-			checkpointsInZooKeeper.releaseAll();
+					// Release the state handle locks in ZooKeeper such that they can be deleted
+					checkpointsInZooKeeper.releaseAll();
+				}
+			}
 		}
 	}
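Unlike the standalone store, the ZooKeeper store synchronizes on a dedicated private `lock` object, and the ZooKeeper round-trips in `recover()`, `addCheckpoint()`, and `shutdown()` now run inside the critical section. That serializes these operations against each other (a slow ZooKeeper call will block concurrent readers), which is presumably acceptable since none of them sits on a hot path. A stripped-down sketch of the idiom, with the class and members invented for illustration:

    // Illustrative only: a private monitor that outside code cannot contend on,
    // with the remote call kept inside the critical section so that shutdown()
    // can never interleave with it.
    final class DedicatedLockSketch {
        private final Object lock = new Object();
        private boolean shutdown = false;
        private int retained = 0;

        void add(Runnable remoteWrite) {
            synchronized (lock) {
                if (shutdown) {
                    throw new IllegalStateException("store has been shut down");
                }
                remoteWrite.run(); // stands in for checkpointsInZooKeeper.addAndLock(...)
                retained++;
            }
        }

        void shutdown() {
            synchronized (lock) {
                shutdown = true; // later add() calls now fail fast
            }
        }
    }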
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 859f1b940898539567a8c8ceb878c926a819fe2c..35465c2a0c176f0768e2d97eae09c2eac5abe1fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -158,6 +158,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
 		assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
 		sharedStateRegistry.close();
+		store = createCompletedCheckpoints(1);
 		store.recover();
 
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -192,6 +193,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
 
 		// Recover again
 		sharedStateRegistry.close();
+		store = createCompletedCheckpoints(1);
 		store.recover();
 
 		CompletedCheckpoint recovered = store.getLatestCheckpoint(false);
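The test changes follow from the new `checkState(!shutdown, ...)` guards: a store instance that has been shut down now rejects further `recover()` calls, so the tests create a fresh store, as a newly started JobManager would, before recovering. A sketch of that pattern (illustrative; the `Supplier` plays the role of the test's `createCompletedCheckpoints(1)` helper):

    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;

    import java.util.function.Supplier;

    class RecoverAfterShutdownSketch {
        static void recoverWithFreshStore(Supplier<CompletedCheckpointStore> storeFactory) throws Exception {
            CompletedCheckpointStore oldStore = storeFactory.get();
            oldStore.shutdown(JobStatus.FINISHED); // this instance now rejects further use

            CompletedCheckpointStore freshStore = storeFactory.get(); // like a freshly started JobManager
            freshStore.recover(); // reads the retained checkpoints back from ZooKeeper
        }
    }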