提交 09b6bb0c 编写于 作者: R Rajan 提交者: Matteo Merli

handle zkCache failure: invalidate cache and zk-getData failure (#377)

* handle zkCache failure: invalidate cache and zk-getData failure

* introduce separate executor to serve zkcache callback

* add executor to discovery service

* remove testing npe

* add assetion on zkCache executor parameter and update tests

* update executor thread in testcase for intermittent failure
上级 7f63ee98
......@@ -95,6 +95,8 @@ public class PulsarService implements AutoCloseable {
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar"));
private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered");
private ScheduledExecutorService loadManagerExecutor = null;
private ScheduledFuture<?> loadReportTask = null;
......@@ -382,10 +384,10 @@ public class PulsarService implements AutoCloseable {
LOG.info("starting configuration cache service");
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.executor);
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.cacheExecutor);
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.executor);
getOrderedExecutor(), this.cacheExecutor);
try {
this.globalZkCache.start();
} catch (IOException e) {
......@@ -533,6 +535,10 @@ public class PulsarService implements AutoCloseable {
return executor;
}
public ScheduledExecutorService getCacheExecutor() {
return cacheExecutor;
}
public ScheduledExecutorService getLoadManagerExecutor() {
return loadManagerExecutor;
}
......
......@@ -155,11 +155,11 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleLookup(CommandLookupTopic lookup) {
if (log.isDebugEnabled()) {
log.debug("Received Lookup from {}", remoteAddress);
}
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
if (log.isDebugEnabled()) {
log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
......@@ -187,11 +187,11 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
if (log.isDebugEnabled()) {
log.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId);
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
......
......@@ -19,8 +19,12 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.MockZooKeeper;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
......@@ -41,12 +45,15 @@ public class ResourceQuotaCacheTest {
private ZooKeeperCache zkCache;
private LocalZooKeeperCacheService localCache;
private NamespaceBundleFactory bundleFactory;
private OrderedSafeExecutor executor;
private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
executor = new OrderedSafeExecutor(1, "test");
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
......@@ -54,6 +61,12 @@ public class ResourceQuotaCacheTest {
doReturn(localCache).when(pulsar).getLocalZkCacheService();
}
@AfterMethod
public void teardown() {
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
public void testGetSetDefaultQuota() throws Exception {
ResourceQuotaCache cache = new ResourceQuotaCache(zkCache);
......
......@@ -29,6 +29,8 @@ import static org.testng.Assert.fail;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException;
......@@ -60,6 +62,7 @@ public class OwnershipCacheTest {
private NamespaceService nsService;
private BrokerService brokerService;
private OrderedSafeExecutor executor;
private ScheduledExecutorService scheduledExecutor;
@BeforeMethod
public void setup() throws Exception {
......@@ -68,7 +71,8 @@ public class OwnershipCacheTest {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
scheduledExecutor = Executors.newScheduledThreadPool(2);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
......@@ -88,6 +92,7 @@ public class OwnershipCacheTest {
@AfterMethod
public void teardown() throws Exception {
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
......
......@@ -41,6 +41,8 @@ import com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader;
import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
*
......@@ -52,7 +54,8 @@ public class BrokerDiscoveryProvider implements Closeable {
private final AtomicInteger counter = new AtomicInteger();
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(4, "pulsar-discovery-ordered");
private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4,
new DefaultThreadFactory("pulsar-discovery"));
private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
......
......@@ -19,6 +19,8 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
......@@ -33,6 +35,8 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Connects with ZooKeeper and sets watch to listen changes for active broker list.
*
......@@ -47,7 +51,9 @@ public class ZookeeperCacheLoader implements Closeable {
private volatile List<LoadReport> availableBrokers;
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery");
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery-ordered-cache");
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8,
new DefaultThreadFactory("pulsar-discovery-cache"));
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
......@@ -66,7 +72,7 @@ public class ZookeeperCacheLoader implements Closeable {
});
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor,
null/* cache uses ForkJoinPool if provided scheduler is null to load data-async */);
executor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
......@@ -106,6 +112,7 @@ public class ZookeeperCacheLoader implements Closeable {
@Override
public void close() {
orderedExecutor.shutdown();
executor.shutdown();
}
private void updateBrokerList(Set<String> brokerNodes) throws Exception {
......
......@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
......@@ -86,6 +87,8 @@ public abstract class ZooKeeperCache implements Watcher {
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) {
checkNotNull(executor);
checkNotNull(scheduledExecutor);
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);
......@@ -166,6 +169,10 @@ public abstract class ZooKeeperCache implements Watcher {
existsCache.invalidate(path);
}
public void asyncInvalidate(String path) {
scheduledExecutor.submit(() -> invalidate(path));
}
public void invalidate(final String path) {
invalidateData(path);
invalidateChildren(path);
......@@ -222,6 +229,7 @@ public abstract class ZooKeeperCache implements Watcher {
getDataAsync(path, this, deserializer).thenAccept(data -> {
future.complete(data.map(e -> e.getKey()));
}).exceptionally(ex -> {
asyncInvalidate(path);
if (ex.getCause() instanceof NoNodeException) {
future.complete(Optional.empty());
} else {
......@@ -249,18 +257,22 @@ public abstract class ZooKeeperCache implements Watcher {
try {
return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
asyncInvalidate(path);
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
throw (KeeperException) cause;
} else if (cause instanceof InterruptedException) {
LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
invalidate(path);
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
} catch (TimeoutException e) {
LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
asyncInvalidate(path);
throw e;
}
}
......@@ -275,24 +287,30 @@ public abstract class ZooKeeperCache implements Watcher {
// Return a future for the z-node to be fetched from ZK
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
exec.execute(() -> zkFuture.completeExceptionally(e));
// Broker doesn't restart on global-zk session lost: so handling unexpected exception
try {
this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
exec.execute(() -> zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
exec.execute(() -> zkFuture.complete(null));
} else {
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
exec.execute(() -> zkFuture.complete(null));
} else {
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);
}, null);
} catch (Exception e) {
LOG.warn("Failed to access zkSession for {} {}", path, e.getMessage(), e);
zkFuture.completeExceptionally(e);
}
return zkFuture;
}).thenAccept(result -> {
if (result != null) {
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -61,6 +62,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
cache.getDataAsync(path, this, this).thenAccept(entry -> {
future.complete(entry.map(Entry::getKey));
}).exceptionally(ex -> {
cache.asyncInvalidate(path);
future.completeExceptionally(ex);
return null;
});
......
......@@ -77,7 +77,7 @@ public class ZkBookieRackAffinityMappingTest {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......@@ -104,7 +104,7 @@ public class ZkBookieRackAffinityMappingTest {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......@@ -158,7 +158,7 @@ public class ZkBookieRackAffinityMappingTest {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
......
......@@ -106,7 +106,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
......@@ -176,7 +176,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
public void testNoBookieInfo() throws Exception {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
......@@ -296,7 +296,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
isolationPolicy.initialize(bkClientConf);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
......
......@@ -16,6 +16,7 @@
package com.yahoo.pulsar.zookeeper;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertNotNull;
......@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
......@@ -67,7 +69,9 @@ public class ZookeeperCacheTest {
@Test
void testSimpleCache() throws Exception {
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null, null /* no executors in unit test */);
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
......@@ -101,14 +105,18 @@ public class ZookeeperCacheTest {
} catch (Exception e) {
// Ok
}
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
void testChildrenCache() throws Exception {
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkClient.create("/test", new byte[0], null, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
......@@ -154,14 +162,19 @@ public class ZookeeperCacheTest {
}
assertEquals(notificationCount.get(), 3);
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
void testExistsCache() throws Exception {
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// Check existence after creation of the node
zkClient.create("/test", new byte[0], null, null);
Thread.sleep(20);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
......@@ -170,15 +183,20 @@ public class ZookeeperCacheTest {
Thread.sleep(20);
boolean shouldNotExist = zkCacheService.exists("/test");
Assert.assertFalse(shouldNotExist, "/test should not exist in the cache");
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
void testInvalidateCache() throws Exception {
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkClient.create("/test", new byte[0], null, null);
zkClient.create("/test/c1", new byte[0], null, null);
zkClient.create("/test/c2", new byte[0], null, null);
Thread.sleep(20);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executor in unit test */, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
......@@ -203,6 +221,8 @@ public class ZookeeperCacheTest {
assertNotNull(zkCacheService.getChildren("/test"));
zkCacheService.invalidateRoot("/test");
assertNull(zkCacheService.getChildrenIfPresent("/test"));
executor.shutdown();
scheduledExecutor.shutdown();
}
@Test
......@@ -304,6 +324,7 @@ public class ZookeeperCacheTest {
zkCacheService.close();
executor.shutdown();
scheduledExecutor.shutdown();
// Update shouldn't happen after the last check
assertEquals(notificationCount.get(), 1);
......@@ -317,11 +338,13 @@ public class ZookeeperCacheTest {
*/
@Test(timeOut = 2000)
void testZkCallbackThreadStuck() throws Exception {
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, null /* no executors in unit test */, null);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
......@@ -349,5 +372,81 @@ public class ZookeeperCacheTest {
});
latch.await();
executor.shutdown();
zkExecutor.shutdown();
scheduledExecutor.shutdown();
}
/**
* <pre>
* Verifies that if {@link ZooKeeperCache} fails to fetch data into the cache then
* (1) it invalidates failed future so, next time it helps to get fresh data from zk
* (2) handles zk.getData() unexpected exception if zkSession is lost
* </pre>
*
* @throws Exception
*/
@Test
public void testInvalidateCacheOnFailure() throws Exception {
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor);
final AtomicInteger count = new AtomicInteger(0);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
if (count.getAndIncrement() == 0) {
throw new NullPointerException("data is null");
} else {
return new String(content);
}
}
};
String value = "test";
String key1 = "/zkDesrializationExceptionTest";
String key2 = "/zkSessionExceptionTest";
zkClient.create(key1, value.getBytes(), null, null);
zkClient.create(key2, value.getBytes(), null, null);
// (1) deserialization will fail so, result should be exception
try {
zkCache.getAsync(key1).get();
fail("it should have failed with NPE");
} catch (Exception e) {
assertTrue(e.getCause() instanceof NullPointerException);
}
// (2) sleep to let cache to be invalidated async
Thread.sleep(1000);
// (3) now, cache should be invalidate failed-future and should refetch the data
assertEquals(zkCache.getAsync(key1).get().get(), value);
// (4) make zk-session invalid
ZooKeeper zkSession = zkCacheService.zkSession.get();
zkCacheService.zkSession.set(null);
try {
zkCache.getAsync(key2).get();
fail("it should have failed with NPE");
} catch (Exception e) {
assertTrue(e.getCause() instanceof NullPointerException);
}
// global-Zk session is connected now
zkCacheService.zkSession.set(zkSession);
// (5) sleep to let cache to be invalidated async
Thread.sleep(1000);
// (6) now, cache should be invalidate failed-future and should refetch the data
assertEquals(zkCache.getAsync(key1).get().get(), value);
zkExecutor.shutdown();
executor.shutdown();
scheduledExecutor.shutdown();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册