diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index cb2be648ed907ea80bf0cf81c40fb1c83294737c..65cdee788d8e46d9028f5ac1d504baa650298805 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -137,6 +137,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); + // Clear local handles in order to prevent duplicates on + // recovery. The local handles should reflect the state + // of ZooKeeper. + checkpointStateHandles.clear(); + // Get all there is first List, String>> initialCheckpoints; while (true) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index dc6f550a92a128413bd76be4945bf29e098c45c6..222e69319ce51906a1ff2c73dbdd923691dc6586 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -87,6 +87,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // All three should be in ZK assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); // Recover checkpoints.recover(); @@ -102,6 +103,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint } assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); assertEquals(expected[2], checkpoints.getLatestCheckpoint()); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java similarity index 99% rename from flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index b1477c910d50714e79a21a41aa8c6557a16869b3..5783fccd3451a03057e7ff2d9ce19c154d518475 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -74,7 +74,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -public class JobManagerCheckpointRecoveryITCase extends TestLogger { +public class JobManagerHACheckpointRecoveryITCase extends TestLogger { @Rule public RetryRule retryRule = new RetryRule();