提交 87d8d72c 编写于 作者: M Matteo Merli 提交者: GitHub

In MockZooKeeper triggers callback without holding mutex (#460)

上级 11f874e2
......@@ -22,6 +22,7 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
......@@ -51,7 +52,7 @@ import sun.reflect.ReflectionFactory;
public class MockZooKeeper extends ZooKeeper {
private TreeMap<String, Pair<String, Integer>> tree;
private SetMultimap<String, Watcher> watchers;
private boolean stopped;
private volatile boolean stopped;
private boolean alwaysFail = false;
private ExecutorService executor;
......@@ -62,6 +63,8 @@ public class MockZooKeeper extends ZooKeeper {
private long sessionId = 0L;
private int readOpDelayMs;
private ReentrantLock mutex;
public static MockZooKeeper newInstance() {
return newInstance(null);
}
......@@ -78,6 +81,7 @@ public class MockZooKeeper extends ZooKeeper {
MockZooKeeper zk = MockZooKeeper.class.cast(intConstr.newInstance());
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
zk.mutex = new ReentrantLock();
return zk;
} catch (RuntimeException e) {
throw e;
......@@ -116,13 +120,17 @@ public class MockZooKeeper extends ZooKeeper {
}
@Override
public synchronized void register(Watcher watcher) {
public void register(Watcher watcher) {
mutex.lock();
sessionWatcher = watcher;
mutex.unlock();
}
@Override
public synchronized String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
mutex.lock();
try {
checkProgrammedFail();
if (stopped)
......@@ -153,17 +161,20 @@ public class MockZooKeeper extends ZooKeeper {
toNotifyParent.addAll(watchers.get(parent));
executor.execute(() -> {
toNotifyParent.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
toNotifyParent.forEach(watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
});
}
return path;
} finally {
mutex.unlock();
}
}
@Override
public synchronized void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
final StringCallback cb, final Object ctx) {
if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
......@@ -173,29 +184,36 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(() -> {
String parent = path.substring(0, path.lastIndexOf("/"));
synchronized (MockZooKeeper.this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
} else if (tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
} else if (!parent.isEmpty() && !tree.containsKey(parent)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(path, Pair.create(new String(data), 0));
mutex.unlock();
cb.processResult(0, path, ctx, null);
if (!parent.isEmpty()) {
watchers.get(parent).forEach(watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
}
}
}
});
}
@Override
public synchronized byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
mutex.lock();
try {
checkProgrammedFail();
Pair<String, Integer> value = tree.get(path);
if (value == null) {
......@@ -209,6 +227,9 @@ public class MockZooKeeper extends ZooKeeper {
}
return value.first.getBytes();
}
} finally {
mutex.unlock();
}
}
@Override
......@@ -224,8 +245,11 @@ public class MockZooKeeper extends ZooKeeper {
}
Pair<String, Integer> value;
synchronized (MockZooKeeper.this) {
mutex.lock();
try {
value = tree.get(path);
} finally {
mutex.unlock();
}
if (value == null) {
......@@ -242,17 +266,20 @@ public class MockZooKeeper extends ZooKeeper {
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
synchronized (MockZooKeeper.this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
return;
}
Pair<String, Integer> value = tree.get(path);
if (value == null) {
mutex.unlock();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
if (watcher != null) {
......@@ -261,20 +288,22 @@ public class MockZooKeeper extends ZooKeeper {
Stat stat = new Stat();
stat.setVersion(value.second);
mutex.unlock();
cb.processResult(0, path, ctx, value.first.getBytes(), stat);
}
}
});
}
@Override
public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
synchronized (MockZooKeeper.this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
......@@ -295,16 +324,19 @@ public class MockZooKeeper extends ZooKeeper {
}
}
mutex.unlock();
cb.processResult(0, path, ctx, children);
if (watcher != null) {
watchers.put(path, watcher);
}
}
});
}
@Override
public synchronized List<String> getChildren(String path, Watcher watcher) throws KeeperException {
public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
mutex.lock();
try {
checkProgrammedFail();
if (!tree.containsKey(path)) {
......@@ -332,11 +364,15 @@ public class MockZooKeeper extends ZooKeeper {
}
return children;
} finally {
mutex.unlock();
}
}
@Override
public synchronized List<String> getChildren(String path, boolean watch)
throws KeeperException, InterruptedException {
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
mutex.lock();
try {
checkProgrammedFail();
if (stopped) {
......@@ -361,19 +397,25 @@ public class MockZooKeeper extends ZooKeeper {
}
}
return children;
} finally {
mutex.unlock();
}
}
@Override
public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
executor.execute(() -> {
synchronized (MockZooKeeper.this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null, null);
return;
} else if (!tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null, null);
return;
}
......@@ -396,13 +438,16 @@ public class MockZooKeeper extends ZooKeeper {
}
log.debug("getChildren done path={} result={}", path, children);
mutex.unlock();
cb.processResult(0, path, ctx, children, new Stat());
}
});
}
@Override
public synchronized Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
mutex.lock();
try {
checkProgrammedFail();
if (stopped)
......@@ -415,10 +460,15 @@ public class MockZooKeeper extends ZooKeeper {
} else {
return null;
}
} finally {
mutex.unlock();
}
}
@Override
public synchronized Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
mutex.lock();
try {
checkProgrammedFail();
if (stopped)
......@@ -435,32 +485,37 @@ public class MockZooKeeper extends ZooKeeper {
} else {
return null;
}
} finally {
mutex.unlock();
}
}
public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
executor.execute(() -> {
synchronized (this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
if (tree.containsKey(path)) {
mutex.unlock();
cb.processResult(0, path, ctx, new Stat());
} else {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
}
}
});
}
@Override
public void sync(String path, VoidCallback cb, Object ctx) {
executor.execute(() -> {
synchronized (this) {
if (getProgrammedFailStatus()) {
cb.processResult(failReturnCode.intValue(), path, ctx);
return;
......@@ -470,18 +525,18 @@ public class MockZooKeeper extends ZooKeeper {
}
cb.processResult(0, path, ctx);
}
});
}
@Override
public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException {
mutex.lock();
final Set<Watcher> toNotify = Sets.newHashSet();
int newVersion;
synchronized (this) {
try {
checkProgrammedFail();
if (stopped) {
......@@ -505,6 +560,8 @@ public class MockZooKeeper extends ZooKeeper {
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
} finally {
mutex.unlock();
}
executor.execute(() -> {
......@@ -518,8 +575,7 @@ public class MockZooKeeper extends ZooKeeper {
}
@Override
public synchronized void setData(final String path, final byte[] data, int version, final StatCallback cb,
final Object ctx) {
public void setData(final String path, final byte[] data, int version, final StatCallback cb, final Object ctx) {
if (stopped) {
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
......@@ -528,16 +584,20 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(() -> {
final Set<Watcher> toNotify = Sets.newHashSet();
synchronized (MockZooKeeper.this) {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
return;
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null);
return;
}
if (!tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
return;
}
......@@ -547,6 +607,7 @@ public class MockZooKeeper extends ZooKeeper {
// Check version
if (version != -1 && version != currentVersion) {
log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
mutex.unlock();
cb.processResult(KeeperException.Code.BadVersion, path, ctx, null);
return;
}
......@@ -556,12 +617,12 @@ public class MockZooKeeper extends ZooKeeper {
tree.put(path, Pair.create(new String(data), newVersion));
Stat stat = new Stat();
stat.setVersion(newVersion);
cb.processResult(0, path, ctx, stat);
mutex.unlock();
cb.processResult(0, path, ctx, stat);
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
}
for (Watcher watcher : toNotify) {
watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
......@@ -577,7 +638,8 @@ public class MockZooKeeper extends ZooKeeper {
final Set<Watcher> toNotifyParent;
final String parent;
synchronized (this) {
mutex.lock();
try {
if (stopped) {
throw new KeeperException.ConnectionLossException();
} else if (!tree.containsKey(path)) {
......@@ -605,6 +667,8 @@ public class MockZooKeeper extends ZooKeeper {
}
watchers.removeAll(path);
} finally {
mutex.unlock();
}
executor.execute(() -> {
......@@ -622,8 +686,10 @@ public class MockZooKeeper extends ZooKeeper {
}
@Override
public synchronized void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
mutex.lock();
if (executor.isShutdown()) {
mutex.unlock();
cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
return;
}
......@@ -638,13 +704,19 @@ public class MockZooKeeper extends ZooKeeper {
}
executor.execute(() -> {
mutex.lock();
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx);
} else if (stopped) {
mutex.unlock();
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
} else if (!tree.containsKey(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
} else if (hasChildren(path)) {
mutex.unlock();
cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
} else {
if (version != -1) {
......@@ -656,6 +728,8 @@ public class MockZooKeeper extends ZooKeeper {
}
tree.remove(path);
mutex.unlock();
cb.processResult(0, path, ctx);
toNotifyDelete.forEach(watcher -> watcher
......@@ -666,17 +740,23 @@ public class MockZooKeeper extends ZooKeeper {
});
watchers.removeAll(path);
mutex.unlock();
}
@Override
public void close() throws InterruptedException {
}
public synchronized void shutdown() throws InterruptedException {
public void shutdown() throws InterruptedException {
mutex.lock();
try {
stopped = true;
tree.clear();
watchers.clear();
executor.shutdownNow();
} finally {
mutex.unlock();
}
}
void checkProgrammedFail() throws KeeperException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册