diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java index 98641387ee039447f9d1c3f654a5fc9f997edfeb..e2bef05f529509a018d5b44ec7744d477bbb7bff 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java @@ -18,6 +18,7 @@ package com.yahoo.pulsar.broker.authorization; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import static java.util.concurrent.TimeUnit.SECONDS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +27,7 @@ import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.broker.cache.ConfigurationCacheService; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.AuthAction; +import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; /** */ @@ -55,7 +57,10 @@ public class AuthorizationManager { public boolean canProduce(DestinationName destination, String role) throws Exception { try { - return canProduceAsync(destination, role).get(); + return canProduceAsync(destination, role).get(cacheTimeOutInSec, SECONDS); + } catch (java.util.concurrent.TimeoutException e) { /* a timed get() reports expiry via TimeoutException, not InterruptedException */ + log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination); + throw e; } catch (Exception e) { log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role, destination, e); @@ -78,7 +83,10 @@ public class AuthorizationManager { public boolean canConsume(DestinationName destination, String role) throws Exception { try { - return canConsumeAsync(destination, role).get(); + return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS); + } catch (java.util.concurrent.TimeoutException e) { /* a timed get() reports expiry via TimeoutException, not InterruptedException */ + 
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination); + throw e; } catch (Exception e) { log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role, destination, e); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java index d859cb985c1d8f76e077cd65ac28f1d6d1d99547..da73eccfb8a55a847a5f826269e7b29ee56f1460 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/NamespaceService.java @@ -21,7 +21,9 @@ import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper; import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath; import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; +import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.net.URI; @@ -607,7 +609,7 @@ public class NamespaceService { if (!policies.isPresent()) { // if policies is not present into localZk then create new policies - this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(); + this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS); policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path); } @@ -667,17 +669,17 @@ public class NamespaceService { } public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception { - ownershipCache.removeOwnership(getFullBundle(nsName)).get(); + ownershipCache.removeOwnership(getFullBundle(nsName)).get(cacheTimeOutInSec, SECONDS); 
bundleFactory.invalidateBundleCache(nsName); } public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception { - ownershipCache.removeOwnership(nsBundle).get(); + ownershipCache.removeOwnership(nsBundle).get(cacheTimeOutInSec, SECONDS); bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()); } public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception { - ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(); + ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(cacheTimeOutInSec, SECONDS); bundleFactory.invalidateBundleCache(nsName); } @@ -710,7 +712,7 @@ public class NamespaceService { public Optional getOwner(NamespaceBundle bundle) throws Exception { // if there is no znode for the service unit, it is not owned by any broker - return getOwnerAsync(bundle).get(); + return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS); } public CompletableFuture> getOwnerAsync(NamespaceBundle bundle) { diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java index ac6b924ea7bd4093e356d62b3643708c953d6d6c..3f7511c881cebe057a8d35d2929522396a01b9d0 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java @@ -17,6 +17,8 @@ package com.yahoo.pulsar.broker.web; import static com.google.common.base.Preconditions.checkArgument; import static com.yahoo.pulsar.common.api.Commands.newLookupResponse; +import static com.yahoo.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec; +import static java.util.concurrent.TimeUnit.SECONDS; import java.net.URI; import java.net.URL; @@ -498,7 +500,11 @@ public abstract class PulsarWebResource { */ protected static void validateReplicationSettingsOnNamespace(PulsarService pulsarService, 
NamespaceName namespace) { try { - validateReplicationSettingsOnNamespaceAsync(pulsarService, namespace).get(); + validateReplicationSettingsOnNamespaceAsync(pulsarService, namespace).get(cacheTimeOutInSec, SECONDS); + } catch (java.util.concurrent.TimeoutException e) { /* a timed get() reports expiry via TimeoutException, not InterruptedException */ + log.warn("Time-out {} sec while validating policy on {} ", cacheTimeOutInSec, namespace); + throw new RestException(Status.SERVICE_UNAVAILABLE, String.format( + "Failed to validate global cluster configuration : ns=%s emsg=%s", namespace, e.getMessage())); } catch (Exception e) { if(e.getCause() instanceof WebApplicationException) { throw (WebApplicationException) e.getCause(); diff --git a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java index f9e5b3099d52fe99d71697e0678f3c1a6337447c..6acc98140999f02203034cf6a823a92214b27c6d 100644 --- a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java @@ -74,6 +74,7 @@ public abstract class ZooKeeperCache implements Watcher { protected final Cache> childrenCache; protected final Cache existsCache; protected final OrderedSafeExecutor executor; + public static final int cacheTimeOutInSec = 30; protected AtomicReference zkSession = new AtomicReference(null); @@ -115,6 +116,7 @@ public abstract class ZooKeeperCache implements Watcher { }); } catch (RejectedExecutionException e) { // Ok, the service is shutting down + LOG.error("Failed to updated zk-cache {} on zk-watch {}", path, e.getMessage()); } } else { if (LOG.isDebugEnabled()) { @@ -235,12 +237,16 @@ public abstract class ZooKeeperCache implements Watcher { public Optional> getData(final String path, final Watcher watcher, final Deserializer deserializer) throws Exception { try { - return getDataAsync(path, watcher, deserializer).get(); + return getDataAsync(path, watcher, 
deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS); + } catch (java.util.concurrent.TimeoutException e) { /* thrown directly by the timed get(), never wrapped in ExecutionException */ + LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec); + invalidate(path); + throw e; } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof KeeperException) { throw (KeeperException) cause; } else if (cause instanceof InterruptedException) { throw (InterruptedException) cause; } else if (cause instanceof RuntimeException) { throw (RuntimeException) cause;