提交 af2091cc 编写于 作者: U Ufuk Celebi

[FLINK-3499] [runtime] Clear ZooKeeper references on recovery

This closes #1707.
上级 be68b178
...@@ -137,6 +137,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ...@@ -137,6 +137,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
public void recover() throws Exception { public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper."); LOG.info("Recovering checkpoints from ZooKeeper.");
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
checkpointStateHandles.clear();
// Get all there is first // Get all there is first
List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints; List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
while (true) { while (true) {
......
...@@ -87,6 +87,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ...@@ -87,6 +87,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
// All three should be in ZK // All three should be in ZK
assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
// Recover // Recover
checkpoints.recover(); checkpoints.recover();
...@@ -102,6 +103,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint ...@@ -102,6 +103,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
} }
assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
assertEquals(expected[2], checkpoints.getLatestCheckpoint()); assertEquals(expected[2], checkpoints.getLatestCheckpoint());
} }
} }
...@@ -74,7 +74,7 @@ import static org.junit.Assert.assertEquals; ...@@ -74,7 +74,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class JobManagerCheckpointRecoveryITCase extends TestLogger { public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
@Rule @Rule
public RetryRule retryRule = new RetryRule(); public RetryRule retryRule = new RetryRule();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册