提交 8d924193 编写于 作者: B bobbeyreese 提交者: Matteo Merli

Always evenly distribute bundles in the same namespace (#388)

上级 bd3fa562
......@@ -39,7 +39,7 @@ public class ResourceQuotaCache {
private static final Logger LOG = LoggerFactory.getLogger(ResourceQuotaCache.class);
// Root path for resource-quota
private static final String RESOURCE_QUOTA_ROOT = "/loadbalance/resource-quota";
public static final String RESOURCE_QUOTA_ROOT = "/loadbalance/resource-quota";
// Serialize/de-serialize JSON objects
private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
// Read only cache
......
......@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
......@@ -125,6 +126,22 @@ public class LoadManagerShared {
}
}
/**
* Using the given bundles, populate the namespace to bundle range map.
*
* @param bundles
* Bundles with which to populate.
* @param target
* Map to fill.
*/
public static void fillNamespaceToBundlesMap(final Set<String> bundles, final Map<String, Set<String>> target) {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
target.computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
});
}
// From a full bundle name, extract the bundle range.
public static String getBundleRangeFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
......@@ -180,4 +197,53 @@ public class LoadManagerShared {
}
return unloadDisabledInLoadShedding;
}
/**
* Removes the brokers which have more bundles assigned to them in the same namespace as the incoming bundle than at
* least one other available broker from consideration.
*
* @param assignedBundleName
* Name of bundle to be assigned.
* @param candidates
* Brokers available for placement.
* @param brokerToNamespaceToBundleRange
* Map from brokers to namespaces to bundle ranges.
*/
public static void removeMostServicingBrokersForNamespace(final String assignedBundleName,
final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
if (candidates.isEmpty()) {
return;
}
final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
int leastBundles = Integer.MAX_VALUE;
for (final String broker : candidates) {
if (brokerToNamespaceToBundleRange.containsKey(broker)) {
final Set<String> bundleRanges = brokerToNamespaceToBundleRange.get(broker).get(namespaceName);
if (bundleRanges == null) {
// Assume that when the namespace is absent, there are no bundles for this namespace assigned to
// that broker.
leastBundles = 0;
break;
}
leastBundles = Math.min(leastBundles, bundleRanges.size());
} else {
// Assume non-present brokers have 0 bundles.
leastBundles = 0;
break;
}
}
if (leastBundles == 0) {
// By assumption, the namespace name will not be present if there are no bundles in the namespace
// assigned to the broker.
candidates.removeIf(broker -> brokerToNamespaceToBundleRange.containsKey(broker)
&& brokerToNamespaceToBundleRange.get(broker).containsKey(namespaceName));
} else {
final int finalLeastBundles = leastBundles;
// We may safely assume that each broker has at least one bundle for this namespace.
// Note that this case is far less likely since it implies that there are at least as many bundles for this
// namespace as brokers.
candidates.removeIf(broker -> brokerToNamespaceToBundleRange.get(broker).get(namespaceName)
.size() != finalLeastBundles);
}
}
}
......@@ -19,6 +19,7 @@ import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
......@@ -103,6 +104,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;
// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evely across brokers.
private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
// Path to the ZNode containing the LocalBrokerData json for this broker.
private String brokerZnodePath;
......@@ -157,6 +162,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
brokerToNamespaceToBundleRange = new HashMap<>();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
......@@ -385,6 +391,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final Map<String, BundleData> bundleData = loadData.getBundleData();
// Iterate over the broker data.
for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
final String broker = brokerEntry.getKey();
final BrokerData brokerData = brokerEntry.getValue();
final Map<String, NamespaceBundleStats> statsMap = brokerData.getLocalData().getLastStats();
......@@ -424,6 +431,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Using the newest data, update the aggregated time-average data for the current broker.
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
.computeIfAbsent(broker, k -> new HashMap<>());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundleData.keySet(), namespaceToBundleRange);
}
}
}
......@@ -520,6 +534,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers());
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Use the filter pipeline to finalize broker candidates.
......@@ -532,6 +548,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
brokerToNamespaceToBundleRange.get(broker).computeIfAbsent(namespaceName, k -> new HashSet<>())
.add(bundleRange);
return broker;
}
}
......
......@@ -107,6 +107,10 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private final Set<String> bundleGainsCache;
private final Set<String> bundleLossesCache;
// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evely across brokers.
private final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange;
// CPU usage per msg/sec
private double realtimeCpuLoadFactor = 0.025;
// memory usage per 500 (topics + producers + consumers)
......@@ -151,7 +155,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private static final String LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED = "/loadbalance/settings/auto_bundle_split_enabled";
private static final String SETTING_NAME_LOAD_FACTOR_CPU = "loadFactorCPU";
private static final String SETTING_NAME_LOAD_FACTOR_MEM = "loadFactorMemory";
private static final String SETTING_NAME_STRATEGY = "loadBalancerStrategy";
public static final String SETTING_NAME_STRATEGY = "loadBalancerStrategy";
private static final String SETTING_NAME_OVERLOAD_THRESHOLD = "overloadThreshold";
private static final String SETTING_NAME_UNDERLOAD_THRESHOLD = "underloadThreshold";
private static final String SETTING_NAME_COMFORTLOAD_THRESHOLD = "comfortLoadThreshold";
......@@ -191,6 +195,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange = new HashMap<>();
}
@Override
......@@ -716,9 +721,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
updateLoadBalancingMetrics(hostname, finalRank, ranking);
}
}
updateBrokerToNamespaceToBundle();
this.sortedRankings.set(newSortedRankings);
this.resourceUnitRankings = newResourceUnitRankings;
} else {
log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
pulsar.getWebServiceAddress());
......@@ -798,12 +803,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
continue;
}
// check if this ServiceUnit is already pre-allocated
String resourceUnitId = candidate.getResourceId();
ResourceUnitRanking ranking = resourceUnitRankings.get(candidate);
if (ranking.isServiceUnitPreAllocated(serviceUnitId)) {
return candidate;
}
// check if this ServiceUnit is already loaded
if (ranking.isServiceUnitLoaded(serviceUnitId)) {
......@@ -864,7 +865,14 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
String loadPercentageDesc = ranking.getEstimatedLoadPercentageString();
log.info("Assign {} to {} with ({}).", serviceUnitId, selectedRU.getResourceId(), loadPercentageDesc);
if (!ranking.isServiceUnitPreAllocated(serviceUnitId)) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(serviceUnitId);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnitId);
ResourceQuota quota = this.getResourceQuota(serviceUnitId);
// Add preallocated bundle range so incoming bundles from the same namespace are not assigned to the
// same broker.
brokerToNamespaceToBundleRange
.computeIfAbsent(selectedRU.getResourceId().replace("http://", ""), k -> new HashMap<>())
.computeIfAbsent(namespaceName, k -> new HashSet<>()).add(bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
......@@ -893,6 +901,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return result;
}
// As long as there is at least one broker left, this will always leave brokerCandidateCache non-empty.
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange);
// After LoadManagerShared is finished applying the filter, put the results back into a multimap.
for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
final Long rank = entry.getKey();
......@@ -938,9 +949,17 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return availableBrokers;
}
private ResourceUnit getLeastLoadedBroker(ServiceUnitId serviceUnit,
private synchronized ResourceUnit getLeastLoadedBroker(ServiceUnitId serviceUnit,
Map<Long, Set<ResourceUnit>> availableBrokers) {
ResourceUnit selectedBroker = null;
// If the broker is already assigned, return that candidate.
for (final Map.Entry<ResourceUnit, ResourceUnitRanking> entry : resourceUnitRankings.entrySet()) {
final ResourceUnit resourceUnit = entry.getKey();
final ResourceUnitRanking ranking = entry.getValue();
if (ranking.isServiceUnitPreAllocated(serviceUnit.toString())) {
return resourceUnit;
}
}
Multimap<Long, ResourceUnit> finalCandidates = getFinalCandidates(serviceUnit, availableBrokers);
// Remove candidates that point to inactive brokers
Set<String> activeBrokers = Collections.emptySet();
......@@ -966,8 +985,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
} else {
selectedBroker = placementStrategy.findBrokerForPlacement(finalCandidates);
}
log.info("Selected : [{}] for ServiceUnit : [{}]", selectedBroker.getResourceId(),
serviceUnit.getNamespaceObject().toString());
log.info("Selected : [{}] for ServiceUnit : [{}]", selectedBroker.getResourceId(), serviceUnit.toString());
return selectedBroker;
} else {
// No available broker found
......@@ -1240,6 +1258,20 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return false;
}
// Update the brokerToNamespaceToBundleRange map with the current preallocated and assigned bundle data.
private synchronized void updateBrokerToNamespaceToBundle() {
resourceUnitRankings.forEach((resourceUnit, ranking) -> {
final String broker = resourceUnit.getResourceId();
final Set<String> loadedBundles = ranking.getLoadedBundles();
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final Map<String, Set<String>> namespaceToBundleRange = brokerToNamespaceToBundleRange
.computeIfAbsent(broker.replace("http://", ""), k -> new HashMap<>());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
});
}
private void unloadNamespacesFromOverLoadedBrokers(Map<ResourceUnit, String> namespaceBundlesToUnload) {
for (Map.Entry<ResourceUnit, String> bundle : namespaceBundlesToUnload.entrySet()) {
String brokerName = bundle.getKey().getResourceId();
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
public class LoadBalancerTestingUtils {
public static NamespaceBundle[] makeBundles(final NamespaceBundleFactory nsFactory, final String property,
final String cluster, final String namespace, final int numBundles) {
final NamespaceBundle[] result = new NamespaceBundle[numBundles];
final NamespaceName namespaceName = new NamespaceName(property, cluster, namespace);
for (int i = 0; i < numBundles - 1; ++i) {
final long lower = NamespaceBundles.FULL_UPPER_BOUND * i / numBundles;
final long upper = NamespaceBundles.FULL_UPPER_BOUND * (i + 1) / numBundles;
result[i] = nsFactory.getBundle(namespaceName, Range.range(lower, BoundType.CLOSED, upper, BoundType.OPEN));
}
result[numBundles - 1] = nsFactory.getBundle(namespaceName,
Range.range(NamespaceBundles.FULL_UPPER_BOUND * (numBundles - 1) / numBundles, BoundType.CLOSED,
NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED));
return result;
}
}
......@@ -34,6 +34,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,9 +48,11 @@ import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import com.yahoo.pulsar.client.admin.Namespaces;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
......@@ -225,6 +230,36 @@ public class ModularLoadManagerImplTest {
}
}
// Test that bundles belonging to the same namespace are distributed evenly among brokers.
@Test
public void testEvenBundleDistribution() throws Exception {
final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(nsFactory, "test", "test", "test", 16);
int numAssignedToPrimary = 0;
int numAssignedToSecondary = 0;
final BundleData bundleData = new BundleData(10, 1000);
final TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
final String firstBundleDataPath = String.format("%s/%s", ModularLoadManagerImpl.BUNDLE_DATA_ZPATH, bundles[0]);
// Write long message rate for first bundle to ensure that even bundle distribution is not a coincidence of
// balancing by message rate. If we were balancing by message rate, one of the brokers should only have this
// one bundle.
ZkUtils.createFullPathOptimistic(pulsar1.getZkClient(), firstBundleDataPath, bundleData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
for (final NamespaceBundle bundle : bundles) {
if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryHost)) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
}
if ((numAssignedToPrimary + numAssignedToSecondary) % 2 == 0) {
// On even number of assignments, assert that an equal number of bundles have been assigned between
// them.
assert (numAssignedToPrimary == numAssignedToSecondary);
}
}
}
// Test that load shedding works
@Test
public void testLoadShedding() throws Exception {
......
......@@ -25,11 +25,9 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.commons.lang3.SystemUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -42,9 +40,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.yahoo.pulsar.broker.loadbalance.impl.*;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
......@@ -59,11 +57,19 @@ import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.cache.ResourceQuotaCache;
import com.yahoo.pulsar.broker.loadbalance.impl.GenericBrokerHostUsageImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadCalculatorImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyData;
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType;
......@@ -99,6 +105,9 @@ public class SimpleLoadManagerImplTest {
BrokerStats brokerStatsClient1;
BrokerStats brokerStatsClient2;
String primaryHost;
String secondaryHost;
ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
......@@ -132,6 +141,8 @@ public class SimpleLoadManagerImplTest {
url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
admin1 = new PulsarAdmin(url1, (Authentication) null);
brokerStatsClient1 = admin1.brokerStats();
primaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(),
PRIMARY_BROKER_WEBSERVICE_PORT);
// Start broker 2
ServiceConfiguration config2 = new ServiceConfiguration();
......@@ -146,8 +157,8 @@ public class SimpleLoadManagerImplTest {
url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT);
admin2 = new PulsarAdmin(url2, (Authentication) null);
brokerStatsClient2 = admin2.brokerStats();
createNamespacePolicies(pulsar1);
secondaryHost = String.format("http://%s:%d", InetAddress.getLocalHost().getHostName(),
SECONDARY_BROKER_WEBSERVICE_PORT);
Thread.sleep(100);
}
......@@ -227,6 +238,7 @@ public class SimpleLoadManagerImplTest {
@Test(enabled = true)
public void testPrimary() throws Exception {
createNamespacePolicies(pulsar1);
LoadManager loadManager = new SimpleLoadManagerImpl(pulsar1);
PulsarResourceDescription rd = new PulsarResourceDescription();
rd.put("memory", new ResourceUsage(1024, 4096));
......@@ -266,6 +278,7 @@ public class SimpleLoadManagerImplTest {
@Test(enabled = false)
public void testPrimarySecondary() throws Exception {
createNamespacePolicies(pulsar1);
LocalZooKeeperCache mockCache = mock(LocalZooKeeperCache.class);
ZooKeeperChildrenCache zooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
......@@ -280,8 +293,8 @@ public class SimpleLoadManagerImplTest {
LocalZooKeeperCache originalLZK1 = (LocalZooKeeperCache) zkCacheField.get(pulsar1);
LocalZooKeeperCache originalLZK2 = (LocalZooKeeperCache) zkCacheField.get(pulsar2);
log.info("lzk are {} 2: {}", originalLZK1.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT),
originalLZK2.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT));
log.info("lzk are {} 2: {}", originalLZK1.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT),
originalLZK2.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT));
zkCacheField.set(pulsar1, mockCache);
LocalZooKeeperCache newZk = (LocalZooKeeperCache) pulsar1.getLocalZkCache();
......@@ -290,8 +303,8 @@ public class SimpleLoadManagerImplTest {
ZooKeeperChildrenCache availableActiveBrokers = new ZooKeeperChildrenCache(pulsar1.getLocalZkCache(),
SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT);
log.info("lzk mocked active brokers are {}",
availableActiveBrokers.get(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT));
log.info("lzk mocked active brokers are {}",
availableActiveBrokers.get(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT));
LoadManager loadManager = new SimpleLoadManagerImpl(pulsar1);
......@@ -411,6 +424,37 @@ public class SimpleLoadManagerImplTest {
verify(loadManager, atLeastOnce()).doLoadShedding();
}
// Test that bundles belonging to the same namespace are evenly distributed.
@Test
public void testEvenBundleDistribution() throws Exception {
final NamespaceBundle[] bundles = LoadBalancerTestingUtils
.makeBundles(pulsar1.getNamespaceService().getNamespaceBundleFactory(), "pulsar", "use", "test", 16);
final ResourceQuota quota = new ResourceQuota();
final String quotaZPath = String.format("%s/%s/%s", ResourceQuotaCache.RESOURCE_QUOTA_ROOT, "namespace",
bundles[0]);
// Create high message rate quota for the first bundle to make it unlikely to be a coincidence of even
// distribution.
ZkUtils.createFullPathOptimistic(pulsar1.getZkClient(), quotaZPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(quota), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int numAssignedToPrimary = 0;
int numAssignedToSecondary = 0;
pulsar1.getConfiguration().setLoadBalancerPlacementStrategy(SimpleLoadManagerImpl.LOADBALANCER_STRATEGY_LLS);
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsar1.getLoadManager().get();
for (final NamespaceBundle bundle : bundles) {
if (loadManager.getLeastLoaded(bundle).getResourceId().equals(primaryHost)) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
}
// Check that number of assigned bundles are equivalent when an even number have been assigned.
if ((numAssignedToPrimary + numAssignedToSecondary) % 2 == 0) {
assert (numAssignedToPrimary == numAssignedToSecondary);
}
}
}
@Test
public void testNamespaceBundleStats() {
NamespaceBundleStats nsb1 = new NamespaceBundleStats();
......@@ -457,7 +501,6 @@ public class SimpleLoadManagerImplTest {
task1.run();
verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
task2.run();
verify(loadManager, times(1)).doLoadShedding();
......
......@@ -199,6 +199,13 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
return this.preAllocatedBundles;
}
/**
* Get the loaded bundles.
*/
public Set<String> getLoadedBundles() {
return loadedBundles;
}
/**
* Get the estimated load percentage
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册