From eff261efee8c99a0ecc88c06aa07db9a553e6bdd Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Tue, 8 Jan 2019 21:02:16 +0100 Subject: [PATCH] [FLINK-11156][tests, runtime] Simplify ZooKeeperCompletedCheckpointStore constructor --- .../ZooKeeperCompletedCheckpointStore.java | 88 +++---------------- .../flink/runtime/util/ZooKeeperUtils.java | 48 +++++++++- .../zookeeper/ZooKeeperStateHandleStore.java | 12 +++ .../zookeeper/ZooKeeperUtilityFactory.java | 11 +-- ...oKeeperCompletedCheckpointStoreITCase.java | 10 ++- ...erCompletedCheckpointStoreMockitoTest.java | 15 +--- ...ZooKeeperCompletedCheckpointStoreTest.java | 10 ++- .../ZooKeeperStateHandleStoreTest.java | 15 ++++ 8 files changed, 104 insertions(+), 105 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index e87ca362331..12cc8eb66ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -18,22 +18,17 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.FlinkException; import org.apache.flink.util.function.ThrowingConsumer; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ZKPaths; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -75,9 +70,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto private static final Comparator, String>> STRING_COMPARATOR = Comparator.comparing(o -> o.f1); - /** Curator ZooKeeper client. */ - private final CuratorFramework client; - /** Completed checkpoints in ZooKeeper. */ private final ZooKeeperStateHandleStore checkpointsInZooKeeper; @@ -100,79 +92,23 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * least 1). Adding more checkpoints than this results * in older checkpoints being discarded. On recovery, * we will only start with a single checkpoint. - * @param client The Curator ZooKeeper client - * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to - * start with a '/') - * @param stateStorage State storage to be used to persist the completed - * checkpoint - * @param executor to execute blocking calls - * @throws Exception + * @param checkpointsInZooKeeper Completed checkpoints in ZooKeeper + * @param executor to execute blocking calls */ public ZooKeeperCompletedCheckpointStore( - int maxNumberOfCheckpointsToRetain, - CuratorFramework client, - String checkpointsPath, - RetrievableStateStorageHelper stateStorage, - Executor executor - ) throws Exception { - this(maxNumberOfCheckpointsToRetain, - adaptNameSpace(client, checkpointsPath), - stateStorage, - executor); - - LOG.info("Initialized in '{}'.", checkpointsPath); - } - - @VisibleForTesting - ZooKeeperCompletedCheckpointStore( - int maxNumberOfCheckpointsToRetain, - CuratorFramework client, - String checkpointsPath, - Executor executor, - ZooKeeperStateHandleStore checkpointsInZooKeeper - ) throws Exception { - this(maxNumberOfCheckpointsToRetain, - adaptNameSpace(client, checkpointsPath), - executor, - checkpointsInZooKeeper); - - LOG.info("Initialized in '{}'.", checkpointsPath); - } - - private ZooKeeperCompletedCheckpointStore( - int maxNumberOfCheckpointsToRetain, - CuratorFramework client, - RetrievableStateStorageHelper stateStorage, - Executor executor - ) { - this(maxNumberOfCheckpointsToRetain, - client, - executor, - new ZooKeeperStateHandleStore<>(client, stateStorage)); - } + int maxNumberOfCheckpointsToRetain, + ZooKeeperStateHandleStore checkpointsInZooKeeper, + Executor executor) { - private ZooKeeperCompletedCheckpointStore( - int maxNumberOfCheckpointsToRetain, - @Nonnull CuratorFramework client, - @Nonnull Executor executor, - @Nonnull ZooKeeperStateHandleStore checkpointsInZooKeeper - ) { checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; - this.client = client; - this.executor = executor; - this.checkpointsInZooKeeper = checkpointsInZooKeeper; - this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); - } - private static CuratorFramework adaptNameSpace(CuratorFramework client, String checkpointsPath) throws Exception { - // Ensure that the checkpoints path exists - client.newNamespaceAwareEnsurePath(checkpointsPath) - .ensure(client.getZookeeperClient()); + this.checkpointsInZooKeeper = checkNotNull(checkpointsInZooKeeper); + + this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); - // All operations will have the path as root - return client.usingNamespace(client.getNamespace() + checkpointsPath); + this.executor = checkNotNull(executor); } @Override @@ -345,11 +281,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto } completedCheckpoints.clear(); - - String path = "/" + client.getNamespace(); - - LOG.info("Removing {} from ZooKeeper", path); - ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); + checkpointsInZooKeeper.deleteChildren(); } else { LOG.info("Suspending"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 039dcf41d8d..86b7e6b1d11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -294,12 +294,31 @@ public class ZooKeeperUtils { checkpointsPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId); - return new ZooKeeperCompletedCheckpointStore( + final ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( maxNumberOfCheckpointsToRetain, - client, - checkpointsPath, - stateStorage, + createZooKeeperStateHandleStore(client, checkpointsPath, stateStorage), executor); + + LOG.info("Initialized {} in '{}'.", ZooKeeperCompletedCheckpointStore.class.getSimpleName(), checkpointsPath); + return zooKeeperCompletedCheckpointStore; + } + + /** + * Creates an instance of {@link ZooKeeperStateHandleStore}. + * + * @param client ZK client + * @param path Path to use for the client namespace + * @param stateStorage RetrievableStateStorageHelper that persist the actual state and whose + * returned state handle is then written to ZooKeeper + * @param Type of state + * @return {@link ZooKeeperStateHandleStore} instance + * @throws Exception ZK errors + */ + public static ZooKeeperStateHandleStore createZooKeeperStateHandleStore( + final CuratorFramework client, + final String path, + final RetrievableStateStorageHelper stateStorage) throws Exception { + return new ZooKeeperStateHandleStore<>(useNamespaceAndEnsurePath(client, path), stateStorage); } /** @@ -362,6 +381,27 @@ public class ZooKeeperUtils { return root + namespace; } + /** + * Returns a facade of the client that uses the specified namespace, and ensures that all nodes + * in the path exist. + * + * @param client ZK client + * @param path the new namespace + * @return ZK Client that uses the new namespace + * @throws Exception ZK errors + */ + public static CuratorFramework useNamespaceAndEnsurePath(final CuratorFramework client, final String path) throws Exception { + Preconditions.checkNotNull(client, "client must not be null"); + Preconditions.checkNotNull(path, "path must not be null"); + + // Ensure that the checkpoints path exists + client.newNamespaceAwareEnsurePath(path) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + return client.usingNamespace(generateZookeeperPath(client.getNamespace(), path)); + } + /** * Secure {@link ACLProvider} implementation. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 2cb1ccc5071..62630104942 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -24,6 +24,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -424,6 +425,17 @@ public class ZooKeeperStateHandleStore { } } + /** + * Recursively deletes all children. + * + * @throws Exception ZK errors + */ + public void deleteChildren() throws Exception { + final String path = "/" + client.getNamespace(); + LOG.info("Removing {} from ZooKeeper", path); + ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); + } + // --------------------------------------------------------------------------------------------------------- // Protected methods // --------------------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java index 3e294e0dbdd..3a9ae922867 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java @@ -80,13 +80,10 @@ public class ZooKeeperUtilityFactory { String zkStateHandleStorePath, RetrievableStateStorageHelper stateStorageHelper) throws Exception { - facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient()); - CuratorFramework stateHandleStoreFacade = facade.usingNamespace( - ZooKeeperUtils.generateZookeeperPath( - facade.getNamespace(), - zkStateHandleStorePath)); - - return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper); + return ZooKeeperUtils.createZooKeeperStateHandleStore( + facade, + zkStateHandleStorePath, + stateStorageHelper); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index f493d6f967b..c0c18773e6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -22,7 +22,9 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.curator.framework.CuratorFramework; @@ -67,10 +69,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint @Override protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception { - return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, + final ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore( ZOOKEEPER.getClient(), CHECKPOINT_PATH, - new HeapStateStorageHelper(), + new TestingRetrievableStateStorageHelper<>()); + + return new ZooKeeperCompletedCheckpointStore( + maxNumberOfCheckpointsToRetain, + checkpointsInZooKeeper, Executors.directExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index 5fe0f41fc52..c8fcfb90711 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -157,14 +157,10 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { } }); - final String checkpointsPath = "foobar"; - ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( numCheckpointsToRetain, - client, - checkpointsPath, - Executors.directExecutor(), - zooKeeperStateHandleStoreMock); + zooKeeperStateHandleStoreMock, + Executors.directExecutor()); zooKeeperCompletedCheckpointStore.recover(); @@ -221,14 +217,11 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString()); final int numCheckpointsToRetain = 1; - final String checkpointsPath = "foobar"; ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( numCheckpointsToRetain, - client, - checkpointsPath, - Executors.directExecutor(), - zookeeperStateHandleStoreMock); + zookeeperStateHandleStoreMock, + Executors.directExecutor()); for (long i = 0; i <= numCheckpointsToRetain; ++i) { CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index a9cba8861c0..bff6779336f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; import org.apache.curator.framework.CuratorFramework; @@ -117,11 +118,14 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { @Nonnull private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception { - return new ZooKeeperCompletedCheckpointStore( - 1, + final ZooKeeperStateHandleStore checkpointsInZooKeeper = ZooKeeperUtils.createZooKeeperStateHandleStore( client, "/checkpoints", - new TestingRetrievableStateStorageHelper<>(), + new TestingRetrievableStateStorageHelper<>()); + + return new ZooKeeperCompletedCheckpointStore( + 1, + checkpointsInZooKeeper, Executors.directExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index 3c37faef3aa..161b3b8b57e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -42,10 +42,13 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.spy; @@ -685,6 +688,18 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger { assertEquals(0, stat.getNumChildren()); } + @Test + public void testDeleteAllShouldRemoveAllPaths() throws Exception { + final ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( + ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path"), + new LongStateStorage()); + + zkStore.addAndLock("/state", 1L); + zkStore.deleteChildren(); + + assertThat(zkStore.getAllPaths(), is(empty())); + } + // --------------------------------------------------------------------------------------------- // Simple test helpers // --------------------------------------------------------------------------------------------- -- GitLab