Commit 731e5df6 authored by bobbeyreese, committed by Rajan

Fix Bugs Introduced by New Load Manager (#332)

* Fix bugs introduced by new load manager
Parent 74c5b6ef
......@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
......@@ -50,7 +51,7 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
// max_usage < overload_threshold ? 1 / (overload_threshold - max_usage): Inf
// This weight attempts to discourage the placement of bundles on brokers whose system resource usage is high.
private static double getScore(final BrokerData brokerData, final ServiceConfiguration conf) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100;
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
double totalMessageRate = 0;
for (BundleData bundleData : brokerData.getPreallocatedBundleData().values()) {
final TimeAverageMessageData longTermData = bundleData.getLongTermData();
......@@ -61,14 +62,10 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
if (maxUsage > overloadThreshold) {
return Double.POSITIVE_INFINITY;
}
// 1 / weight is the proportion of load this machine should receive in
// proportion to a machine with no system resource burden.
// This attempts to spread out the load in such a way that
// machines only become overloaded if there is too much
// load for the system to handle (e.g., all machines are
// at least nearly overloaded).
final double weight = maxUsage < overloadThreshold ? 1 / (overloadThreshold - maxUsage)
: Double.POSITIVE_INFINITY;
// 1 / weight is the proportion of load this machine should receive in proportion to a machine with no system
// resource burden. This attempts to spread out the load in such a way that machines only become overloaded if
// there is too much load for the system to handle (e.g., all machines are at least nearly overloaded).
final double weight = 1 / (overloadThreshold - maxUsage);
final double totalMessageRateEstimate = totalMessageRate + timeAverageData.getLongTermMsgRateIn()
+ timeAverageData.getLongTermMsgRateOut();
return weight * totalMessageRateEstimate;
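The root bug fixed in this hunk is the `/ 100` on the threshold: `getLoadBalancerBrokerOverloadedThresholdPercentage()` returns an `int` (e.g. the default 85), so integer division yielded 0, making every broker with any resource usage look overloaded and score infinity. A minimal sketch of the corrected math, assuming that 85% default; the standalone class and sample numbers are illustrative, not the broker code:

```java
// Sketch of the fixed scoring math, assuming the default threshold of 85%.
public class ScoreSketch {
    static double score(double maxUsage, double msgRate) {
        final double overloadThreshold = 85 / 100.0; // 100.0 avoids integer division (85 / 100 == 0)
        if (maxUsage > overloadThreshold) {
            return Double.POSITIVE_INFINITY; // overloaded brokers are never picked
        }
        // 1 / weight is the share of load this broker should receive relative
        // to an idle broker, so weight grows as usage nears the threshold.
        final double weight = 1 / (overloadThreshold - maxUsage);
        return weight * msgRate;
    }

    public static void main(String[] args) {
        System.out.println(score(0.00, 100)); // ~117.6: idle broker, weight ~1.18
        System.out.println(score(0.45, 100)); // 250.0: weight 1 / 0.40 = 2.5
        System.out.println(score(0.90, 100)); // Infinity: above the 85% threshold
    }
}
```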
......@@ -95,16 +92,26 @@ public class LeastLongTermMessageRate implements ModularLoadManagerStrategy {
// Maintain a list of all the best scoring brokers and then randomly select one of them at the end.
for (String broker : candidates) {
final double score = getScore(loadData.getBrokerData().get(broker), conf);
log.info("{} got score {}", broker, score);
final BrokerData brokerData = loadData.getBrokerData().get(broker);
final double score = getScore(brokerData, conf);
if (score == Double.POSITIVE_INFINITY) {
final LocalBrokerData localData = brokerData.getLocalData();
log.warn(
"Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+ "BANDWIDTH OUT: {}%",
broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
localData.getBandwidthOut().percentUsage());
}
log.debug("{} got score {}", broker, score);
if (score < minScore) {
// Clear best brokers since this score beats the other brokers.
bestBrokers.clear();
bestBrokers.add(broker);
minScore = score;
} else if (score == minScore) {
// Add this broker to best brokers since it ties with the best
// score.
// Add this broker to best brokers since it ties with the best score.
bestBrokers.add(broker);
}
}
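The tail of this method is elided by the diff; per the comment above the loop, one of the tied `bestBrokers` is then chosen at random. A sketch of what that final step presumably looks like (the helper class and the `ThreadLocalRandom` choice are assumptions, not quoted from the patch):

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class TieBreakSketch {
    // Hypothetical final step: pick uniformly among brokers tied at the
    // minimum score so repeated assignments spread across equal candidates.
    static String pickOne(List<String> bestBrokers) {
        if (bestBrokers.isEmpty()) {
            return null; // caller decides what "no viable broker" means
        }
        return bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size()));
    }
}
```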
......
......@@ -362,11 +362,11 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
brokerDataMap.put(broker, new BrokerData(localData));
}
} catch (Exception e) {
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e);
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
}
}
} catch (Exception e) {
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]", e);
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]", e.getMessage());
}
}
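Both logging changes lean on SLF4J's formatting rules: when every `{}` placeholder is consumed by an argument, a trailing exception is rendered via `toString()` into its placeholder instead of being printed as a stack trace, so the old calls were already swallowing the trace. Passing `e.getMessage()` makes that intent explicit. A small illustration, with an arbitrary logger name:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Slf4jArgsDemo {
    private static final Logger log = LoggerFactory.getLogger(Slf4jArgsDemo.class);

    static void demo(Exception e) {
        // Two placeholders, two arguments: 'e' fills the second placeholder
        // via toString(); no stack trace is printed.
        log.warn("broker - [{}], [{}]", "broker-1", e);
        // The patch logs just the message, making the compact form deliberate.
        log.warn("broker - [{}], [{}]", "broker-1", e.getMessage());
        // For a full stack trace, leave the throwable unconsumed by placeholders.
        log.warn("broker - [{}]", "broker-1", e);
    }
}
```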
......@@ -486,19 +486,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
@Override
public synchronized String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// ?: Is it too inefficient to make this synchronized? If so, it may be
// a good idea to use weighted random
// or atomic data.
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the
// selected broker.
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, loadData.getBrokerData().keySet());
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Use the filter pipeline to finalize broker candidates.
for (BrokerFilter filter : filterPipeline) {
......@@ -531,6 +527,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, localData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException e) {
// Node may already be created by another load manager: in this case update the data.
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
} catch (Exception e) {
// Catching exception here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
......@@ -554,7 +553,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
@Override
public void stop() throws PulsarServerException {
// Do nothing.
availableActiveBrokers.close();
brokerDataCache.clear();
brokerDataCache.close();
scheduler.shutdown();
}
/**
......
......@@ -286,6 +286,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException e) {
// Node may already be created by another load manager: in this case update the data.
if (loadReport != null) {
pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
}
} catch (Exception e) {
// Catching exception here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
......@@ -1423,6 +1429,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
@Override
public void stop() throws PulsarServerException {
// do nothing
loadReportCacheZk.clear();
loadReportCacheZk.close();
availableActiveBrokers.close();
scheduler.shutdown();
}
}
......@@ -17,6 +17,7 @@ package com.yahoo.pulsar.broker.namespace;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
......@@ -36,7 +37,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.StatCallback;
......@@ -46,13 +46,12 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
......@@ -68,11 +67,12 @@ import com.yahoo.pulsar.common.policies.data.BrokerAssignment;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.policies.data.LocalPolicies;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
......
......@@ -114,17 +114,21 @@ public final class ServiceUnitZkUtils {
*/
private static final void cleanupNamespaceNodes(ZooKeeper zkc, String root, String selfBrokerUrl) throws Exception {
// we don't need a watch here since we are only cleaning up the stale ephemeral nodes from previous session
for (String node : zkc.getChildren(root, false)) {
String currentPath = root + "/" + node;
// retrieve the content and try to decode with ServiceLookupData
List<String> children = zkc.getChildren(currentPath, false);
if (children.size() == 0) {
// clean up a single namespace node
cleanupSingleNamespaceNode(zkc, currentPath, selfBrokerUrl);
} else {
// this is an intermediate node, which means this is v2 namespace path
cleanupNamespaceNodes(zkc, currentPath, selfBrokerUrl);
try {
for (String node : zkc.getChildren(root, false)) {
String currentPath = root + "/" + node;
// retrieve the content and try to decode with ServiceLookupData
List<String> children = zkc.getChildren(currentPath, false);
if (children.size() == 0) {
// clean up a single namespace node
cleanupSingleNamespaceNode(zkc, currentPath, selfBrokerUrl);
} else {
// this is an intermediate node, which means this is v2 namespace path
cleanupNamespaceNodes(zkc, currentPath, selfBrokerUrl);
}
}
} catch (NoNodeException nne) {
LOG.info("No children for [{}]", nne.getPath());
}
}
......
......@@ -920,7 +920,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
try {
final LoadManager newLoadManager = LoadManager.create(pulsar);
log.info("Created load manager: {}", className);
pulsar.getLoadManager().get().disableBroker();
pulsar.getLoadManager().get().stop();
newLoadManager.start();
pulsar.getLoadManager().set(newLoadManager);
} catch (Exception ex) {
......
......@@ -38,6 +38,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.test.PortManager;
......@@ -85,6 +87,7 @@ import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
/**
* Start two brokers in the same cluster and have them connect to the same zookeeper. When the PulsarService starts, it
......@@ -279,7 +282,8 @@ public class LoadBalancerTest {
private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass().getDeclaredField("sortedRankings");
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
.getDeclaredField("sortedRankings");
ranking.setAccessible(true);
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = (AtomicReference<Map<Long, Set<ResourceUnit>>>) ranking
.get(pulsar.getLoadManager().get());
......@@ -420,6 +424,23 @@ public class LoadBalancerTest {
}
}
/**
* Ensure that the load manager's zookeeper data cache is shutdown after invoking stop().
*/
@Test
public void testStop() throws Exception {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsarServices[0].getLoadManager().get();
loadManager.stop();
Field loadReportCacheField = SimpleLoadManagerImpl.class.getDeclaredField("loadReportCacheZk");
loadReportCacheField.setAccessible(true);
ZooKeeperDataCache<LoadReport> loadReportCache = (ZooKeeperDataCache<LoadReport>) loadReportCacheField
.get(loadManager);
Field IS_SHUTDOWN_UPDATER = ZooKeeperDataCache.class.getDeclaredField("IS_SHUTDOWN_UPDATER");
IS_SHUTDOWN_UPDATER.setAccessible(true);
final int TRUE = 1;
assert (((AtomicIntegerFieldUpdater<ZooKeeperDataCache>) (IS_SHUTDOWN_UPDATER.get(loadReportCache))).get(loadReportCache) == TRUE);
}
private AtomicReference<Map<String, ResourceQuota>> getRealtimeResourceQuota(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field quotasField = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
......
package com.yahoo.pulsar.broker.loadbalance;
import java.util.Map;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
public class ModularLoadManagerStrategyTest {
// Test that least long term message rate works correctly.
@Test
public void testLeastLongTermMessageRate() {
BundleData bundleData = new BundleData();
BrokerData brokerData1 = initBrokerData();
BrokerData brokerData2 = initBrokerData();
BrokerData brokerData3 = initBrokerData();
brokerData1.getTimeAverageData().setLongTermMsgRateIn(100);
brokerData2.getTimeAverageData().setLongTermMsgRateIn(200);
brokerData3.getTimeAverageData().setLongTermMsgRateIn(300);
LoadData loadData = new LoadData();
Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
brokerDataMap.put("1", brokerData1);
brokerDataMap.put("2", brokerData2);
brokerDataMap.put("3", brokerData3);
ServiceConfiguration conf = new ServiceConfiguration();
ModularLoadManagerStrategy strategy = new LeastLongTermMessageRate(conf);
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("1"));
brokerData1.getTimeAverageData().setLongTermMsgRateIn(400);
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("2"));
brokerData2.getLocalData().setCpu(new ResourceUsage(90, 100));
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("3"));
}
private BrokerData initBrokerData() {
LocalBrokerData localBrokerData = new LocalBrokerData();
localBrokerData.setCpu(new ResourceUsage());
localBrokerData.setMemory(new ResourceUsage());
localBrokerData.setBandwidthIn(new ResourceUsage());
localBrokerData.setBandwidthOut(new ResourceUsage());
BrokerData brokerData = new BrokerData(localBrokerData);
TimeAverageBrokerData timeAverageBrokerData = new TimeAverageBrokerData();
brokerData.setTimeAverageData(timeAverageBrokerData);
return brokerData;
}
}
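Tracing the arithmetic behind the three assertions, assuming the default 85% overload threshold: all brokers start idle, so each has weight 1 / 0.85 and the lowest long-term rate wins, broker "1" at 100. Raising "1" to 400 leaves "2" at 200 as the minimum. Setting "2"'s CPU to 90/100 exceeds the threshold, so its score becomes infinite and "3" wins. As a quick check (values hand-computed, not taken from the test):

```java
// Hand-computed scores, assuming the 85% default threshold and idle brokers.
public class StrategyArithmetic {
    public static void main(String[] args) {
        final double weight = 1 / 0.85;   // idle broker, maxUsage == 0
        System.out.println(weight * 100); // "1": ~117.6 -> initial winner
        System.out.println(weight * 200); // "2": ~235.3
        System.out.println(weight * 300); // "3": ~352.9
        System.out.println(weight * 400); // "1" after the bump: ~470.6 -> "2" wins
        // CPU 90/100 = 0.90 > 0.85, so "2" scores Double.POSITIVE_INFINITY -> "3" wins.
    }
}
```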
......@@ -17,6 +17,7 @@ package com.yahoo.pulsar.zookeeper;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
......@@ -36,10 +37,12 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
private final ZooKeeperCache cache;
private final String path;
private final List<ZooKeeperCacheListener<Set<String>>> listeners = Lists.newCopyOnWriteArrayList();
private final AtomicBoolean isShutdown;
public ZooKeeperChildrenCache(ZooKeeperCache cache, String path) {
this.cache = cache;
this.path = path;
isShutdown = new AtomicBoolean(false);
}
public Set<String> get() throws KeeperException, InterruptedException {
......@@ -88,6 +91,12 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
@Override
public void process(WatchedEvent event) {
LOG.debug("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), event);
cache.process(event, this);
if (!isShutdown.get()) {
cache.process(event, this);
}
}
public void close() {
isShutdown.set(true);
}
}
......@@ -19,6 +19,8 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
......@@ -43,6 +45,13 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
private final ZooKeeperCache cache;
private final List<ZooKeeperCacheListener<T>> listeners = Lists.newCopyOnWriteArrayList();
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ZooKeeperDataCache> IS_SHUTDOWN_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ZooKeeperDataCache.class, "isShutdown");
private volatile int isShutdown = FALSE;
public ZooKeeperDataCache(final ZooKeeperCache cache) {
this.cache = cache;
}
......@@ -129,6 +138,12 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
@Override
public void process(WatchedEvent event) {
LOG.info("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), event);
cache.process(event, this);
if (IS_SHUTDOWN_UPDATER.get(this) == FALSE) {
cache.process(event, this);
}
}
public void close() {
IS_SHUTDOWN_UPDATER.set(this, TRUE);
}
}
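Both watcher caches now share the same shutdown guard: `process()` drops events that arrive after `close()`, so a stopped load manager no longer re-registers ZooKeeper watches. `ZooKeeperDataCache` expresses the flag as a `volatile int` driven by a static `AtomicIntegerFieldUpdater`, which avoids allocating a separate `AtomicBoolean` per cache instance while keeping atomic semantics. A standalone sketch of the idiom (class and method names are illustrative):

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class ShutdownGuardSketch {
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<ShutdownGuardSketch> IS_SHUTDOWN_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ShutdownGuardSketch.class, "isShutdown");
    private volatile int isShutdown = FALSE;

    void process(Runnable event) {
        // Ignore events racing with close(); never re-arm watches after shutdown.
        if (IS_SHUTDOWN_UPDATER.get(this) == FALSE) {
            event.run();
        }
    }

    void close() {
        IS_SHUTDOWN_UPDATER.set(this, TRUE);
    }
}
```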