diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index fa18a143429470d403a7db0276fb8efe52dc1e3c..35cace779b15f605ca5dd377e8eb2ec3df41a210 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1370,7 +1370,6 @@ public class CheckpointCoordinator { return this.pendingCheckpoints.size(); } - @VisibleForTesting public int getNumberOfRetainedSuccessfulCheckpoints() { synchronized (lock) { return completedCheckpointStore.getNumberOfRetainedCheckpoints(); @@ -1383,7 +1382,6 @@ public class CheckpointCoordinator { } } - @VisibleForTesting public List getSuccessfulCheckpoints() throws Exception { synchronized (lock) { return completedCheckpointStore.getAllCheckpoints(); @@ -1394,7 +1392,6 @@ public class CheckpointCoordinator { return checkpointStorage; } - @VisibleForTesting public CompletedCheckpointStore getCheckpointStore() { return completedCheckpointStore; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 80ca9fbfb01fa0597c7f8c464e5a37f58609ebcc..42f608b82366b5adf57226a114545ffe20478eff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -23,16 +23,12 @@ import org.apache.flink.api.common.JobStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.ThreadSafe; - import java.util.List; import java.util.ListIterator; /** * A bounded LIFO-queue of {@link CompletedCheckpoint} instances. - * Note that it might be visited by multiple threads. So implementation should keep it thread-safe. */ -@ThreadSafe public interface CompletedCheckpointStore { Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 4cc9931ee3fc36a3fef48a58e225a96cc788bc30..ddaf9095f801cdda60f7d127e14d4aeecf2d041c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -24,19 +24,15 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.ThreadSafe; - import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkState; /** * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}. */ -@ThreadSafe public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class); @@ -47,8 +43,6 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt /** The completed checkpoints. */ private final ArrayDeque checkpoints; - private boolean shutdown = false; - /** * Creates {@link StandaloneCompletedCheckpointStore}. * @@ -70,34 +64,26 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - synchronized (checkpoints) { - checkState(!shutdown, "StandaloneCompletedCheckpointStore has been shut down"); - - checkpoints.addLast(checkpoint); + checkpoints.addLast(checkpoint); - if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - try { - CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(); - } catch (Exception e) { - LOG.warn("Fail to subsume the old checkpoint.", e); - } + if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { + try { + CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); + checkpointToSubsume.discardOnSubsume(); + } catch (Exception e) { + LOG.warn("Fail to subsume the old checkpoint.", e); } } } @Override public List getAllCheckpoints() { - synchronized (checkpoints) { - return new ArrayList<>(checkpoints); - } + return new ArrayList<>(checkpoints); } @Override public int getNumberOfRetainedCheckpoints() { - synchronized (checkpoints) { - return checkpoints.size(); - } + return checkpoints.size(); } @Override @@ -107,19 +93,14 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt @Override public void shutdown(JobStatus jobStatus) throws Exception { - synchronized (checkpoints) { - if (!shutdown) { - shutdown = true; - try { - LOG.info("Shutting down"); - - for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.discardOnShutdown(jobStatus); - } - } finally { - checkpoints.clear(); - } + try { + LOG.info("Shutting down"); + + for (CompletedCheckpoint checkpoint : checkpoints) { + checkpoint.discardOnShutdown(jobStatus); } + } finally { + checkpoints.clear(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 80b6d500ce86b03bb2e0c79142ee1e5f46b4cedb..b4f72b7436d6154f7476a21399a934005baaa5ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -29,8 +29,6 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.ThreadSafe; - import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -42,7 +40,6 @@ import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}. @@ -67,7 +64,6 @@ import static org.apache.flink.util.Preconditions.checkState; * checkpoints is consistent. Currently, after recovery we start out with only a single * checkpoint to circumvent those situations. */ -@ThreadSafe public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); @@ -89,10 +85,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto private final Executor executor; - private final Object lock = new Object(); - - private boolean shutdown = false; - /** * Creates a {@link ZooKeeperCompletedCheckpointStore} instance. * @@ -135,79 +127,77 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); - synchronized (lock) { - checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down"); - // Get all there is first - List, String>> initialCheckpoints; - while (true) { - try { - initialCheckpoints = checkpointsInZooKeeper.getAllAndLock(); - break; - } catch (ConcurrentModificationException e) { - LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); - } + // Get all there is first + List, String>> initialCheckpoints; + while (true) { + try { + initialCheckpoints = checkpointsInZooKeeper.getAllAndLock(); + break; + } + catch (ConcurrentModificationException e) { + LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); } + } - Collections.sort(initialCheckpoints, STRING_COMPARATOR); + Collections.sort(initialCheckpoints, STRING_COMPARATOR); - int numberOfInitialCheckpoints = initialCheckpoints.size(); + int numberOfInitialCheckpoints = initialCheckpoints.size(); - LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); + LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); - // Try and read the state handles from storage. We try until we either successfully read - // all of them or when we reach a stable state, i.e. when we successfully read the same set - // of checkpoints in two tries. We do it like this to protect against transient outages - // of the checkpoint store (for example a DFS): if the DFS comes online midway through - // reading a set of checkpoints we would run the risk of reading only a partial set - // of checkpoints while we could in fact read the other checkpoints as well if we retried. - // Waiting until a stable state protects against this while also being resilient against - // checkpoints being actually unreadable. - // - // These considerations are also important in the scope of incremental checkpoints, where - // we use ref-counting for shared state handles and might accidentally delete shared state - // of checkpoints that we don't read due to transient storage outages. - List lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); - List retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); - do { - LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); + // Try and read the state handles from storage. We try until we either successfully read + // all of them or when we reach a stable state, i.e. when we successfully read the same set + // of checkpoints in two tries. We do it like this to protect against transient outages + // of the checkpoint store (for example a DFS): if the DFS comes online midway through + // reading a set of checkpoints we would run the risk of reading only a partial set + // of checkpoints while we could in fact read the other checkpoints as well if we retried. + // Waiting until a stable state protects against this while also being resilient against + // checkpoints being actually unreadable. + // + // These considerations are also important in the scope of incremental checkpoints, where + // we use ref-counting for shared state handles and might accidentally delete shared state + // of checkpoints that we don't read due to transient storage outages. + List lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); + List retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); + do { + LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); - lastTryRetrievedCheckpoints.clear(); - lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints); + lastTryRetrievedCheckpoints.clear(); + lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints); - retrievedCheckpoints.clear(); + retrievedCheckpoints.clear(); - for (Tuple2, String> checkpointStateHandle : initialCheckpoints) { + for (Tuple2, String> checkpointStateHandle : initialCheckpoints) { - CompletedCheckpoint completedCheckpoint; + CompletedCheckpoint completedCheckpoint; - try { - completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); - if (completedCheckpoint != null) { - retrievedCheckpoints.add(completedCheckpoint); - } - } catch (Exception e) { - LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e); + try { + completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); + if (completedCheckpoint != null) { + retrievedCheckpoints.add(completedCheckpoint); } + } catch (Exception e) { + LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e); } - - } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints && - !CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints)); - - // Clear local handles in order to prevent duplicates on - // recovery. The local handles should reflect the state - // of ZooKeeper. - completedCheckpoints.clear(); - completedCheckpoints.addAll(retrievedCheckpoints); - - if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { - throw new FlinkException( - "Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage."); - } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { - LOG.warn( - "Could only fetch {} of {} checkpoints from storage.", - completedCheckpoints.size(), - numberOfInitialCheckpoints); } + + } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints && + !CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints)); + + // Clear local handles in order to prevent duplicates on + // recovery. The local handles should reflect the state + // of ZooKeeper. + completedCheckpoints.clear(); + completedCheckpoints.addAll(retrievedCheckpoints); + + if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { + throw new FlinkException( + "Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage."); + } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { + LOG.warn( + "Could only fetch {} of {} checkpoints from storage.", + completedCheckpoints.size(), + numberOfInitialCheckpoints); } } @@ -220,24 +210,20 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception { checkNotNull(checkpoint, "Checkpoint"); - synchronized (lock) { - checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down"); + final String path = checkpointIdToPath(checkpoint.getCheckpointID()); - final String path = checkpointIdToPath(checkpoint.getCheckpointID()); + // Now add the new one. If it fails, we don't want to loose existing data. + checkpointsInZooKeeper.addAndLock(path, checkpoint); - // Now add the new one. If it fails, we don't want to loose existing data. - checkpointsInZooKeeper.addAndLock(path, checkpoint); + completedCheckpoints.addLast(checkpoint); - completedCheckpoints.addLast(checkpoint); - - // Everything worked, let's remove a previous checkpoint if necessary. - while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { - final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); - tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume); - } - - LOG.debug("Added {} to {}.", checkpoint, path); + // Everything worked, let's remove a previous checkpoint if necessary. + while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { + final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); + tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume); } + + LOG.debug("Added {} to {}.", checkpoint, path); } private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer discardCallback) { @@ -259,16 +245,12 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto @Override public List getAllCheckpoints() throws Exception { - synchronized (lock) { - return new ArrayList<>(completedCheckpoints); - } + return new ArrayList<>(completedCheckpoints); } @Override public int getNumberOfRetainedCheckpoints() { - synchronized (lock) { - return completedCheckpoints.size(); - } + return completedCheckpoints.size(); } @Override @@ -278,30 +260,25 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto @Override public void shutdown(JobStatus jobStatus) throws Exception { - synchronized (lock) { - if (!shutdown) { - shutdown = true; - if (jobStatus.isGloballyTerminalState()) { - LOG.info("Shutting down"); - - for (CompletedCheckpoint checkpoint : completedCheckpoints) { - tryRemoveCompletedCheckpoint( - checkpoint, - completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus)); - } + if (jobStatus.isGloballyTerminalState()) { + LOG.info("Shutting down"); - completedCheckpoints.clear(); - checkpointsInZooKeeper.deleteChildren(); - } else { - LOG.info("Suspending"); + for (CompletedCheckpoint checkpoint : completedCheckpoints) { + tryRemoveCompletedCheckpoint( + checkpoint, + completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus)); + } - // Clear the local handles, but don't remove any state - completedCheckpoints.clear(); + completedCheckpoints.clear(); + checkpointsInZooKeeper.deleteChildren(); + } else { + LOG.info("Suspending"); - // Release the state handle locks in ZooKeeper such that they can be deleted - checkpointsInZooKeeper.releaseAll(); - } - } + // Clear the local handles, but don't remove any state + completedCheckpoints.clear(); + + // Release the state handle locks in ZooKeeper such that they can be deleted + checkpointsInZooKeeper.releaseAll(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 35465c2a0c176f0768e2d97eae09c2eac5abe1fc..859f1b940898539567a8c8ceb878c926a819fe2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -158,7 +158,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); sharedStateRegistry.close(); - store = createCompletedCheckpoints(1); store.recover(); assertEquals(0, store.getNumberOfRetainedCheckpoints()); @@ -193,7 +192,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // Recover again sharedStateRegistry.close(); - store = createCompletedCheckpoints(1); store.recover(); CompletedCheckpoint recovered = store.getLatestCheckpoint(false);