提交 4563540c 编写于 作者: M Matteo Merli 提交者: GitHub

[#114] Process ZK event from background thread to avoid deadlock on ZK sync operations (#115)

上级 3c596d49
......@@ -21,6 +21,7 @@ import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POL
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static java.lang.String.format;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.net.URI;
import java.net.URL;
......@@ -38,7 +39,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -154,7 +154,7 @@ public class NamespaceService {
return jsonMapper().readValue(content, LoadReport.class);
}
};
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
......@@ -425,7 +425,7 @@ public class NamespaceService {
}
return lookupFuture;
}
private boolean isBrokerActive(String candidateBroker) throws KeeperException, InterruptedException {
Set<String> activeNativeBrokers = pulsar.getLocalZkCache()
.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT);
......@@ -558,33 +558,34 @@ public class NamespaceService {
for (NamespaceBundle sBundle : splittedBundles.getRight()) {
checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
}
updateNamespaceBundles(nsname, splittedBundles.getLeft(), new StatCallback() {
public void processResult(int rc, String path, Object zkCtx, Stat stat) {
if (rc == KeeperException.Code.OK.intValue()) {
// disable old bundle
try {
ownershipCache.disableOwnership(bundle);
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(nsname);
// update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.setLoadReportForceUpdateFlag();
future.complete(null);
} catch (Exception e) {
String msg = format("failed to disable bundle %s under namespace [%s] with error %s",
nsname.toString(), bundle.toString(), e.getMessage());
LOG.warn(msg, e);
future.completeExceptionally(new ServiceUnitNotReadyException(msg));
updateNamespaceBundles(nsname, splittedBundles.getLeft(),
(rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
if (rc == KeeperException.Code.OK.intValue()) {
// disable old bundle
try {
ownershipCache.disableOwnership(bundle);
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(nsname);
// update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.setLoadReportForceUpdateFlag();
future.complete(null);
} catch (Exception e) {
String msg1 = format(
"failed to disable bundle %s under namespace [%s] with error %s",
nsname.toString(), bundle.toString(), e.getMessage());
LOG.warn(msg1, e);
future.completeExceptionally(new ServiceUnitNotReadyException(msg1));
}
} else {
String msg2 = format("failed to update namespace [%s] policies due to %s",
nsname.toString(),
KeeperException.create(KeeperException.Code.get(rc)).getMessage());
LOG.warn(msg2);
future.completeExceptionally(new ServiceUnitNotReadyException(msg2));
}
} else {
String msg = format("failed to update namespace [%s] policies due to %s", nsname.toString(),
KeeperException.create(KeeperException.Code.get(rc)).getMessage());
LOG.warn(msg);
future.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}
});
})));
} catch (Exception e) {
String msg = format("failed to aquire ownership of split bundle for namespace [%s], %s",
nsname.toString(), e.getMessage());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册