提交 914e4282 编写于 作者: B bobbeyreese 提交者: Matteo Merli

Add load shedding strategy (#345)

上级 398bb6a7
......@@ -35,12 +35,18 @@ public class LoadData {
*/
private final Map<String, BundleData> bundleData;
/**
* Map from recently unloaded bundles to the timestamp of when they were last loaded.
*/
private final Map<String, Long> recentlyUnloadedBundles;
/**
* Initialize a LoadData.
*/
public LoadData() {
this.brokerData = new ConcurrentHashMap<>();
this.bundleData = new ConcurrentHashMap<>();
this.recentlyUnloadedBundles = new ConcurrentHashMap<>();
}
public Map<String, BrokerData> getBrokerData() {
......@@ -50,4 +56,8 @@ public class LoadData {
public Map<String, BundleData> getBundleData() {
return bundleData;
}
public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
}
......@@ -26,6 +26,8 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
......@@ -134,7 +136,7 @@ public class LoadManagerShared {
// From a full bundle name, extract the namespace name.
public static String getNamespaceNameFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf("/");
int pos = bundleName.lastIndexOf('/');
checkArgument(pos != -1);
return bundleName.substring(0, pos);
}
......@@ -156,4 +158,26 @@ public class LoadManagerShared {
return systemResourceUsage;
}
/**
* If load balancing is enabled, load shedding is enabled by default unless forced off by setting a flag in global
* zk /admin/flags/load-shedding-unload-disabled
*
* @return false by default, unload is allowed in load shedding true if zk flag is set, unload is disabled
*/
public static boolean isUnloadDisabledInLoadShedding(final PulsarService pulsar) {
if (!pulsar.getConfiguration().isLoadBalancerEnabled()) {
return true;
}
boolean unloadDisabledInLoadShedding = false;
try {
unloadDisabledInLoadShedding = pulsar.getGlobalZkCache()
.exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH);
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper",
AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e);
}
return unloadDisabledInLoadShedding;
}
}
......@@ -18,7 +18,6 @@ package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
......@@ -41,11 +40,6 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
......@@ -61,7 +55,6 @@ import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
......@@ -97,9 +90,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Path to ZNode containing TimeAverageBrokerData jsons for each broker.
public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";
// Cache of PulsarAdmins for each broker.
private LoadingCache<String, PulsarAdmin> adminCache;
// ZooKeeper Cache of the currently available active brokers.
// availableActiveBrokers.get() will return a set of the broker names without an http prefix.
private ZooKeeperChildrenCache availableActiveBrokers;
......@@ -153,15 +143,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Pulsar service used to initialize this.
private PulsarService pulsar;
// Cache for primary brokers according to policies.
private final Set<String> primariesCache;
// Executor service used to regularly update broker data.
private final ScheduledExecutorService scheduler;
// Cache for shard brokers according to policies.
private final Set<String> sharedCache;
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;
......@@ -177,10 +161,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
filterPipeline = new ArrayList<>();
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
loadSheddingPipeline.add(new OverloadShedder(conf));
preallocatedBundleToBroker = new ConcurrentHashMap<>();
primariesCache = new HashSet<>();
scheduler = Executors.newScheduledThreadPool(1);
sharedCache = new HashSet<>();
}
/**
......@@ -191,19 +174,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
* The service to initialize with.
*/
public void initialize(final PulsarService pulsar) {
adminCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, PulsarAdmin>() {
public void onRemoval(RemovalNotification<String, PulsarAdmin> removal) {
removal.getValue().close();
}
}).expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, PulsarAdmin>() {
@Override
public PulsarAdmin load(String key) throws Exception {
// key - broker name already is valid URL, has prefix "http://"
return new PulsarAdmin(new URL(key), pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
});
availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(),
LoadManager.LOADBALANCE_BROKERS_ROOT);
availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
......@@ -272,6 +242,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
private Set<String> getAvailableBrokers() {
try {
return availableActiveBrokers.get();
} catch (Exception e) {
log.warn("Error when trying to get active brokers", e);
return loadData.getBrokerData().keySet();
}
}
// Attempt to local the data for the given bundle in ZooKeeper.
// If it cannot be found, return the default bundle data.
private BundleData getBundleDataOrDefault(final String bundle) {
......@@ -372,35 +351,31 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// As the leader broker, update the broker data map in loadData by querying ZooKeeper for the broker data put there
// by each broker via updateLocalBrokerData.
private void updateAllBrokerData() {
try {
Set<String> activeBrokers = availableActiveBrokers.get();
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
for (final String broker : activeBrokers) {
try {
String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
final LocalBrokerData localData = brokerDataCache.get(key)
.orElseThrow(KeeperException.NoNodeException::new);
if (brokerDataMap.containsKey(broker)) {
// Replace previous local broker data.
brokerDataMap.get(broker).setLocalData(localData);
} else {
// Initialize BrokerData object for previously unseen brokers.
brokerDataMap.put(broker, new BrokerData(localData));
}
} catch (Exception e) {
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
final Set<String> activeBrokers = getAvailableBrokers();
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
for (String broker : activeBrokers) {
try {
String key = String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker);
final LocalBrokerData localData = brokerDataCache.get(key)
.orElseThrow(KeeperException.NoNodeException::new);
if (brokerDataMap.containsKey(broker)) {
// Replace previous local broker data.
brokerDataMap.get(broker).setLocalData(localData);
} else {
// Initialize BrokerData object for previously unseen
// brokers.
brokerDataMap.put(broker, new BrokerData(localData));
}
} catch (Exception e) {
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
}
// Remove obsolete brokers.
for (final String broker : brokerDataMap.keySet()) {
if (!activeBrokers.contains(broker)) {
brokerDataMap.remove(broker);
}
}
// Remove obsolete brokers.
for (final String broker : brokerDataMap.keySet()) {
if (!activeBrokers.contains(broker)) {
brokerDataMap.remove(broker);
}
} catch (Exception e) {
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]",
e.getMessage());
}
}
......@@ -410,7 +385,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final Map<String, BundleData> bundleData = loadData.getBundleData();
// Iterate over the broker data.
for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
final String broker = brokerEntry.getKey();
final BrokerData brokerData = brokerEntry.getValue();
final Map<String, NamespaceBundleStats> statsMap = brokerData.getLocalData().getLastStats();
......@@ -435,7 +409,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
if (preallocatedBundleData.containsKey(broker)) {
// Should not iterate with more than one thread at a time.
synchronized (preallocatedBundleData) {
final Iterator<Map.Entry<String, BundleData>> preallocatedIterator = preallocatedBundleData.entrySet()
.iterator();
while (preallocatedIterator.hasNext()) {
......@@ -447,8 +422,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
// Using the newest data, update the aggregated time-average data
// for the current broker.
// Using the newest data, update the aggregated time-average data for the current broker.
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
}
}
......@@ -476,19 +450,34 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
@Override
public synchronized void doLoadShedding() {
if (LoadManagerShared.isUnloadDisabledInLoadShedding(pulsar)) {
return;
}
if (getAvailableBrokers().size() <= 1) {
log.info("Only 1 broker available: no load shedding will be performed");
return;
}
// Remove bundles who have been unloaded for longer than the grace period from the recently unloaded
// map.
final long timeout = System.currentTimeMillis()
- TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);
for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
final Map<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf);
if (bundlesToUnload != null && !bundlesToUnload.isEmpty()) {
try {
for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
final String bundle = entry.getKey();
final String broker = entry.getValue();
adminCache.get(broker).namespaces().unloadNamespaceBundle(
final String broker = entry.getKey();
final String bundle = entry.getValue();
log.info("Unloading bundle: {}", bundle);
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundle),
LoadManagerShared.getBundleRangeFromBundleName(bundle));
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
}
} catch (Exception e) {
log.warn("Error when trying to perform load shedding: {}", e);
log.warn("Error when trying to perform load shedding", e);
}
return;
}
......@@ -530,18 +519,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
Set<String> activeBrokers;
try {
activeBrokers = availableActiveBrokers.get();
} catch (Exception e) {
// Try-catch block inserted because ZooKeeperChildrenCache.get throws checked exception, though we
// should not really see this happen unless something goes very wrong.
log.warn("Unexpected error when trying to get active brokers", e);
// Fall back to using loadData key set.
activeBrokers = loadData.getBrokerData().keySet();
}
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, activeBrokers);
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers());
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Use the filter pipeline to finalize broker candidates.
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.LoadData;
import com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy;
/**
* Load shedding strategy which will attempt to shed exactly one bundle on brokers which are overloaded, that is, whose
* maximum system resource usage exceeds loadBalancerBrokerOverloadedThresholdPercentage. A bundle will be recommended
* for unloading off that broker if and only if the following conditions hold: The broker has at least two bundles
* assigned and the broker has at least one bundle that has not been unloaded recently according to
* LoadBalancerSheddingGracePeriodMinutes. The unloaded bundle will be the most expensive bundle in terms of message
* rate that has not been recently unloaded.
*/
public class OverloadShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(OverloadShedder.class);
private Map<String, String> selectedBundlesCache;
/**
* Create an OverloadShedder with the service configuration.
*
* @param conf
* Service configuration to create from.
*/
public OverloadShedder(final ServiceConfiguration conf) {
selectedBundlesCache = new HashMap<>();
}
/**
* Attempt to shed one bundle off every broker which is overloaded.
*
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
* The service configuration.
* @return A map from bundles to unload to the brokers on which they are loaded.
*/
public Map<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
for (final Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
final String broker = entry.getKey();
final BrokerData brokerData = entry.getValue();
final LocalBrokerData localData = brokerData.getLocalData();
final double maxUsage = localData.getMaxResourceUsage();
if (maxUsage >= overloadThreshold) {
log.info("Attempting to shed load on {}, which has max resource usage {}%", broker, maxUsage);
double maxMessageRate = Double.NEGATIVE_INFINITY;
String mostTaxingBundle = null;
if (localData.getBundles().size() > 1) {
for (final String bundle : localData.getBundles()) {
final BundleData bundleData = loadData.getBundleData().get(bundle);
// Consider short-term message rate to address system resource burden
final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
final double messageRate = shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut();
// The burden of checking the timestamp is for the load manager, not the strategy.
if (messageRate > maxMessageRate && !recentlyUnloadedBundles.containsKey(bundle)) {
maxMessageRate = messageRate;
mostTaxingBundle = bundle;
}
}
if (mostTaxingBundle != null) {
selectedBundlesCache.put(broker, mostTaxingBundle);
} else {
log.warn("Load shedding could not be performed on broker {} because all bundles assigned to it "
+ "have recently been unloaded");
}
} else if (localData.getBundles().size() == 1) {
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
localData.getBundles().iterator().next(), broker);
} else {
log.warn("Broker {} is overloaded despite having no bundles", broker);
}
}
}
return selectedBundlesCache;
}
}
......@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import java.io.IOException;
......@@ -57,7 +58,6 @@ import com.google.common.collect.TreeMultimap;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
......@@ -77,7 +77,6 @@ import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {
......@@ -141,7 +140,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private ZooKeeperDataCache<LoadReport> loadReportCacheZk;
private ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;
private BrokerHostUsage brokerHostUsage;
private LoadingCache<String, PulsarAdmin> adminCache;
private LoadingCache<String, Long> unloadedHotNamespaceCache;
public static final String LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH = "/loadbalance/settings/strategy";
......@@ -177,7 +175,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// flag to force update load report
private boolean forceLoadReportUpdate = false;
private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LoadReport.class);
.readValue(content, LoadReport.class);
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
......@@ -218,18 +216,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
adminCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, PulsarAdmin>() {
public void onRemoval(RemovalNotification<String, PulsarAdmin> removal) {
removal.getValue().close();
}
}).expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, PulsarAdmin>() {
@Override
public PulsarAdmin load(String key) throws Exception {
// key - broker name already is valid URL, has prefix "http://"
return new PulsarAdmin(new URL(key), pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
});
int entryExpiryTime = (int) pulsar.getConfiguration().getLoadBalancerSheddingGracePeriodMinutes();
unloadedHotNamespaceCache = CacheBuilder.newBuilder().expireAfterWrite(entryExpiryTime, TimeUnit.MINUTES)
.build(new CacheLoader<String, Long>() {
......@@ -1254,37 +1240,15 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return false;
}
/**
* If load balancing is enabled, load shedding is enabled by default unless forced off by setting a flag in global
* zk /admin/flags/load-shedding-unload-disabled
*
* @return false by default, unload is allowed in load shedding true if zk flag is set, unload is disabled
*/
public boolean isUnloadDisabledInLoadShedding() {
if (!pulsar.getConfiguration().isLoadBalancerEnabled()) {
return true;
}
boolean unloadDisabledInLoadShedding = false;
try {
unloadDisabledInLoadShedding = pulsar.getGlobalZkCache()
.exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH);
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper",
AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e);
}
return unloadDisabledInLoadShedding;
}
private void unloadNamespacesFromOverLoadedBrokers(Map<ResourceUnit, String> namespaceBundlesToUnload) {
for (Map.Entry<ResourceUnit, String> bundle : namespaceBundlesToUnload.entrySet()) {
String brokerName = bundle.getKey().getResourceId();
String bundleName = bundle.getValue();
try {
if (unloadedHotNamespaceCache.getIfPresent(bundleName) == null) {
if (!isUnloadDisabledInLoadShedding()) {
if (!LoadManagerShared.isUnloadDisabledInLoadShedding(pulsar)) {
log.info("Unloading namespace {} from overloaded broker {}", bundleName, brokerName);
adminCache.get(brokerName).namespaces().unloadNamespaceBundle(
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
LoadManagerShared.getBundleRangeFromBundleName(bundleName));
log.info("Successfully unloaded namespace {} from broker {}", bundleName, brokerName);
......
......@@ -15,17 +15,26 @@
*/
package com.yahoo.pulsar.broker.loadbalance;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.bookkeeper.test.PortManager;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
......@@ -35,10 +44,12 @@ import org.testng.annotations.Test;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.client.admin.Namespaces;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
......@@ -46,7 +57,9 @@ import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
public class ModularLoadManagerImplTest {
......@@ -174,6 +187,10 @@ public class ModularLoadManagerImplTest {
return makeBundle(all, all, all);
}
private String mockBundleName(final int i) {
return String.format("%d/%d/%d/0x00000000_0xffffffff", i, i, i);
}
@Test
public void testCandidateConsistency() throws Exception {
boolean foundFirst = false;
......@@ -208,8 +225,60 @@ public class ModularLoadManagerImplTest {
}
}
// Test that load shedding works
@Test
public void testLoadShedding() throws Exception {
final NamespaceBundleStats stats1 = new NamespaceBundleStats();
final NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats1.msgRateIn = 100;
stats2.msgRateIn = 200;
final Map<String, NamespaceBundleStats> statsMap = new ConcurrentHashMap<>();
statsMap.put(mockBundleName(1), stats1);
statsMap.put(mockBundleName(2), stats2);
final LocalBrokerData localBrokerData = new LocalBrokerData();
localBrokerData.update(new SystemResourceUsage(), statsMap);
final Namespaces namespacesSpy1 = spy(pulsar1.getAdminClient().namespaces());
AtomicReference<String> bundleReference = new AtomicReference<>();
doAnswer(invocation -> {
bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]);
return null;
}).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shedder to see the spy.
primaryLoadManager.onUpdate(null, null, null);
Thread.sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
primaryLoadManager.doLoadShedding();
// 80% is below overload threshold: verify nothing is unloaded.
verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
localBrokerData.getCpu().usage = 90;
primaryLoadManager.doLoadShedding();
// Most expensive bundle will be unloaded.
verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
assert (bundleReference.get().equals(mockBundleName(2)));
primaryLoadManager.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
assert (bundleReference.get().equals(mockBundleName(1)));
primaryLoadManager.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
}
// Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
// metrics change by a percentage threshold.
@Test
public void testNeedBrokerDataUpdate() throws Exception {
final LocalBrokerData lastData = new LocalBrokerData();
......
......@@ -224,6 +224,10 @@ public class BrokerMonitor {
for (String oldBroker : brokers) {
if (!newBrokers.contains(oldBroker)) {
log.info("Lost broker: " + oldBroker);
synchronized (loadData) {
// Stop including lost broker in global stats.
loadData.remove(oldBroker);
}
}
}
for (String newBroker : newBrokers) {
......
......@@ -19,6 +19,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
......@@ -37,6 +38,8 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.util.concurrent.RateLimiter;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
......@@ -74,6 +77,9 @@ public class LoadSimulationClient {
// Map from a full topic name to the TradeUnit created for that topic.
private final Map<String, TradeUnit> topicsToTradeUnits;
// Pulsar admin to create namespaces with.
private final PulsarAdmin admin;
// Pulsar client to create producers and consumers with.
private final PulsarClient client;
......@@ -86,7 +92,6 @@ public class LoadSimulationClient {
// consumption as well as size may be changed at
// any time, and the TradeUnit may also be stopped.
private static class TradeUnit {
Future<Producer> producerFuture;
Future<Consumer> consumerFuture;
final AtomicBoolean stop;
final RateLimiter rateLimiter;
......@@ -102,10 +107,9 @@ public class LoadSimulationClient {
final Map<Integer, byte[]> payloadCache;
public TradeUnit(final TradeConfiguration tradeConf, final PulsarClient client,
final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf,
final Map<Integer, byte[]> payloadCache) throws Exception {
final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf,
final Map<Integer, byte[]> payloadCache) throws Exception {
consumerFuture = client.subscribeAsync(tradeConf.topic, "Subscriber-" + tradeConf.topic, consumerConf);
producerFuture = client.createProducerAsync(tradeConf.topic, producerConf);
this.payload = new AtomicReference<>();
this.producerConf = producerConf;
this.payloadCache = payloadCache;
......@@ -143,7 +147,7 @@ public class LoadSimulationClient {
}
public void start() throws Exception {
Producer producer = producerFuture.get();
Producer producer = getNewProducer();
final Consumer consumer = consumerFuture.get();
while (!stop.get()) {
final MutableBoolean wellnessFlag = new MutableBoolean();
......@@ -229,68 +233,75 @@ public class LoadSimulationClient {
final TradeConfiguration tradeConf = new TradeConfiguration();
tradeConf.command = command;
switch (command) {
case CHANGE_COMMAND:
// Change the topic's settings if it exists.
decodeProducerOptions(tradeConf, inputStream);
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
}
break;
case STOP_COMMAND:
// Stop the topic if it exists.
tradeConf.topic = inputStream.readUTF();
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).stop.set(true);
}
break;
case TRADE_COMMAND:
// Create the topic. It is assumed that the topic does not already exist.
decodeProducerOptions(tradeConf, inputStream);
final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache);
topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
executor.submit(() -> {
case CHANGE_COMMAND:
// Change the topic's settings if it exists.
decodeProducerOptions(tradeConf, inputStream);
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
}
break;
case STOP_COMMAND:
// Stop the topic if it exists.
tradeConf.topic = inputStream.readUTF();
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).stop.set(true);
}
break;
case TRADE_COMMAND:
// Create the topic. It is assumed that the topic does not already exist.
decodeProducerOptions(tradeConf, inputStream);
final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache);
topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
executor.submit(() -> {
try {
final String topic = tradeConf.topic;
final String namespace = topic.substring("persistent://".length(), topic.lastIndexOf('/'));
try {
tradeUnit.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
break;
case CHANGE_GROUP_COMMAND:
// Change the settings of all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
tradeConf.size = inputStream.readInt();
tradeConf.rate = inputStream.readDouble();
// See if a topic belongs to this tenant and group using this regex.
final String groupRegex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(groupRegex)) {
unit.change(tradeConf);
admin.namespaces().createNamespace(namespace);
} catch (PulsarAdminException.ConflictException e) {
// Ignore, already created namespace.
}
tradeUnit.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
break;
case STOP_GROUP_COMMAND:
// Stop all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
// See if a topic belongs to this tenant and group using this regex.
final String regex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(regex)) {
unit.stop.set(true);
}
});
break;
case CHANGE_GROUP_COMMAND:
// Change the settings of all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
tradeConf.size = inputStream.readInt();
tradeConf.rate = inputStream.readDouble();
// See if a topic belongs to this tenant and group using this regex.
final String groupRegex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(groupRegex)) {
unit.change(tradeConf);
}
break;
case FIND_COMMAND:
// Write a single boolean indicating if the topic was found.
outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF()));
outputStream.flush();
break;
default:
throw new IllegalArgumentException("Unrecognized command code received: " + command);
}
break;
case STOP_GROUP_COMMAND:
// Stop all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
// See if a topic belongs to this tenant and group using this regex.
final String regex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(regex)) {
unit.stop.set(true);
}
}
break;
case FIND_COMMAND:
// Write a single boolean indicating if the topic was found.
outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF()));
outputStream.flush();
break;
default:
throw new IllegalArgumentException("Unrecognized command code received: " + command);
}
}
......@@ -299,16 +310,18 @@ public class LoadSimulationClient {
/**
* Create a LoadSimulationClient with the given JCommander arguments.
* @param arguments Arguments to configure this from.
*
* @param arguments
* Arguments to configure this from.
*/
public LoadSimulationClient(final MainArguments arguments) throws Exception {
payloadCache = new ConcurrentHashMap<>();
topicsToTradeUnits = new ConcurrentHashMap<>();
final EventLoopGroup eventLoopGroup = SystemUtils.IS_OS_LINUX
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"))
new DefaultThreadFactory("pulsar-test-client"))
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"));
new DefaultThreadFactory("pulsar-test-client"));
clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(4);
......@@ -328,6 +341,7 @@ public class LoadSimulationClient {
producerConf.setBatchingEnabled(true);
consumerConf = new ConsumerConfiguration();
consumerConf.setMessageListener(ackListener);
admin = new PulsarAdmin(new URL(arguments.serviceURL), clientConf);
client = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup);
port = arguments.port;
executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client"));
......@@ -335,7 +349,9 @@ public class LoadSimulationClient {
/**
* Start a client with command line arguments.
* @param args Command line arguments to pass in.
*
* @param args
* Command line arguments to pass in.
*/
public static void main(String[] args) throws Exception {
final MainArguments mainArguments = new MainArguments();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册