提交 398bb6a7 编写于 作者: B bobbeyreese 提交者: Rajan

Introduce Interface for Customizing Update Conditions (#347)

* Allow customizable ZK update conditions

* Add copyright

* Add unit test, put update condition directly into load manager
上级 fa3cc70f
......@@ -110,17 +110,34 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
updateSystemResourceUsage(systemResourceUsage);
updateBundleData(bundleStats);
lastStats = bundleStats;
lastUpdate = System.currentTimeMillis();
}
// Set the cpu, memory, and direct memory to that of the new system resource
// usage data.
/**
* Using another LocalBrokerData, update this.
*
* @param other
* LocalBrokerData to update from.
*/
public void update(final LocalBrokerData other) {
updateSystemResourceUsage(other.cpu, other.memory, other.directMemory, other.bandwidthIn, other.bandwidthOut);
updateBundleData(other.lastStats);
lastStats = other.lastStats;
}
// Set the cpu, memory, and direct memory to that of the new system resource usage data.
private void updateSystemResourceUsage(final SystemResourceUsage systemResourceUsage) {
this.cpu = systemResourceUsage.cpu;
this.memory = systemResourceUsage.memory;
this.directMemory = systemResourceUsage.directMemory;
this.bandwidthIn = systemResourceUsage.bandwidthIn;
this.bandwidthOut = systemResourceUsage.bandwidthOut;
updateSystemResourceUsage(systemResourceUsage.cpu, systemResourceUsage.memory, systemResourceUsage.directMemory,
systemResourceUsage.bandwidthIn, systemResourceUsage.bandwidthOut);
}
// Update resource usage given each individual usage.
private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
final ResourceUsage directMemory, final ResourceUsage bandwidthIn, final ResourceUsage bandwidthOut) {
this.cpu = cpu;
this.memory = memory;
this.directMemory = directMemory;
this.bandwidthIn = bandwidthIn;
this.bandwidthOut = bandwidthOut;
}
// Aggregate all message, throughput, topic count, bundle count, consumer
......@@ -135,8 +152,6 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
int totalNumBundles = 0;
int totalNumConsumers = 0;
int totalNumProducers = 0;
lastBundleGains.clear();
lastBundleLosses.clear();
final Iterator<String> oldBundleIterator = bundles.iterator();
while (oldBundleIterator.hasNext()) {
final String bundle = oldBundleIterator.next();
......
......@@ -129,6 +129,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// Timestamp of last invocation of updateBundleData.
private long lastBundleDataUpdate;
// LocalBrokerData available before most recent update.
private LocalBrokerData lastData;
// Pipeline used to determine what namespaces, if any, should be unloaded.
private final List<LoadSheddingStrategy> loadSheddingPipeline;
......@@ -236,6 +239,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
defaultStats.msgRateIn = DEFAULT_MESSAGE_RATE;
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
lastData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
placementStrategy = ModularLoadManagerStrategy.create(conf);
......@@ -321,18 +326,41 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
return ObjectMapperFactory.getThreadLocal().readValue(data, clazz);
}
// Determine if the broker data requires an update by measuring the time
// past since the last update.
private boolean needBrokerDataUpdate() {
return System.currentTimeMillis() > localData.getLastUpdate()
+ TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
private double percentChange(final double oldValue, final double newValue) {
if (oldValue == 0) {
if (newValue == 0) {
// Avoid NaN
return 0;
}
return Double.POSITIVE_INFINITY;
}
return 100 * Math.abs((oldValue - newValue) / oldValue);
}
// Determine if the bundle data requires an update by measuring the time
// past since the last update.
private boolean needBundleDataUpdate() {
return System.currentTimeMillis() > lastBundleDataUpdate
+ TimeUnit.MINUTES.toMillis(conf.getLoadBalancerResourceQuotaUpdateIntervalMinutes());
// Determine if the broker data requires an update by delegating to the update condition.
private boolean needBrokerDataUpdate() {
final long updateMaxIntervalMillis = TimeUnit.MINUTES
.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
if (System.currentTimeMillis() - localData.getLastUpdate() > updateMaxIntervalMillis) {
log.info("Writing local data to ZooKeeper because time since last update exceeded threshold of {} minutes",
conf.getLoadBalancerReportUpdateMaxIntervalMinutes());
// Always update after surpassing the maximum interval.
return true;
}
final double maxChange = Math
.max(percentChange(lastData.getMaxResourceUsage(), localData.getMaxResourceUsage()),
Math.max(percentChange(lastData.getMsgRateIn() + lastData.getMsgRateOut(),
localData.getMsgRateIn() + localData.getMsgRateOut()),
Math.max(
percentChange(lastData.getMsgThroughputIn() + lastData.getMsgThroughputOut(),
localData.getMsgThroughputIn() + localData.getMsgThroughputOut()),
percentChange(lastData.getNumBundles(), localData.getNumBundles()))));
if (maxChange > conf.getLoadBalancerReportUpdateThresholdPercentage()) {
log.info("Writing local data to ZooKeeper because maximum change {}% exceeded threshold {}%", maxChange,
conf.getLoadBalancerReportUpdateThresholdPercentage());
return true;
}
return false;
}
// Update both the broker data and the bundle data.
......@@ -493,13 +521,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
@Override
public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// Use brokerCandidateCache as a lock to reduce synchronization.
synchronized(brokerCandidateCache) {
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
Set<String> activeBrokers;
try {
......@@ -599,9 +628,17 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
@Override
public void writeBrokerDataOnZooKeeper() {
try {
if (needBrokerDataUpdate()) {
updateLocalBrokerData();
if (needBrokerDataUpdate()) {
localData.setLastUpdate(System.currentTimeMillis());
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
// Clear deltas.
localData.getLastBundleGains().clear();
localData.getLastBundleLosses().clear();
// Update previous data.
lastData.update(localData);
}
} catch (Exception e) {
log.warn("Error writing broker data on ZooKeeper: {}", e);
......@@ -618,7 +655,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
*/
@Override
public void writeBundleDataOnZooKeeper() {
if (needBundleDataUpdate()) {
updateBundleData();
// Write the bundle data to ZooKeeper.
for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
......@@ -645,6 +681,4 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
}
}
}
......@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.test.PortManager;
import org.slf4j.Logger;
......@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
......@@ -44,6 +46,7 @@ import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
public class ModularLoadManagerImplTest {
......@@ -204,4 +207,78 @@ public class ModularLoadManagerImplTest {
assert (primaryLoadManager.selectBrokerForAssignment(serviceUnit).equals(primaryHost));
}
}
// Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
// metrics change by a percentage threshold.
@Test
public void testNeedBrokerDataUpdate() throws Exception {
final LocalBrokerData lastData = new LocalBrokerData();
final LocalBrokerData currentData = new LocalBrokerData();
final ServiceConfiguration conf = pulsar1.getConfiguration();
// Set this manually in case the default changes.
conf.setLoadBalancerReportUpdateThresholdPercentage(5);
// Easier to test using an uninitialized ModularLoadManagerImpl.
final ModularLoadManagerImpl loadManager = new ModularLoadManagerImpl();
setField(loadManager, "lastData", lastData);
setField(loadManager, "localData", currentData);
setField(loadManager, "conf", conf);
Supplier<Boolean> needUpdate = () -> {
try {
return (Boolean) invokeSimpleMethod(loadManager, "needBrokerDataUpdate");
} catch (Exception e) {
throw new RuntimeException(e);
}
};
lastData.setMsgRateIn(100);
currentData.setMsgRateIn(104);
// 4% difference: shouldn't trigger an update.
assert (!needUpdate.get());
currentData.setMsgRateIn(105.1);
// 5% difference: should trigger an update (exactly 5% is flaky due to precision).
assert (needUpdate.get());
// Do similar tests for lower values.
currentData.setMsgRateIn(94);
assert (needUpdate.get());
currentData.setMsgRateIn(95.1);
assert (!needUpdate.get());
// 0 to non-zero should always trigger an update.
lastData.setMsgRateIn(0);
currentData.setMsgRateIn(1e-8);
assert (needUpdate.get());
// non-zero to zero should trigger an update as long as the threshold is less than 100.
lastData.setMsgRateIn(1e-8);
currentData.setMsgRateIn(0);
assert (needUpdate.get());
// zero to zero should never trigger an update.
currentData.setMsgRateIn(0);
lastData.setMsgRateIn(0);
assert (!needUpdate.get());
// Minimally test other values to ensure they are included.
lastData.getCpu().usage = 100;
lastData.getCpu().limit = 1000;
currentData.getCpu().usage = 106;
currentData.getCpu().limit = 1000;
assert (needUpdate.get());
lastData.setCpu(new ResourceUsage());
currentData.setCpu(new ResourceUsage());
lastData.setMsgThroughputIn(100);
currentData.setMsgThroughputIn(106);
assert (needUpdate.get());
currentData.setMsgThroughputIn(100);
lastData.setNumBundles(100);
currentData.setNumBundles(106);
assert (needUpdate.get());
currentData.setNumBundles(100);
assert (!needUpdate.get());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册