提交 8ca502cd 编写于 作者: R Rajan Dhabalia 提交者: xiaolong.ran

[pulsar-broker] Fix: invalidate cache on zk-cache timeout (#5298)

* [pulsar-broker] Fix: invalidate cache on zk-cache timeout

* add test

* fix test

(cherry picked from commit dc95abf4)
上级 85af2f06
......@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
......@@ -92,7 +93,12 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
* @throws Exception
*/
public Optional<T> get(final String path) throws Exception {
return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
try {
return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
}catch(TimeoutException e) {
cache.asyncInvalidate(path);
throw e;
}
}
public Optional<Entry<T, Stat>> getWithStat(final String path) throws Exception {
......
......@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.zookeeper;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
......@@ -32,6 +36,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
......@@ -41,14 +46,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
......@@ -492,4 +501,60 @@ public class ZookeeperCacheTest {
assertEquals(zkCache.getAsync(key1).get().get(), value);
zkExecutor.shutdown();
}
/**
* This tests verifies that {{@link ZooKeeperDataCache} invalidates the cache if the get-operation time-out on that
* path.
*
* @throws Exception
*/
@Test
public void testTimedOutZKCacheRequestInvalidates() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService()));
String path = "test";
doNothing().when(zkSession).getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
zkClient.create("/test", new byte[0], null, null);
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkSession, 1, executor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
return new String(content);
}
};
// try to do get on the path which will time-out and async-cache will have non-completed Future
try {
zkCache.get(path);
} catch (Exception e) {
// Ok
}
retryStrategically((test) -> {
return zkCacheService.dataCache.getIfPresent(path) == null;
}, 5, 1000);
assertNull(zkCacheService.dataCache.getIfPresent(path));
executor.shutdown();
zkExecutor.shutdown();
scheduledExecutor.shutdown();
}
private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
}
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册