Commit 54f02ec7 authored by Ufuk Celebi

[FLINK-4201] [runtime] Forward suspend to checkpoint coordinator

Suspended jobs were leading to shutdown of the checkpoint coordinator
and hence removal of checkpoint state. For standalone recovery mode
this is OK, as no state can be recovered anyway (unchanged in this PR).
For HA, though, this led to removal of checkpoint state that we
actually want to keep for recovery.

We now have the following behaviour:

-----------+------------+-------------------
           | Standalone | High Availability
-----------+------------+-------------------
 SUSPENDED |  Discard   |       Keep
-----------+------------+-------------------
 FINISHED/ |  Discard   |     Discard
 FAILED/   |            |
 CANCELED  |            |
-----------+------------+-------------------

This closes #2276.
Parent b3fa459d
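The table maps one-to-one onto the cleanup path this commit adds to ExecutionGraph (see the hunk further down). As a condensed sketch — the method name here is illustrative, the dispatch itself is taken from the PR:

```java
// Condensed sketch of the dispatch introduced in ExecutionGraph.
// FINISHED/FAILED/CANCELED are globally terminal and discard all checkpoint
// state; SUSPENDED keeps it so that an HA backend can recover it later.
private void disposeCheckpointCoordinator(CheckpointCoordinator coord, JobStatus state) throws Exception {
	if (state.isGloballyTerminalState()) {
		coord.shutdown(); // forwards shutdown() to checkpoint store and ID counter
	} else {
		coord.suspend();  // forwards suspend() to checkpoint store and ID counter
	}
}
```

Whether suspended state actually survives is then up to the implementation: the standalone store and counter discard on suspend, while the ZooKeeper-backed ones keep their state.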
@@ -280,10 +280,34 @@ public class CheckpointCoordinator {
/**
* Shuts down the checkpoint coordinator.
*
- * After this method has been called, the coordinator does not accept and further
- * messages and cannot trigger any further checkpoints.
+ * <p>After this method has been called, the coordinator does not accept
+ * any further messages and cannot trigger any further checkpoints. All
+ * checkpoint state is discarded.
*/
public void shutdown() throws Exception {
shutdown(true);
}
/**
* Suspends the checkpoint coordinator.
*
* <p>After this method has been called, the coordinator does not accept
* any further messages and cannot trigger any further checkpoints.
*
* <p>The difference from shutdown is that checkpoint state in the store
* and counter is kept around, if possible, so it can be recovered later.
*/
public void suspend() throws Exception {
shutdown(false);
}
/**
* Shuts down the checkpoint coordinator.
*
* @param shutdownStoreAndCounter Whether the completed checkpoint store and
*                                checkpoint ID counter are shut down (true)
*                                or suspended (false).
*/
private void shutdown(boolean shutdownStoreAndCounter) throws Exception {
synchronized (lock) {
try {
if (!shutdown) {
@@ -302,21 +326,23 @@
jobStatusListener = null;
}
-checkpointIdCounter.stop();
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.discard(userClassLoader);
}
pendingCheckpoints.clear();
-// clean and discard all successful checkpoints
-completedCheckpointStore.discardAllCheckpoints();
+if (shutdownStoreAndCounter) {
+	completedCheckpointStore.shutdown();
+	checkpointIdCounter.shutdown();
+} else {
+	completedCheckpointStore.suspend();
+	checkpointIdCounter.suspend();
+}
onShutdown();
}
-}
-finally {
+} finally {
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself.
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
......
@@ -29,9 +29,18 @@ public interface CheckpointIDCounter {
void start() throws Exception;

/**
- * Stops the {@link CheckpointIDCounter} service.
+ * Shuts the {@link CheckpointIDCounter} service down and frees all created
+ * resources.
 */
-void stop() throws Exception;
+void shutdown() throws Exception;

/**
 * Suspends the counter.
 *
 * <p>If the implementation allows recovery, the counter state needs to be
 * kept. Otherwise, this acts as shutdown.
 */
void suspend() throws Exception;

/**
 * Atomically increments the current checkpoint ID.
......
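A minimal sketch of how a client is expected to drive the renamed contract; the `recoveryFactory` and `jobId` here are assumed to exist (the CheckpointRecoveryFactory interface appears further down in JobManagerHARecoveryTest), and the concrete values assume a fresh counter starting at 1:

```java
// Sketch of the intended lifecycle under the renamed contract.
CheckpointIDCounter counter = recoveryFactory.createCheckpointIDCounter(jobId);
counter.start();

long first = counter.getAndIncrement();   // e.g. 1 for a fresh counter
long second = counter.getAndIncrement();  // e.g. 2

// Suspended job: an HA counter keeps its value, so a recovered job
// continues with 3 instead of re-issuing already-used checkpoint IDs.
counter.suspend();

// Globally terminal job: free all resources and drop the counter state.
// counter.shutdown();
```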
@@ -49,10 +49,17 @@ public interface CompletedCheckpointStore {
CompletedCheckpoint getLatestCheckpoint() throws Exception;

/**
- * Discards all added {@link CompletedCheckpoint} instances via {@link
- * CompletedCheckpoint#discard(ClassLoader)}.
+ * Shuts down the store and discards all checkpoint instances.
 */
-void discardAllCheckpoints() throws Exception;
+void shutdown() throws Exception;

/**
 * Suspends the store.
 *
 * <p>If the implementation allows recovery, checkpoint state needs to be
 * kept around. Otherwise, this should act like shutdown.
 */
void suspend() throws Exception;
/**
* Returns all {@link CompletedCheckpoint} instances.
......
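The HA value of the new method is the suspend/recover round trip: a suspended store keeps its backing state so that a later recover() can repopulate it. A hedged sketch of the intended usage, mirroring the ZooKeeper IT case further down (`store` and `checkpoint` are assumed to exist):

```java
// Sketch of the suspend/recover cycle an HA store implementation enables.
store.addCheckpoint(checkpoint);

store.suspend();  // drop local handles; keep backing (e.g. ZooKeeper) state
store.recover();  // repopulate the store from the backing state

// The checkpoint survived the suspension.
CompletedCheckpoint latest = store.getLatestCheckpoint();
assert checkpoint.equals(latest);
```

In standalone mode, by contrast, suspend() behaves like shutdown() and the assertion above would not hold.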
@@ -362,7 +362,11 @@ public class SavepointCoordinator extends CheckpointCoordinator {
}

@Override
-public void discardAllCheckpoints() throws Exception {
+public void shutdown() throws Exception {
}

+@Override
+public void suspend() throws Exception {
+}
@Override
......
@@ -33,12 +33,13 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
private final AtomicLong checkpointIdCounter = new AtomicLong(1);

@Override
-public void start() throws Exception {
-}
+public void start() throws Exception {}

@Override
-public void stop() throws Exception {
-}
+public void shutdown() throws Exception {}

+@Override
+public void suspend() throws Exception {}
@Override
public long getAndIncrement() throws Exception {
......
@@ -90,11 +90,17 @@ class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
}

@Override
-public void discardAllCheckpoints() throws Exception {
+public void shutdown() throws Exception {
for (CompletedCheckpoint checkpoint : checkpoints) {
checkpoint.discard(userClassLoader);
}
checkpoints.clear();
}
@Override
public void suspend() throws Exception {
// Do a regular shutdown, because we can't recover anything
shutdown();
}
}
@@ -91,9 +91,10 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
}

@Override
-public void stop() throws Exception {
+public void shutdown() throws Exception {
synchronized (startStopLock) {
if (isStarted) {
LOG.info("Shutting down.");
sharedCount.close();
client.getConnectionStateListenable().removeListener(connStateListener);
@@ -105,6 +106,21 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
}
}
@Override
public void suspend() throws Exception {
synchronized (startStopLock) {
if (isStarted) {
LOG.info("Suspending.");
sharedCount.close();
client.getConnectionStateListenable().removeListener(connStateListener);
// Don't remove any state
isStarted = false;
}
}
}
@Override
public long getAndIncrement() throws Exception {
while (true) {
......
@@ -232,7 +232,9 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
}

@Override
-public void discardAllCheckpoints() throws Exception {
+public void shutdown() throws Exception {
LOG.info("Shutting down");
for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
try {
removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
@@ -250,6 +252,14 @@
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
}
@Override
public void suspend() throws Exception {
LOG.info("Suspending");
// Clear the local handles, but don't remove any state
checkpointStateHandles.clear();
}
/**
* Removes the state handle from ZooKeeper and discards the checkpoint as well as the state handle.
*/
......
@@ -1123,7 +1123,11 @@ public class ExecutionGraph implements Serializable {
CheckpointCoordinator coord = this.checkpointCoordinator;
this.checkpointCoordinator = null;

if (coord != null) {
-	coord.shutdown();
+	if (state.isGloballyTerminalState()) {
+		coord.shutdown();
+	} else {
+		coord.suspend();
+	}
}
// We don't clean the checkpoint stats tracker, because we want
@@ -1135,8 +1139,13 @@
try {
CheckpointCoordinator coord = this.savepointCoordinator;
this.savepointCoordinator = null;
if (coord != null) {
-	coord.shutdown();
+	if (state.isGloballyTerminalState()) {
+		coord.shutdown();
+	} else {
+		coord.suspend();
+	}
}
} catch (Exception e) {
LOG.error("Error while cleaning up after execution", e);
......
@@ -241,28 +241,32 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
retry:
while (!success) {
-	// Initial cVersion (number of changes to the children of this node)
-	int initialCVersion = client.checkExists().forPath("/").getCversion();
-
-	List<String> children = client.getChildren().forPath("/");
-
-	for (String path : children) {
-		path = "/" + path;
-		try {
-			final StateHandle<T> stateHandle = get(path);
-			stateHandles.add(new Tuple2<>(stateHandle, path));
-		}
-		catch (KeeperException.NoNodeException ignored) {
-			// Concurrent deletion, retry
-			continue retry;
-		}
-	}
-
-	int finalCVersion = client.checkExists().forPath("/").getCversion();
-
-	// Check for concurrent modifications
-	success = initialCVersion == finalCVersion;
+	Stat stat = client.checkExists().forPath("/");
+
+	if (stat == null) {
+		break; // Node does not exist, done.
+	} else {
+		// Initial cVersion (number of changes to the children of this node)
+		int initialCVersion = stat.getCversion();
+
+		List<String> children = client.getChildren().forPath("/");
+
+		for (String path : children) {
+			path = "/" + path;
+			try {
+				final StateHandle<T> stateHandle = get(path);
+				stateHandles.add(new Tuple2<>(stateHandle, path));
+			} catch (KeeperException.NoNodeException ignored) {
+				// Concurrent deletion, retry
+				continue retry;
+			}
+		}
+
+		int finalCVersion = client.checkExists().forPath("/").getCversion();
+
+		// Check for concurrent modifications
+		success = initialCVersion == finalCVersion;
+	}
}
return stateHandles;
@@ -284,30 +288,34 @@ public class ZooKeeperStateHandleStore<T extends Serializable> {
retry:
while (!success) {
-	// Initial cVersion (number of changes to the children of this node)
-	int initialCVersion = client.checkExists().forPath("/").getCversion();
-
-	List<String> children = ZKPaths.getSortedChildren(
-		client.getZookeeperClient().getZooKeeper(),
-		ZKPaths.fixForNamespace(client.getNamespace(), "/"));
-
-	for (String path : children) {
-		path = "/" + path;
-		try {
-			final StateHandle<T> stateHandle = get(path);
-			stateHandles.add(new Tuple2<>(stateHandle, path));
-		}
-		catch (KeeperException.NoNodeException ignored) {
-			// Concurrent deletion, retry
-			continue retry;
-		}
-	}
-
-	int finalCVersion = client.checkExists().forPath("/").getCversion();
-
-	// Check for concurrent modifications
-	success = initialCVersion == finalCVersion;
+	Stat stat = client.checkExists().forPath("/");
+
+	if (stat == null) {
+		break; // Node does not exist, done.
+	} else {
+		// Initial cVersion (number of changes to the children of this node)
+		int initialCVersion = stat.getCversion();
+
+		List<String> children = ZKPaths.getSortedChildren(
+			client.getZookeeperClient().getZooKeeper(),
+			ZKPaths.fixForNamespace(client.getNamespace(), "/"));
+
+		for (String path : children) {
+			path = "/" + path;
+			try {
+				final StateHandle<T> stateHandle = get(path);
+				stateHandles.add(new Tuple2<>(stateHandle, path));
+			} catch (KeeperException.NoNodeException ignored) {
+				// Concurrent deletion, retry
+				continue retry;
+			}
+		}
+
+		int finalCVersion = client.checkExists().forPath("/").getCversion();
+
+		// Check for concurrent modifications
+		success = initialCVersion == finalCVersion;
+	}
}
return stateHandles;
......
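Both getAll() variants above use the same optimistic-read trick: the parent node's cversion (ZooKeeper's counter of changes to a node's children) is sampled before and after listing the children, and the listing is retried on a mismatch; the new null check additionally terminates cleanly when the node does not exist at all. A self-contained sketch of the pattern with hypothetical class and method names — a node deleted between the two calls would surface as a NoNodeException, which the real code above handles with its labeled retry:

```java
import java.util.Collections;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;

/** Hypothetical helper illustrating the optimistic read used above. */
final class ConsistentChildRead {

	/**
	 * Lists the children of {@code path}, retrying until the parent's
	 * cversion (ZooKeeper's child-change counter) is unchanged across
	 * the read, i.e. no child was added or removed concurrently.
	 */
	static List<String> getChildren(CuratorFramework client, String path) throws Exception {
		while (true) {
			Stat before = client.checkExists().forPath(path);
			if (before == null) {
				return Collections.emptyList(); // node does not exist, done
			}

			List<String> children = client.getChildren().forPath(path);

			Stat after = client.checkExists().forPath(path);
			if (after != null && before.getCversion() == after.getCversion()) {
				return children; // no concurrent child modification observed
			}
			// children changed while we were reading; try again
		}
	}
}
```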
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -35,6 +36,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public abstract class CheckpointIDCounterTest extends TestLogger {
@@ -64,6 +67,36 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
ZooKeeper.deleteAll();
}
/**
* Tests that counter node is removed from ZooKeeper after shutdown.
*/
@Test
public void testShutdownRemovesState() throws Exception {
CheckpointIDCounter counter = createCompletedCheckpoints();
counter.start();
CuratorFramework client = ZooKeeper.getClient();
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
counter.shutdown();
assertNull(client.checkExists().forPath("/checkpoint-id-counter"));
}
/**
* Tests that counter node is NOT removed from ZooKeeper after suspend.
*/
@Test
public void testSuspendKeepsState() throws Exception {
CheckpointIDCounter counter = createCompletedCheckpoints();
counter.start();
CuratorFramework client = ZooKeeper.getClient();
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
counter.suspend();
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
}
@Override
protected CheckpointIDCounter createCompletedCheckpoints() throws Exception {
return new ZooKeeperCheckpointIDCounter(ZooKeeper.getClient(),
@@ -89,7 +122,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
assertEquals(4, counter.getAndIncrement());
}
finally {
-counter.stop();
+counter.shutdown();
}
}
@@ -152,7 +185,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
executor.shutdown();
}
-counter.stop();
+counter.shutdown();
}
}
@@ -169,7 +202,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger {
assertEquals(1337, counter.getAndIncrement());
assertEquals(1338, counter.getAndIncrement());
-counter.stop();
+counter.shutdown();
}
/**
......
@@ -171,7 +171,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
checkpoints.addCheckpoint(checkpoint);
}
-checkpoints.discardAllCheckpoints();
+checkpoints.shutdown();
// Empty state
assertNull(checkpoints.getLatestCheckpoint());
......
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
@@ -30,12 +30,12 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.AfterClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.lang.reflect.Field;
@@ -45,64 +45,115 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class ExecutionGraphCheckpointCoordinatorTest {
private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration());
@AfterClass
public static void teardown() {
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception {
-ExecutionGraph executionGraph = new ExecutionGraph(
-	TestingUtils.defaultExecutionContext(),
-	new JobID(),
-	"test",
-	new Configuration(),
-	new SerializedValue<>(new ExecutionConfig()),
-	new FiniteDuration(1, TimeUnit.DAYS),
-	new NoRestartStrategy(),
-	Collections.<BlobKey>emptyList(),
-	Collections.<URL>emptyList(),
-	ClassLoader.getSystemClassLoader(),
-	new UnregisteredMetricsGroup());
-
-ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-
-try {
-	executionGraph.enableSnapshotCheckpointing(
-		100,
-		100,
-		100,
-		1,
-		42,
-		Collections.<ExecutionJobVertex>emptyList(),
-		Collections.<ExecutionJobVertex>emptyList(),
-		Collections.<ExecutionJobVertex>emptyList(),
-		actorSystem,
-		UUID.randomUUID(),
-		new StandaloneCheckpointIDCounter(),
-		new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()),
-		RecoveryMode.STANDALONE,
-		new HeapStateStore<CompletedCheckpoint>(),
-		new DisabledCheckpointStatsTracker());
-
-	CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-	SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();
-
-	// Both the checkpoint and savepoint coordinator need to operate
-	// with the same checkpoint ID counter.
-	Field counterField = CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter");
-
-	CheckpointIDCounter counterCheckpointCoordinator = (CheckpointIDCounter) counterField
-		.get(checkpointCoordinator);
-	CheckpointIDCounter counterSavepointCoordinator = (CheckpointIDCounter) counterField
-		.get(savepointCoordinator);
-
-	assertEquals(counterCheckpointCoordinator, counterSavepointCoordinator);
-}
-finally {
-	if (actorSystem != null) {
-		actorSystem.shutdown();
-	}
-}
+ExecutionGraph executionGraph = createExecutionGraphAndEnableCheckpointing(
+	new StandaloneCheckpointIDCounter(),
+	new StandaloneCompletedCheckpointStore(1, ClassLoader.getSystemClassLoader()));
+
+CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+SavepointCoordinator savepointCoordinator = executionGraph.getSavepointCoordinator();
+
+// Both the checkpoint and savepoint coordinator need to operate
+// with the same checkpoint ID counter.
+Field counterField = CheckpointCoordinator.class.getDeclaredField("checkpointIdCounter");
+
+CheckpointIDCounter counterCheckpointCoordinator = (CheckpointIDCounter) counterField
+	.get(checkpointCoordinator);
+CheckpointIDCounter counterSavepointCoordinator = (CheckpointIDCounter) counterField
+	.get(savepointCoordinator);
+
+assertEquals(counterCheckpointCoordinator, counterSavepointCoordinator);
}
/**
* Tests that a shut down checkpoint coordinator calls shutdown on
* the store and counter.
*/
@Test
public void testShutdownCheckpointCoordinator() throws Exception {
CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
graph.fail(new Exception("Test Exception"));
// Two times, because shared with savepoint coordinator
verify(counter, times(2)).shutdown();
verify(store, times(1)).shutdown();
}
/**
* Tests that a suspended checkpoint coordinator calls suspend on
* the store and counter.
*/
@Test
public void testSuspendCheckpointCoordinator() throws Exception {
CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
graph.suspend(new Exception("Test Exception"));
// No shutdown
verify(counter, times(0)).shutdown();
verify(store, times(0)).shutdown();
// Two times, because shared with savepoint coordinator
verify(counter, times(2)).suspend();
verify(store, times(1)).suspend();
}
private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
CheckpointIDCounter counter,
CompletedCheckpointStore store) throws Exception {
ExecutionGraph executionGraph = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
new JobID(),
"test",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
new FiniteDuration(1, TimeUnit.DAYS),
new NoRestartStrategy(),
Collections.<BlobKey>emptyList(),
Collections.<URL>emptyList(),
ClassLoader.getSystemClassLoader(),
new UnregisteredMetricsGroup());
executionGraph.enableSnapshotCheckpointing(
100,
100,
100,
1,
42,
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
system,
UUID.randomUUID(),
counter,
store,
RecoveryMode.STANDALONE,
new HeapStateStore<CompletedCheckpoint>(),
new DisabledCheckpointStatsTracker());
JobVertex jobVertex = new JobVertex("MockVertex");
executionGraph.attachJobGraph(Collections.singletonList(jobVertex));
return executionGraph;
}
}
@@ -1109,7 +1109,12 @@ public class SavepointCoordinatorTest extends TestLogger {
}

@Override
-public void stop() throws Exception {
+public void shutdown() throws Exception {
started = false;
}
@Override
public void suspend() throws Exception {
started = false;
}
......
@@ -18,6 +18,11 @@
package org.apache.flink.runtime.checkpoint;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests for basic {@link CompletedCheckpointStore} contract.
*/
@@ -30,4 +35,39 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userClassLoader);
}
/**
* Tests that shutdown discards all checkpoints.
*/
@Test
public void testShutdownDiscardsCheckpoints() throws Exception {
CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
TestCheckpoint checkpoint = createCheckpoint(0);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
store.shutdown();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertTrue(checkpoint.isDiscarded());
}
/**
* Tests that suspend discards all checkpoints (as they cannot be
* recovered later in standalone recovery mode).
*/
@Test
public void testSuspendDiscardsCheckpoints() throws Exception {
CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
TestCheckpoint checkpoint = createCheckpoint(0);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
store.suspend();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertTrue(checkpoint.isDiscarded());
}
}
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
@@ -31,6 +32,8 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling.
@@ -106,4 +109,55 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
assertEquals(expected[2], checkpoints.getLatestCheckpoint());
}
/**
* Tests that shutdown discards all checkpoints.
*/
@Test
public void testShutdownDiscardsCheckpoints() throws Exception {
CuratorFramework client = ZooKeeper.getClient();
CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
TestCheckpoint checkpoint = createCheckpoint(0);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
store.shutdown();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
store.recover();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
}
/**
* Tests that suspend keeps all checkpoints (as they can be recovered
* later by the ZooKeeper store).
*/
@Test
public void testSuspendKeepsCheckpoints() throws Exception {
CuratorFramework client = ZooKeeper.getClient();
CompletedCheckpointStore store = createCompletedCheckpoints(1, ClassLoader.getSystemClassLoader());
TestCheckpoint checkpoint = createCheckpoint(0);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
store.suspend();
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
// Recover again
store.recover();
CompletedCheckpoint recovered = store.getLatestCheckpoint();
assertEquals(checkpoint, recovered);
}
}
@@ -29,9 +29,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.HeapStateStore;
import org.apache.flink.runtime.checkpoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -41,12 +45,17 @@ import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -66,11 +75,14 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@@ -97,13 +109,10 @@ public class JobManagerHARecoveryTest {
/**
 * Tests that the persisted job is not removed from the SubmittedJobGraphStore if the JobManager
 * loses its leadership. Furthermore, it tests that the job manager can recover the job from
- * the SubmittedJobGraphStore.
- *
- * @throws Exception
+ * the SubmittedJobGraphStore and that checkpoint state is recovered as well.
*/
@Test
public void testJobRecoveryWhenLosingLeadership() throws Exception {
FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
@@ -120,10 +129,12 @@
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
try {
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
MyCheckpointStore checkpointStore = new MyCheckpointStore();
CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -146,7 +157,7 @@
timeout,
myLeaderElectionService,
mySubmittedJobGraphStore,
-new StandaloneCheckpointRecoveryFactory(),
+checkpointStateFactory,
new SavepointStore(new HeapStateStore()),
jobRecoveryTimeout,
Option.apply(null));
@@ -171,11 +182,23 @@
Await.ready(tmAlive, deadline.timeLeft());
JobVertex sourceJobVertex = new JobVertex("Source");
-sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+sourceJobVertex.setInvokableClass(BlockingStatefulInvokable.class);
sourceJobVertex.setParallelism(slots);
JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
vertexId,
vertexId,
vertexId,
100,
10 * 60 * 1000,
0,
1));
BlockingStatefulInvokable.initializeStaticHelpers(slots);
Future<Object> isLeader = gateway.ask(
TestingJobManagerMessages.getNotifyWhenLeader(),
deadline.timeLeft());
@@ -199,6 +222,9 @@
Await.ready(jobSubmitted, deadline.timeLeft());
// Wait for some checkpoints to complete
BlockingStatefulInvokable.awaitCompletedCheckpoints();
Future<Object> jobRemoved = gateway.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
// Revoke leadership
@@ -228,6 +254,15 @@
// check that the job has been removed from the submitted job graph store
assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
// Check that state has been recovered
long[] recoveredStates = BlockingStatefulInvokable.getRecoveredStates();
for (long state : recoveredStates) {
boolean isExpected = state >= BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE;
assertTrue("Did not recover checkpoint state correctly, expecting >= " +
BlockingStatefulInvokable.NUM_CHECKPOINTS_TO_COMPLETE +
", but state was " + state, isExpected);
}
} finally {
if (archive != null) {
archive.tell(PoisonPill.getInstance(), ActorRef.noSender());
@@ -243,7 +278,89 @@
}
}
/**
* A checkpoint store that supports shutdown and suspend. It can be used to test HA
* recovery, as long as the factory always returns the same store instance.
*/
static class MyCheckpointStore implements CompletedCheckpointStore {
private final ArrayDeque<CompletedCheckpoint> checkpoints = new ArrayDeque<>(2);
private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
@Override
public void recover() throws Exception {
checkpoints.addAll(suspended);
suspended.clear();
}
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
checkpoints.addLast(checkpoint);
if (checkpoints.size() > 1) {
checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader());
}
}
@Override
public CompletedCheckpoint getLatestCheckpoint() throws Exception {
return checkpoints.isEmpty() ? null : checkpoints.getLast();
}
@Override
public void shutdown() throws Exception {
checkpoints.clear();
suspended.clear();
}
@Override
public void suspend() throws Exception {
suspended.addAll(checkpoints);
checkpoints.clear();
}
@Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
return new ArrayList<>(checkpoints);
}
@Override
public int getNumberOfRetainedCheckpoints() {
return checkpoints.size();
}
}
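To make the mock's semantics explicit, here is a quick illustration (not part of the PR) of the round trip it models; `cp` is assumed to be some CompletedCheckpoint instance, e.g. a Mockito mock:

```java
// suspend() parks live checkpoints in `suspended`; recover() moves them back.
MyCheckpointStore store = new MyCheckpointStore();
store.addCheckpoint(cp);

store.suspend();                            // parks state in `suspended`
assert store.getLatestCheckpoint() == null; // nothing visible while suspended

store.recover();                            // moves parked state back
assert store.getLatestCheckpoint() == cp;   // the checkpoint survived
```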
static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
private final CompletedCheckpointStore store;
private final CheckpointIDCounter counter;
public MyCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) {
this.store = store;
this.counter = counter;
}
@Override
public void start() {}
@Override
public void stop() {}
@Override
public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader) throws Exception {
return store;
}
@Override
public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
return counter;
}
}
static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();
@Override
@@ -308,4 +425,50 @@
}
}
public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask<StateHandle<Long>> {
private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
private static volatile long[] recoveredStates = new long[0];
private int completedCheckpoints = 0;
@Override
public void setInitialState(StateHandle<Long> stateHandle) throws Exception {
int subtaskIndex = getIndexInSubtaskGroup();
if (subtaskIndex < recoveredStates.length) {
recoveredStates[subtaskIndex] = stateHandle.getState(getUserCodeClassLoader());
}
}
@Override
public boolean triggerCheckpoint(long checkpointId, long timestamp) {
StateHandle<Long> state = new LocalStateHandle<>(checkpointId);
getEnvironment().acknowledgeCheckpoint(checkpointId, state);
return true;
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
completedCheckpointsLatch.countDown();
}
}
public static void initializeStaticHelpers(int numSubtasks) {
completedCheckpointsLatch = new CountDownLatch(numSubtasks);
recoveredStates = new long[numSubtasks];
}
public static void awaitCompletedCheckpoints() throws InterruptedException {
completedCheckpointsLatch.await();
}
public static long[] getRecoveredStates() {
return recoveredStates;
}
}
}