From af2091cc92d4f5d8fe04ea4bdce865ea02032f02 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 24 Feb 2016 17:34:05 +0100 Subject: [PATCH] [FLINK-3499] [runtime] Clear ZooKeeper references on recovery This closes #1707. --- .../checkpoint/ZooKeeperCompletedCheckpointStore.java | 5 +++++ .../checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java | 2 ++ ...ITCase.java => JobManagerHACheckpointRecoveryITCase.java} | 2 +- 3 files changed, 8 insertions(+), 1 deletion(-) rename flink-tests/src/test/java/org/apache/flink/test/recovery/{JobManagerCheckpointRecoveryITCase.java => JobManagerHACheckpointRecoveryITCase.java} (99%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index cb2be648ed9..65cdee788d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -137,6 +137,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); + // Clear local handles in order to prevent duplicates on + // recovery. The local handles should reflect the state + // of ZooKeeper. + checkpointStateHandles.clear(); + // Get all there is first List, String>> initialCheckpoints; while (true) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index dc6f550a92a..222e69319ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -87,6 +87,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint // All three should be in ZK assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); // Recover checkpoints.recover(); @@ -102,6 +103,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint } assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); assertEquals(expected[2], checkpoints.getLatestCheckpoint()); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java similarity index 99% rename from flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index b1477c910d5..5783fccd345 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -74,7 +74,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -public class JobManagerCheckpointRecoveryITCase extends TestLogger { +public class JobManagerHACheckpointRecoveryITCase extends TestLogger { @Rule public RetryRule retryRule = new RetryRule(); -- GitLab