Commit 6adbe94b authored by ifndef-SleePy, committed by Piotr Nowojski

Revert "[FLINK-14971][checkpointing] Make CompletedCheckpointStore thread-safe...

Revert "[FLINK-14971][checkpointing] Make CompletedCheckpointStore thread-safe to avoid synchronization outside"

This reverts commit f248e6df.
Parent 214896ae
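In effect, the revert moves the thread-safety guarantee back out of the store: the @ThreadSafe annotations, the internal synchronized blocks and the shutdown flags are removed from StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore, and callers such as CheckpointCoordinator are once again responsible for guarding store access with their own lock (as in the synchronized (lock) getters below). The following is a minimal sketch of that caller-side locking pattern; the classes and method names are simplified, hypothetical stand-ins for illustration, not the actual Flink types.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a completed-checkpoint store with no internal locking. */
class SimpleCheckpointStore {
    private final ArrayDeque<String> checkpoints = new ArrayDeque<>();
    private final int maxRetained;

    SimpleCheckpointStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Not thread-safe on its own; correctness depends on the caller's lock. */
    void add(String checkpoint) {
        checkpoints.addLast(checkpoint);
        if (checkpoints.size() > maxRetained) {
            checkpoints.removeFirst(); // subsume the oldest retained checkpoint
        }
    }

    List<String> getAll() {
        return new ArrayList<>(checkpoints);
    }
}

/** Simplified stand-in for the coordinator: it owns the lock and wraps every store access. */
class SimpleCoordinator {
    private final Object lock = new Object();
    private final SimpleCheckpointStore store = new SimpleCheckpointStore(2);

    void onCheckpointCompleted(String checkpoint) {
        synchronized (lock) {
            store.add(checkpoint);
        }
    }

    List<String> getSuccessfulCheckpoints() {
        synchronized (lock) {
            return store.getAll();
        }
    }

    public static void main(String[] args) {
        SimpleCoordinator coordinator = new SimpleCoordinator();
        coordinator.onCheckpointCompleted("chk-1");
        coordinator.onCheckpointCompleted("chk-2");
        coordinator.onCheckpointCompleted("chk-3"); // "chk-1" falls out, only 2 are retained
        System.out.println(coordinator.getSuccessfulCheckpoints()); // prints [chk-2, chk-3]
    }
}

The trade-off is that the store can stay a plain, unsynchronized data structure, while every call site must remember to take the coordinator's lock.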
......@@ -1370,7 +1370,6 @@ public class CheckpointCoordinator {
return this.pendingCheckpoints.size();
}
@VisibleForTesting
public int getNumberOfRetainedSuccessfulCheckpoints() {
synchronized (lock) {
return completedCheckpointStore.getNumberOfRetainedCheckpoints();
......@@ -1383,7 +1382,6 @@ public class CheckpointCoordinator {
}
}
@VisibleForTesting
public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
synchronized (lock) {
return completedCheckpointStore.getAllCheckpoints();
......@@ -1394,7 +1392,6 @@ public class CheckpointCoordinator {
return checkpointStorage;
}
@VisibleForTesting
public CompletedCheckpointStore getCheckpointStore() {
return completedCheckpointStore;
}
......
......@@ -23,16 +23,12 @@ import org.apache.flink.api.common.JobStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.List;
import java.util.ListIterator;
/**
* A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
* Note that it might be visited by multiple threads. So implementation should keep it thread-safe.
*/
@ThreadSafe
public interface CompletedCheckpointStore {
Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);
......
......@@ -24,19 +24,15 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
/**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
*/
@ThreadSafe
public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
......@@ -47,8 +43,6 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
/** The completed checkpoints. */
private final ArrayDeque<CompletedCheckpoint> checkpoints;
private boolean shutdown = false;
/**
* Creates {@link StandaloneCompletedCheckpointStore}.
*
......@@ -70,34 +64,26 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
synchronized (checkpoints) {
checkState(!shutdown, "StandaloneCompletedCheckpointStore has been shut down");
checkpoints.addLast(checkpoint);
checkpoints.addLast(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
checkpointToSubsume.discardOnSubsume();
} catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e);
}
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
checkpointToSubsume.discardOnSubsume();
} catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e);
}
}
}
@Override
public List<CompletedCheckpoint> getAllCheckpoints() {
synchronized (checkpoints) {
return new ArrayList<>(checkpoints);
}
return new ArrayList<>(checkpoints);
}
@Override
public int getNumberOfRetainedCheckpoints() {
synchronized (checkpoints) {
return checkpoints.size();
}
return checkpoints.size();
}
@Override
......@@ -107,19 +93,14 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
public void shutdown(JobStatus jobStatus) throws Exception {
synchronized (checkpoints) {
if (!shutdown) {
shutdown = true;
try {
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : checkpoints) {
checkpoint.discardOnShutdown(jobStatus);
}
} finally {
checkpoints.clear();
}
try {
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : checkpoints) {
checkpoint.discardOnShutdown(jobStatus);
}
} finally {
checkpoints.clear();
}
}
......
......@@ -29,8 +29,6 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
......@@ -42,7 +40,6 @@ import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#ZOOKEEPER}.
......@@ -67,7 +64,6 @@ import static org.apache.flink.util.Preconditions.checkState;
* checkpoints is consistent. Currently, after recovery we start out with only a single
* checkpoint to circumvent those situations.
*/
@ThreadSafe
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
......@@ -89,10 +85,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
private final Executor executor;
private final Object lock = new Object();
private boolean shutdown = false;
/**
* Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
*
......@@ -135,79 +127,77 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper.");
synchronized (lock) {
checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
break;
} catch (ConcurrentModificationException e) {
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
}
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllAndLock();
break;
}
catch (ConcurrentModificationException e) {
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
}
}
Collections.sort(initialCheckpoints, STRING_COMPARATOR);
Collections.sort(initialCheckpoints, STRING_COMPARATOR);
int numberOfInitialCheckpoints = initialCheckpoints.size();
int numberOfInitialCheckpoints = initialCheckpoints.size();
LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
// Try and read the state handles from storage. We try until we either successfully read
// all of them or when we reach a stable state, i.e. when we successfully read the same set
// of checkpoints in two tries. We do it like this to protect against transient outages
// of the checkpoint store (for example a DFS): if the DFS comes online midway through
// reading a set of checkpoints we would run the risk of reading only a partial set
// of checkpoints while we could in fact read the other checkpoints as well if we retried.
// Waiting until a stable state protects against this while also being resilient against
// checkpoints being actually unreadable.
//
// These considerations are also important in the scope of incremental checkpoints, where
// we use ref-counting for shared state handles and might accidentally delete shared state
// of checkpoints that we don't read due to transient storage outages.
List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
do {
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
// Try and read the state handles from storage. We try until we either successfully read
// all of them or when we reach a stable state, i.e. when we successfully read the same set
// of checkpoints in two tries. We do it like this to protect against transient outages
// of the checkpoint store (for example a DFS): if the DFS comes online midway through
// reading a set of checkpoints we would run the risk of reading only a partial set
// of checkpoints while we could in fact read the other checkpoints as well if we retried.
// Waiting until a stable state protects against this while also being resilient against
// checkpoints being actually unreadable.
//
// These considerations are also important in the scope of incremental checkpoints, where
// we use ref-counting for shared state handles and might accidentally delete shared state
// of checkpoints that we don't read due to transient storage outages.
List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints);
do {
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
lastTryRetrievedCheckpoints.clear();
lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
lastTryRetrievedCheckpoints.clear();
lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
retrievedCheckpoints.clear();
retrievedCheckpoints.clear();
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
CompletedCheckpoint completedCheckpoint;
CompletedCheckpoint completedCheckpoint;
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
retrievedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
retrievedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e);
}
} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints &&
!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
completedCheckpoints.addAll(retrievedCheckpoints);
if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
throw new FlinkException(
"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
LOG.warn(
"Could only fetch {} of {} checkpoints from storage.",
completedCheckpoints.size(),
numberOfInitialCheckpoints);
}
} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints &&
!CompletedCheckpoint.checkpointsMatch(lastTryRetrievedCheckpoints, retrievedCheckpoints));
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
completedCheckpoints.addAll(retrievedCheckpoints);
if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
throw new FlinkException(
"Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage.");
} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
LOG.warn(
"Could only fetch {} of {} checkpoints from storage.",
completedCheckpoints.size(),
numberOfInitialCheckpoints);
}
}
......@@ -220,24 +210,20 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception {
checkNotNull(checkpoint, "Checkpoint");
synchronized (lock) {
checkState(!shutdown, "ZooKeeperCompletedCheckpointStore has been shut down");
final String path = checkpointIdToPath(checkpoint.getCheckpointID());
final String path = checkpointIdToPath(checkpoint.getCheckpointID());
// Now add the new one. If it fails, we don't want to loose existing data.
checkpointsInZooKeeper.addAndLock(path, checkpoint);
// Now add the new one. If it fails, we don't want to loose existing data.
checkpointsInZooKeeper.addAndLock(path, checkpoint);
completedCheckpoints.addLast(checkpoint);
completedCheckpoints.addLast(checkpoint);
// Everything worked, let's remove a previous checkpoint if necessary.
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
}
LOG.debug("Added {} to {}.", checkpoint, path);
// Everything worked, let's remove a previous checkpoint if necessary.
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
tryRemoveCompletedCheckpoint(completedCheckpoint, CompletedCheckpoint::discardOnSubsume);
}
LOG.debug("Added {} to {}.", checkpoint, path);
}
private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
......@@ -259,16 +245,12 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
synchronized (lock) {
return new ArrayList<>(completedCheckpoints);
}
return new ArrayList<>(completedCheckpoints);
}
@Override
public int getNumberOfRetainedCheckpoints() {
synchronized (lock) {
return completedCheckpoints.size();
}
return completedCheckpoints.size();
}
@Override
......@@ -278,30 +260,25 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
public void shutdown(JobStatus jobStatus) throws Exception {
synchronized (lock) {
if (!shutdown) {
shutdown = true;
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
tryRemoveCompletedCheckpoint(
checkpoint,
completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
}
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");
completedCheckpoints.clear();
checkpointsInZooKeeper.deleteChildren();
} else {
LOG.info("Suspending");
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
tryRemoveCompletedCheckpoint(
checkpoint,
completedCheckpoint -> completedCheckpoint.discardOnShutdown(jobStatus));
}
// Clear the local handles, but don't remove any state
completedCheckpoints.clear();
completedCheckpoints.clear();
checkpointsInZooKeeper.deleteChildren();
} else {
LOG.info("Suspending");
// Release the state handle locks in ZooKeeper such that they can be deleted
checkpointsInZooKeeper.releaseAll();
}
}
// Clear the local handles, but don't remove any state
completedCheckpoints.clear();
// Release the state handle locks in ZooKeeper such that they can be deleted
checkpointsInZooKeeper.releaseAll();
}
}
......
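The recovery loop quoted above re-reads the checkpoint handles until either all of them are retrieved or two consecutive attempts return the same set, so a transient outage of the underlying storage (for example a DFS) cannot silently shrink the recovered set. Below is a minimal sketch of that retry-until-stable idea; the readOnce supplier and the class name are hypothetical stand-ins for the actual handle retrieval.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class RetryUntilStable {

    /**
     * Re-reads until all expected items are retrieved or two consecutive
     * attempts return the same (possibly partial) result.
     */
    static List<String> readUntilStable(int expected, Supplier<List<String>> readOnce) {
        List<String> previous = new ArrayList<>();
        List<String> current = new ArrayList<>();
        do {
            previous = current;
            current = readOnce.get();
        } while (current.size() != expected && !current.equals(previous));
        return current;
    }

    public static void main(String[] args) {
        // Simulated storage that returns only a partial set on the first read.
        final int[] attempt = {0};
        List<String> recovered = readUntilStable(3, () -> {
            attempt[0]++;
            return attempt[0] == 1
                ? List.of("chk-1")
                : List.of("chk-1", "chk-2", "chk-3");
        });
        System.out.println(recovered); // prints [chk-1, chk-2, chk-3]
    }
}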
......@@ -158,7 +158,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
sharedStateRegistry.close();
store = createCompletedCheckpoints(1);
store.recover();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
......@@ -193,7 +192,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
// Recover again
sharedStateRegistry.close();
store = createCompletedCheckpoints(1);
store.recover();
CompletedCheckpoint recovered = store.getLatestCheckpoint(false);
......