提交 f8b051a6 编写于 作者: R Rajan Dhabalia 提交者: GitHub

Handle NPE at load-manager when leader couldn't find available broker (#726)

* Handle NPE at load-manager when leader couldn't find available broker

* avoid throwing exception

* return empty lookup-result if broker not found
上级 aa40ebbe
......@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.loadbalance.impl;
import org.apache.pulsar.broker.loadbalance.ResourceDescription;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import com.google.common.base.Objects;
public class SimpleResourceUnit implements ResourceUnit {
private String resourceId;
......@@ -66,4 +68,10 @@ public class SimpleResourceUnit implements ResourceUnit {
public int hashCode() {
return this.resourceId.hashCode();
}
@Override
public String toString() {
return Objects.toStringHelper(this).add("resourceId", resourceId).toString();
}
}
......@@ -256,7 +256,8 @@ public class DestinationLookup extends PulsarWebResource {
}
if (!lookupResult.isPresent()) {
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, "Namespace bundle is not owned by any broker", requestId));
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady,
"No broker was available to own " + fqdn, requestId));
return;
}
......
......@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.client.admin.PulsarAdmin;
......@@ -333,7 +334,12 @@ public class NamespaceService {
if (candidateBroker == null) {
if (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
candidateBroker = getLeastLoadedFromLoadManager(bundle);
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
} else {
if (authoritative) {
// leader broker already assigned the current broker as owner
......@@ -452,13 +458,19 @@ public class NamespaceService {
* @return
* @throws Exception
*/
private String getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
String lookupAddress = loadManager.get().getLeastLoaded(serviceUnit).getResourceId();
private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
ResourceUnit leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
if (leastLoadedBroker != null) {
String lookupAddress = leastLoadedBroker.getResourceId();
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
lookupAddress);
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getWebServiceAddress(), lookupAddress);
}
return Optional.of(lookupAddress);
} else {
LOG.warn("No broker is available for {}", serviceUnit);
return Optional.empty();
}
return lookupAddress;
}
public void unloadNamespace(NamespaceName ns) throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册