提交 f248e6df 编写于 作者: I ifndef-SleePy 提交者: Piotr Nowojski

[FLINK-14971][checkpointing] Make CompletedCheckpointStore thread-safe to...

[FLINK-14971][checkpointing] Make CompletedCheckpointStore thread-safe to avoid synchronization outside
上级 300263e1
...@@ -1370,6 +1370,7 @@ public class CheckpointCoordinator { ...@@ -1370,6 +1370,7 @@ public class CheckpointCoordinator {
return this.pendingCheckpoints.size(); return this.pendingCheckpoints.size();
} }
@VisibleForTesting
public int getNumberOfRetainedSuccessfulCheckpoints() { public int getNumberOfRetainedSuccessfulCheckpoints() {
synchronized (lock) { synchronized (lock) {
return completedCheckpointStore.getNumberOfRetainedCheckpoints(); return completedCheckpointStore.getNumberOfRetainedCheckpoints();
...@@ -1382,6 +1383,7 @@ public class CheckpointCoordinator { ...@@ -1382,6 +1383,7 @@ public class CheckpointCoordinator {
} }
} }
@VisibleForTesting
public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception { public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
synchronized (lock) { synchronized (lock) {
return completedCheckpointStore.getAllCheckpoints(); return completedCheckpointStore.getAllCheckpoints();
...@@ -1392,6 +1394,7 @@ public class CheckpointCoordinator { ...@@ -1392,6 +1394,7 @@ public class CheckpointCoordinator {
return checkpointStorage; return checkpointStorage;
} }
@VisibleForTesting
public CompletedCheckpointStore getCheckpointStore() { public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore; return completedCheckpointStore;
} }
......
...@@ -23,12 +23,16 @@ import org.apache.flink.api.common.JobStatus; ...@@ -23,12 +23,16 @@ import org.apache.flink.api.common.JobStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
/** /**
* A bounded LIFO-queue of {@link CompletedCheckpoint} instances. * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
* Note that the store may be accessed by multiple threads, so implementations must be thread-safe.
*/ */
@ThreadSafe
public interface CompletedCheckpointStore { public interface CompletedCheckpointStore {
Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class); Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);
......
...@@ -24,15 +24,19 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; ...@@ -24,15 +24,19 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
/** /**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}. * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
*/ */
@ThreadSafe
public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore { public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class); private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
...@@ -43,6 +47,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt ...@@ -43,6 +47,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
/** The completed checkpoints. */ /** The completed checkpoints. */
private final ArrayDeque<CompletedCheckpoint> checkpoints; private final ArrayDeque<CompletedCheckpoint> checkpoints;
private boolean shutdown = false;
/** /**
* Creates {@link StandaloneCompletedCheckpointStore}. * Creates {@link StandaloneCompletedCheckpointStore}.
* *
...@@ -64,26 +70,34 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt ...@@ -64,26 +70,34 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
@Override @Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
checkpoints.addLast(checkpoint); synchronized (checkpoints) {
checkState(!shutdown, "StandaloneCompletedCheckpointStore has been shut down");
checkpoints.addLast(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try { try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
checkpointToSubsume.discardOnSubsume(); checkpointToSubsume.discardOnSubsume();
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e); LOG.warn("Fail to subsume the old checkpoint.", e);
}
} }
} }
} }
@Override @Override
public List<CompletedCheckpoint> getAllCheckpoints() { public List<CompletedCheckpoint> getAllCheckpoints() {
return new ArrayList<>(checkpoints); synchronized (checkpoints) {
return new ArrayList<>(checkpoints);
}
} }
@Override @Override
public int getNumberOfRetainedCheckpoints() { public int getNumberOfRetainedCheckpoints() {
return checkpoints.size(); synchronized (checkpoints) {
return checkpoints.size();
}
} }
@Override @Override
...@@ -93,14 +107,19 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt ...@@ -93,14 +107,19 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
@Override @Override
public void shutdown(JobStatus jobStatus) throws Exception { public void shutdown(JobStatus jobStatus) throws Exception {
try { synchronized (checkpoints) {
LOG.info("Shutting down"); if (!shutdown) {
shutdown = true;
for (CompletedCheckpoint checkpoint : checkpoints) { try {
checkpoint.discardOnShutdown(jobStatus); LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : checkpoints) {
checkpoint.discardOnShutdown(jobStatus);
}
} finally {
checkpoints.clear();
}
} }
} finally {
checkpoints.clear();
} }
} }
......
...@@ -29,6 +29,8 @@ import org.apache.flink.util.function.ThrowingConsumer; ...@@ -29,6 +29,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -40,6 +42,7 @@ import java.util.concurrent.Executor; ...@@ -40,6 +42,7 @@ import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/** /**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}. * {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
...@@ -64,6 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; ...@@ -64,6 +67,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* checkpoints is consistent. Currently, after recovery we start out with only a single * checkpoints is consistent. Currently, after recovery we start out with only a single
* checkpoint to circumvent those situations. * checkpoint to circumvent those situations.
*/ */
@ThreadSafe
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
...@@ -85,6 +89,10 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -85,6 +89,10 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
private final Executor executor; private final Executor executor;
private final Object lock = new Object();
private boolean shutdown = false;
/** /**
* Creates a {@link ZooKeeperCompletedCheckpointStore} instance. * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
* *
...@@ -127,77 +135,79 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -127,77 +135,79 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
public void recover() throws Exception { public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper."); LOG.info("Recovering checkpoints from ZooKeeper.");
// Get all there is first synchronized (lock) {
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
while (true) { // Get all there is first
try { List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
initialCheckpoints = checkpointsInZooKeeper.getAllAndLock(); while (true) {
break; try {
} initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
catch (ConcurrentModificationException e) { break;
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); } catch (ConcurrentModificationException e) {
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
}
} }
}
Collections.sort(initialCheckpoints, STRING_COMPARATOR); Collections.sort(initialCheckpoints, STRING_COMPARATOR);
int numberOfInitialCheckpoints = initialCheckpoints.size(); int numberOfInitialCheckpoints = initialCheckpoints.size();
LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
// Try and read the state handles from storage. We try until we either successfully read // Try and read the state handles from storage. We try until we either successfully read
// all of them or when we reach a stable state, i.e. when we successfully read the same set // all of them or when we reach a stable state, i.e. when we successfully read the same set
// of checkpoints in two tries. We do it like this to protect against transient outages // of checkpoints in two tries. We do it like this to protect against transient outages
// of the checkpoint store (for example a DFS): if the DFS comes online midway through // of the checkpoint store (for example a DFS): if the DFS comes online midway through
// reading a set of checkpoints we would run the risk of reading only a partial set // reading a set of checkpoints we would run the risk of reading only a partial set
// of checkpoints while we could in fact read the other checkpoints as well if we retried. // of checkpoints while we could in fact read the other checkpoints as well if we retried.
// Waiting until a stable state protects against this while also being resilient against // Waiting until a stable state protects against this while also being resilient against
// checkpoints being actually unreadable. // checkpoints being actually unreadable.
// //
// These considerations are also important in the scope of incremental checkpoints, where // These considerations are also important in the scope of incremental checkpoints, where
// we use ref-counting for shared state handles and might accidentally delete shared state // we use ref-counting for shared state handles and might accidentally delete shared state
// of checkpoints that we don't read due to transient storage outages. // of checkpoints that we don't read due to transient storage outages.
List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
do { do {
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
lastTryRetrievedCheckpoints.clear(); lastTryRetrievedCheckpoints.clear();
lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints); lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
retrievedCheckpoints.clear(); retrievedCheckpoints.clear();
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) { for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
CompletedCheckpoint completedCheckpoint; CompletedCheckpoint completedCheckpoint;
try { try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) { if (completedCheckpoint != null) {
retrievedCheckpoints.add(completedCheckpoint); retrievedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
} }
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
} }
}
} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints && } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints &&
!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints)); !CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));
// Clear local handles in order to prevent duplicates on // Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state // recovery. The local handles should reflect the state
// of ZooKeeper. // of ZooKeeper.
completedCheckpoints.clear(); completedCheckpoints.clear();
completedCheckpoints.addAll(retrievedCheckpoints); completedCheckpoints.addAll(retrievedCheckpoints);
if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
throw new FlinkException( throw new FlinkException(
"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage."); "Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
LOG.warn( LOG.warn(
"Could only fetch {} of {} checkpoints from storage.", "Could only fetch {} of {} checkpoints from storage.",
completedCheckpoints.size(), completedCheckpoints.size(),
numberOfInitialCheckpoints); numberOfInitialCheckpoints);
}
} }
} }
...@@ -210,20 +220,24 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -210,20 +220,24 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception { public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception {
checkNotNull(checkpoint, "Checkpoint"); checkNotNull(checkpoint, "Checkpoint");
final String path = checkpointIdToPath(checkpoint.getCheckpointID()); synchronized (lock) {
checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
// Now add the new one. If it fails, we don't want to lose existing data. final String path = checkpointIdToPath(checkpoint.getCheckpointID());
checkpointsInZooKeeper.addAndLock(path, checkpoint);
completedCheckpoints.addLast(checkpoint); // Now add the new one. If it fails, we don't want to lose existing data.
checkpointsInZooKeeper.addAndLock(path, checkpoint);
// Everything worked, let's remove a previous checkpoint if necessary. completedCheckpoints.addLast(checkpoint);
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); // Everything worked, let's remove a previous checkpoint if necessary.
tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume); while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
} final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
}
LOG.debug("Added {} to {}.", checkpoint, path); LOG.debug("Added {} to {}.", checkpoint, path);
}
} }
private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) { private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
...@@ -245,12 +259,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -245,12 +259,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
@Override @Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
return new ArrayList<>(completedCheckpoints); synchronized (lock) {
return new ArrayList<>(completedCheckpoints);
}
} }
@Override @Override
public int getNumberOfRetainedCheckpoints() { public int getNumberOfRetainedCheckpoints() {
return completedCheckpoints.size(); synchronized (lock) {
return completedCheckpoints.size();
}
} }
@Override @Override
...@@ -260,25 +278,30 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -260,25 +278,30 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
@Override @Override
public void shutdown(JobStatus jobStatus) throws Exception { public void shutdown(JobStatus jobStatus) throws Exception {
if (jobStatus.isGloballyTerminalState()) { synchronized (lock) {
LOG.info("Shutting down"); if (!shutdown) {
shutdown = true;
for (CompletedCheckpoint checkpoint : completedCheckpoints) { if (jobStatus.isGloballyTerminalState()) {
tryRemoveCompletedCheckpoint( LOG.info("Shutting down");
checkpoint,
completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus)); for (CompletedCheckpoint checkpoint : completedCheckpoints) {
} tryRemoveCompletedCheckpoint(
checkpoint,
completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
}
completedCheckpoints.clear(); completedCheckpoints.clear();
checkpointsInZooKeeper.deleteChildren(); checkpointsInZooKeeper.deleteChildren();
} else { } else {
LOG.info("Suspending"); LOG.info("Suspending");
// Clear the local handles, but don't remove any state // Clear the local handles, but don't remove any state
completedCheckpoints.clear(); completedCheckpoints.clear();
// Release the state handle locks in ZooKeeper such that they can be deleted // Release the state handle locks in ZooKeeper such that they can be deleted
checkpointsInZooKeeper.releaseAll(); checkpointsInZooKeeper.releaseAll();
}
}
} }
} }
......
...@@ -158,6 +158,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ...@@ -158,6 +158,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
sharedStateRegistry.close(); sharedStateRegistry.close();
store = createCompletedCheckpoints(1);
store.recover(); store.recover();
assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertEquals(0, store.getNumberOfRetainedCheckpoints());
...@@ -192,6 +193,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ...@@ -192,6 +193,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
// Recover again // Recover again
sharedStateRegistry.close(); sharedStateRegistry.close();
store = createCompletedCheckpoints(1);
store.recover(); store.recover();
CompletedCheckpoint recovered = store.getLatestCheckpoint(false); CompletedCheckpoint recovered = store.getLatestCheckpoint(false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册