提交 09920112 编写于 作者: R Rajan 提交者: Matteo Merli

Add timeout on zkDataCache dependent task (#362)

上级 e7622f3a
......@@ -18,6 +18,7 @@ package com.yahoo.pulsar.broker.authorization;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -26,6 +27,7 @@ import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.AuthAction;
import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
/**
*/
......@@ -55,7 +57,10 @@ public class AuthorizationManager {
public boolean canProduce(DestinationName destination, String role) throws Exception {
try {
return canProduceAsync(destination, role).get();
return canProduceAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
......@@ -78,7 +83,10 @@ public class AuthorizationManager {
public boolean canConsume(DestinationName destination, String role) throws Exception {
try {
return canConsumeAsync(destination, role).get();
return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
......
......@@ -21,7 +21,9 @@ import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.net.URI;
......@@ -607,7 +609,7 @@ public class NamespaceService {
if (!policies.isPresent()) {
// if policies is not present into localZk then create new policies
this.pulsar.getLocalZkCacheService().createPolicies(path, false).get();
this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path);
}
......@@ -667,17 +669,17 @@ public class NamespaceService {
}
public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
ownershipCache.removeOwnership(getFullBundle(nsName)).get();
ownershipCache.removeOwnership(getFullBundle(nsName)).get(cacheTimeOutInSec, SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle).get();
ownershipCache.removeOwnership(nsBundle).get(cacheTimeOutInSec, SECONDS);
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
}
public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get();
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(cacheTimeOutInSec, SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}
......@@ -710,7 +712,7 @@ public class NamespaceService {
public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
// if there is no znode for the service unit, it is not owned by any broker
return getOwnerAsync(bundle).get();
return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS);
}
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
......
......@@ -17,6 +17,8 @@ package com.yahoo.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.URI;
import java.net.URL;
......@@ -498,7 +500,11 @@ public abstract class PulsarWebResource {
*/
protected static void validateReplicationSettingsOnNamespace(PulsarService pulsarService, NamespaceName namespace) {
try {
validateReplicationSettingsOnNamespaceAsync(pulsarService, namespace).get();
validateReplicationSettingsOnNamespaceAsync(pulsarService, namespace).get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while validating policy on {} ", cacheTimeOutInSec, namespace);
throw new RestException(Status.SERVICE_UNAVAILABLE, String.format(
"Failed to validate global cluster configuration : ns=%s emsg=%s", namespace, e.getMessage()));
} catch (Exception e) {
if(e.getCause() instanceof WebApplicationException) {
throw (WebApplicationException) e.getCause();
......
......@@ -74,6 +74,7 @@ public abstract class ZooKeeperCache implements Watcher {
protected final Cache<String, Set<String>> childrenCache;
protected final Cache<String, Boolean> existsCache;
protected final OrderedSafeExecutor executor;
public static final int cacheTimeOutInSec = 30;
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
......@@ -115,6 +116,7 @@ public abstract class ZooKeeperCache implements Watcher {
});
} catch (RejectedExecutionException e) {
// Ok, the service is shutting down
LOG.error("Failed to updated zk-cache {} on zk-watch {}", path, e.getMessage());
}
} else {
if (LOG.isDebugEnabled()) {
......@@ -235,12 +237,14 @@ public abstract class ZooKeeperCache implements Watcher {
public <T> Optional<Entry<T, Stat>> getData(final String path, final Watcher watcher,
final Deserializer<T> deserializer) throws Exception {
try {
return getDataAsync(path, watcher, deserializer).get();
return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
throw (KeeperException) cause;
} else if (cause instanceof InterruptedException) {
LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
invalidate(path);
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册