From 09b6bb0c2e39e1418b4422c138bd0865cf8950bc Mon Sep 17 00:00:00 2001 From: Rajan Date: Mon, 24 Apr 2017 21:35:28 -0700 Subject: [PATCH] handle zkCache failure: invalidate cache and zk-getData failure (#377) * handle zkCache failure: invalidate cache and zk-getData failure * introduce separate executor to serve zkcache callback * add executor to discovery service * remove testing npe * add assetion on zkCache executor parameter and update tests * update executor thread in testcase for intermittent failure --- .../yahoo/pulsar/broker/PulsarService.java | 10 +- .../pulsar/broker/service/ServerCnx.java | 12 +- .../broker/cache/ResourceQuotaCacheTest.java | 17 ++- .../broker/namespace/OwnershipCacheTest.java | 7 +- .../service/BrokerDiscoveryProvider.java | 5 +- .../service/web/ZookeeperCacheLoader.java | 11 +- .../pulsar/zookeeper/ZooKeeperCache.java | 54 ++++++--- .../pulsar/zookeeper/ZooKeeperDataCache.java | 2 + .../ZkBookieRackAffinityMappingTest.java | 6 +- ...atedBookieEnsemblePlacementPolicyTest.java | 6 +- .../pulsar/zookeeper/ZookeeperCacheTest.java | 109 +++++++++++++++++- 11 files changed, 196 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index 6e7faa1ead0..b71a2ac9421 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -95,6 +95,8 @@ public class PulsarService implements AutoCloseable { private LocalZooKeeperConnectionService localZooKeeperConnectionProvider; private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, new DefaultThreadFactory("pulsar")); + private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, + new DefaultThreadFactory("zk-cache-callback")); private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered"); private ScheduledExecutorService loadManagerExecutor = null; private ScheduledFuture loadReportTask = null; @@ -382,10 +384,10 @@ public class PulsarService implements AutoCloseable { LOG.info("starting configuration cache service"); - this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.executor); + this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.cacheExecutor); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), - getOrderedExecutor(), this.executor); + getOrderedExecutor(), this.cacheExecutor); try { this.globalZkCache.start(); } catch (IOException e) { @@ -533,6 +535,10 @@ public class PulsarService implements AutoCloseable { return executor; } + public ScheduledExecutorService getCacheExecutor() { + return cacheExecutor; + } + public ScheduledExecutorService getLoadManagerExecutor() { return loadManagerExecutor; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 990c363ff32..00b678d63ca 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -155,11 +155,11 @@ public class ServerCnx extends PulsarHandler { @Override protected void handleLookup(CommandLookupTopic lookup) { - if (log.isDebugEnabled()) { - log.debug("Received Lookup from {}", remoteAddress); - } final long requestId = lookup.getRequestId(); final String topic = lookup.getTopic(); + if (log.isDebugEnabled()) { + log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); + } final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), @@ -187,11 +187,11 @@ public class ServerCnx extends PulsarHandler { @Override protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { - if (log.isDebugEnabled()) { - log.debug("Received PartitionMetadataLookup from {}", remoteAddress); - } final long requestId = partitionMetadata.getRequestId(); final String topic = partitionMetadata.getTopic(); + if (log.isDebugEnabled()) { + log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId); + } final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/cache/ResourceQuotaCacheTest.java index ffb591a249f..ee4232871e2 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/cache/ResourceQuotaCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/cache/ResourceQuotaCacheTest.java @@ -19,8 +19,12 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.zookeeper.MockZooKeeper; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -41,12 +45,15 @@ public class ResourceQuotaCacheTest { private ZooKeeperCache zkCache; private LocalZooKeeperCacheService localCache; private NamespaceBundleFactory bundleFactory; + private OrderedSafeExecutor executor; + private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { pulsar = mock(PulsarService.class); - OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null); + executor = new OrderedSafeExecutor(1, "test"); + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); localCache = new LocalZooKeeperCacheService(zkCache, null); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); @@ -54,6 +61,12 @@ public class ResourceQuotaCacheTest { doReturn(localCache).when(pulsar).getLocalZkCacheService(); } + @AfterMethod + public void teardown() { + executor.shutdown(); + scheduledExecutor.shutdown(); + } + @Test public void testGetSetDefaultQuota() throws Exception { ResourceQuotaCache cache = new ResourceQuotaCache(zkCache); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java index 2356994e482..d7381288ce3 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java @@ -29,6 +29,8 @@ import static org.testng.Assert.fail; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.zookeeper.KeeperException; @@ -60,6 +62,7 @@ public class OwnershipCacheTest { private NamespaceService nsService; private BrokerService brokerService; private OrderedSafeExecutor executor; + private ScheduledExecutorService scheduledExecutor; @BeforeMethod public void setup() throws Exception { @@ -68,7 +71,8 @@ public class OwnershipCacheTest { pulsar = mock(PulsarService.class); config = mock(ServiceConfiguration.class); executor = new OrderedSafeExecutor(1, "test"); - zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null); + scheduledExecutor = Executors.newScheduledThreadPool(2); + zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); localCache = new LocalZooKeeperCacheService(zkCache, null); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); @@ -88,6 +92,7 @@ public class OwnershipCacheTest { @AfterMethod public void teardown() throws Exception { executor.shutdown(); + scheduledExecutor.shutdown(); } @Test diff --git a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java index a09a26242be..6d50b83b786 100644 --- a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -41,6 +41,8 @@ import com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader; import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache; import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory; +import io.netty.util.concurrent.DefaultThreadFactory; + /** * Maintains available active broker list and returns next active broker in round-robin for discovery service. * @@ -52,7 +54,8 @@ public class BrokerDiscoveryProvider implements Closeable { private final AtomicInteger counter = new AtomicInteger(); private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(4, "pulsar-discovery-ordered"); - private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4, + new DefaultThreadFactory("pulsar-discovery")); private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; diff --git a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/web/ZookeeperCacheLoader.java index a740e96a04d..8913ca143d5 100644 --- a/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/web/ZookeeperCacheLoader.java +++ b/pulsar-discovery-service/src/main/java/com/yahoo/pulsar/discovery/service/web/ZookeeperCacheLoader.java @@ -19,6 +19,8 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.slf4j.Logger; @@ -33,6 +35,8 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache; import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory; import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache; +import io.netty.util.concurrent.DefaultThreadFactory; + /** * Connects with ZooKeeper and sets watch to listen changes for active broker list. * @@ -47,7 +51,9 @@ public class ZookeeperCacheLoader implements Closeable { private volatile List availableBrokers; - private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery"); + private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery-ordered-cache"); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8, + new DefaultThreadFactory("pulsar-discovery-cache")); public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers"; @@ -66,7 +72,7 @@ public class ZookeeperCacheLoader implements Closeable { }); this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor, - null/* cache uses ForkJoinPool if provided scheduler is null to load data-async */); + executor); localZkConnectionSvc.start(exitCode -> { try { localZkCache.getZooKeeper().close(); @@ -106,6 +112,7 @@ public class ZookeeperCacheLoader implements Closeable { @Override public void close() { orderedExecutor.shutdown(); + executor.shutdown(); } private void updateBrokerList(Set brokerNodes) throws Exception { diff --git a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java index e6ff4a899b7..6923ad3b861 100644 --- a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.util.OrderedSafeExecutor; @@ -86,6 +87,8 @@ public abstract class ZooKeeperCache implements Watcher { protected AtomicReference zkSession = new AtomicReference(null); public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) { + checkNotNull(executor); + checkNotNull(scheduledExecutor); this.executor = executor; this.scheduledExecutor = scheduledExecutor; this.zkSession.set(zkSession); @@ -166,6 +169,10 @@ public abstract class ZooKeeperCache implements Watcher { existsCache.invalidate(path); } + public void asyncInvalidate(String path) { + scheduledExecutor.submit(() -> invalidate(path)); + } + public void invalidate(final String path) { invalidateData(path); invalidateChildren(path); @@ -222,6 +229,7 @@ public abstract class ZooKeeperCache implements Watcher { getDataAsync(path, this, deserializer).thenAccept(data -> { future.complete(data.map(e -> e.getKey())); }).exceptionally(ex -> { + asyncInvalidate(path); if (ex.getCause() instanceof NoNodeException) { future.complete(Optional.empty()); } else { @@ -249,18 +257,22 @@ public abstract class ZooKeeperCache implements Watcher { try { return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS); } catch (ExecutionException e) { + asyncInvalidate(path); Throwable cause = e.getCause(); if (cause instanceof KeeperException) { throw (KeeperException) cause; } else if (cause instanceof InterruptedException) { LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec); - invalidate(path); throw (InterruptedException) cause; } else if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } else { throw new RuntimeException(cause); } + } catch (TimeoutException e) { + LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec); + asyncInvalidate(path); + throw e; } } @@ -275,24 +287,30 @@ public abstract class ZooKeeperCache implements Watcher { // Return a future for the z-node to be fetched from ZK CompletableFuture> zkFuture = new CompletableFuture<>(); - this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> { - Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; - if (rc == Code.OK.intValue()) { - try { - T obj = deserializer.deserialize(path, content); - // avoid using the zk-client thread to process the result - exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry(obj, stat))); - } catch (Exception e) { - exec.execute(() -> zkFuture.completeExceptionally(e)); + // Broker doesn't restart on global-zk session lost: so handling unexpected exception + try { + this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> { + Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; + if (rc == Code.OK.intValue()) { + try { + T obj = deserializer.deserialize(path, content); + // avoid using the zk-client thread to process the result + exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry(obj, stat))); + } catch (Exception e) { + exec.execute(() -> zkFuture.completeExceptionally(e)); + } + } else if (rc == Code.NONODE.intValue()) { + // Return null values for missing z-nodes, as this is not "exceptional" condition + exec.execute(() -> zkFuture.complete(null)); + } else { + exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); } - } else if (rc == Code.NONODE.intValue()) { - // Return null values for missing z-nodes, as this is not "exceptional" condition - exec.execute(() -> zkFuture.complete(null)); - } else { - exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); - } - }, null); - + }, null); + } catch (Exception e) { + LOG.warn("Failed to access zkSession for {} {}", path, e.getMessage(), e); + zkFuture.completeExceptionally(e); + } + return zkFuture; }).thenAccept(result -> { if (result != null) { diff --git a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperDataCache.java index 061d12c641c..67989a85f6a 100644 --- a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperDataCache.java +++ b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperDataCache.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +62,7 @@ public abstract class ZooKeeperDataCache implements Deserializer, CacheUpd cache.getDataAsync(path, this, this).thenAccept(entry -> { future.complete(entry.map(Entry::getKey)); }).exceptionally(ex -> { + cache.asyncInvalidate(path); future.completeExceptionally(ex); return null; }); diff --git a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java index fcb8e12790c..a663e3c0fb0 100644 --- a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java +++ b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java @@ -77,7 +77,7 @@ public class ZkBookieRackAffinityMappingTest { // Case1: ZKCache is given ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping(); ClientConfiguration bkClientConf1 = new ClientConfiguration(); - bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); mapping1.setConf(bkClientConf1); List racks1 = mapping1.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3)); @@ -104,7 +104,7 @@ public class ZkBookieRackAffinityMappingTest { public void testNoBookieInfo() throws Exception { ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping(); ClientConfiguration bkClientConf = new ClientConfiguration(); - bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); mapping.setConf(bkClientConf); List racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3)); @@ -158,7 +158,7 @@ public class ZkBookieRackAffinityMappingTest { ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping(); ClientConfiguration bkClientConf = new ClientConfiguration(); - bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); mapping.setConf(bkClientConf); List racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3)); diff --git a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java index d8ca05b7c96..04cb5951ce5 100644 --- a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java @@ -106,7 +106,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); - bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups); isolationPolicy.initialize(bkClientConf); @@ -176,7 +176,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { public void testNoBookieInfo() throws Exception { ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); - bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups); isolationPolicy.initialize(bkClientConf); @@ -296,7 +296,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); - bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) { + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) { }); isolationPolicy.initialize(bkClientConf); isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); diff --git a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java index 2bd1b6f3f2c..71e7cd3096a 100644 --- a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java @@ -16,6 +16,7 @@ package com.yahoo.pulsar.zookeeper; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertNotNull; @@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; @@ -67,7 +69,9 @@ public class ZookeeperCacheTest { @Test void testSimpleCache() throws Exception { - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null, null /* no executors in unit test */); + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); ZooKeeperDataCache zkCache = new ZooKeeperDataCache(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -101,14 +105,18 @@ public class ZookeeperCacheTest { } catch (Exception e) { // Ok } + executor.shutdown(); + scheduledExecutor.shutdown(); } @Test void testChildrenCache() throws Exception { OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + zkClient.create("/test", new byte[0], null, null); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, null); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test"); // Create callback counter @@ -154,14 +162,19 @@ public class ZookeeperCacheTest { } assertEquals(notificationCount.get(), 3); + executor.shutdown(); + scheduledExecutor.shutdown(); } @Test void testExistsCache() throws Exception { + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + // Check existence after creation of the node zkClient.create("/test", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -170,15 +183,20 @@ public class ZookeeperCacheTest { Thread.sleep(20); boolean shouldNotExist = zkCacheService.exists("/test"); Assert.assertFalse(shouldNotExist, "/test should not exist in the cache"); + executor.shutdown(); + scheduledExecutor.shutdown(); } @Test void testInvalidateCache() throws Exception { + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + zkClient.create("/test", new byte[0], null, null); zkClient.create("/test/c1", new byte[0], null, null); zkClient.create("/test/c2", new byte[0], null, null); Thread.sleep(20); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -203,6 +221,8 @@ public class ZookeeperCacheTest { assertNotNull(zkCacheService.getChildren("/test")); zkCacheService.invalidateRoot("/test"); assertNull(zkCacheService.getChildrenIfPresent("/test")); + executor.shutdown(); + scheduledExecutor.shutdown(); } @Test @@ -304,6 +324,7 @@ public class ZookeeperCacheTest { zkCacheService.close(); executor.shutdown(); + scheduledExecutor.shutdown(); // Update shouldn't happen after the last check assertEquals(notificationCount.get(), 1); @@ -317,11 +338,13 @@ public class ZookeeperCacheTest { */ @Test(timeOut = 2000) void testZkCallbackThreadStuck() throws Exception { + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); - ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executors in unit test */, null); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); ZooKeeperDataCache zkCache = new ZooKeeperDataCache(zkCacheService) { @Override public String deserialize(String key, byte[] content) throws Exception { @@ -349,5 +372,81 @@ public class ZookeeperCacheTest { }); latch.await(); + executor.shutdown(); + zkExecutor.shutdown(); + scheduledExecutor.shutdown(); + } + + /** + *
+     * Verifies that if {@link ZooKeeperCache} fails to fetch data into the cache then 
+     * (1) it invalidates failed future so, next time it helps to get fresh data from zk
+     * (2) handles zk.getData() unexpected exception if zkSession is lost
+     * 
+ * + * @throws Exception + */ + @Test + public void testInvalidateCacheOnFailure() throws Exception { + ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle + // callback-result process + MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + + final AtomicInteger count = new AtomicInteger(0); + ZooKeeperDataCache zkCache = new ZooKeeperDataCache(zkCacheService) { + @Override + public String deserialize(String key, byte[] content) throws Exception { + if (count.getAndIncrement() == 0) { + throw new NullPointerException("data is null"); + } else { + return new String(content); + } + } + }; + + String value = "test"; + String key1 = "/zkDesrializationExceptionTest"; + String key2 = "/zkSessionExceptionTest"; + zkClient.create(key1, value.getBytes(), null, null); + zkClient.create(key2, value.getBytes(), null, null); + + // (1) deserialization will fail so, result should be exception + try { + zkCache.getAsync(key1).get(); + fail("it should have failed with NPE"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + + // (2) sleep to let cache to be invalidated async + Thread.sleep(1000); + // (3) now, cache should be invalidate failed-future and should refetch the data + assertEquals(zkCache.getAsync(key1).get().get(), value); + + // (4) make zk-session invalid + ZooKeeper zkSession = zkCacheService.zkSession.get(); + zkCacheService.zkSession.set(null); + + try { + zkCache.getAsync(key2).get(); + fail("it should have failed with NPE"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + + // global-Zk session is connected now + zkCacheService.zkSession.set(zkSession); + // (5) sleep to let cache to be invalidated async + Thread.sleep(1000); + // (6) now, cache should be invalidate failed-future and should refetch the data + assertEquals(zkCache.getAsync(key1).get().get(), value); + zkExecutor.shutdown(); + executor.shutdown(); + scheduledExecutor.shutdown(); + } } -- GitLab