提交 f9993942 编写于 作者: S sschepens 提交者: Matteo Merli

Implement BrokerHostUsage using java (#88)

上级 5e9884c7
......@@ -212,10 +212,7 @@ loadBalancerReportUpdateThresholdPercentage=10
# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15
# Path for the script used to retrieve system usage
loadBalancerHostUsageScriptPath=
# Frequency of sar report to collect
# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic should be offload from
......
......@@ -185,10 +185,7 @@ loadBalancerReportUpdateThresholdPercentage=10
# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15
# Path for the script used to retrieve system usage
loadBalancerHostUsageScriptPath=
# Frequency of sar report to collect
# Frequency of report to collect
loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic should be offload from
......
......@@ -186,10 +186,7 @@ public class ServiceConfiguration {
private int loadBalancerReportUpdateThresholdPercentage = 10;
// maximum interval to update load report
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
// Path for the script used to retrieve system usage
@FieldContext(required = false)
private String loadBalancerHostUsageScriptPath;
// Frequency of sar report to collect
// Frequency of report to collect
private int loadBalancerHostUsageCheckIntervalMinutes = 1;
// Load shedding interval. Broker periodically checks whether some traffic
// should be offload from
......@@ -715,14 +712,6 @@ public class ServiceConfiguration {
this.loadBalancerReportUpdateMaxIntervalMinutes = loadBalancerReportUpdateMaxIntervalMinutes;
}
public String getLoadBalancerHostUsageScriptPath() {
return loadBalancerHostUsageScriptPath;
}
public void setLoadBalancerHostUsageScriptPath(String loadBalancerHostUsageScriptPath) {
this.loadBalancerHostUsageScriptPath = loadBalancerHostUsageScriptPath;
}
public int getLoadBalancerHostUsageCheckIntervalMinutes() {
return loadBalancerHostUsageCheckIntervalMinutes;
}
......
......@@ -128,6 +128,7 @@ public class PulsarService implements AutoCloseable {
this.brokerServiceUrlTls = brokerUrlTls(config);
this.config = config;
this.shutdownService = new MessagingServiceShutdownHook(this);
loadManagerExecutor = Executors.newSingleThreadScheduledExecutor();
}
/**
......@@ -233,11 +234,11 @@ public class PulsarService implements AutoCloseable {
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(),
getBookKeeperClientFactory());
this.brokerService = new BrokerService(this);
// Start load management service (even if load balancing is disabled)
this.loadManager = new SimpleLoadManagerImpl(this);
this.brokerService = new BrokerService(this);
this.startLoadManagementService();
// needs load management service
......@@ -400,11 +401,6 @@ public class PulsarService implements AutoCloseable {
this.loadManager.start();
if (config.isLoadBalancerEnabled()) {
if (loadManagerExecutor == null) {
loadManagerExecutor = Executors.newSingleThreadScheduledExecutor();
;
}
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
......@@ -533,6 +529,10 @@ public class PulsarService implements AutoCloseable {
return executor;
}
public ScheduledExecutorService getLoadManagerExecutor() {
return loadManagerExecutor;
}
public OrderedSafeExecutor getOrderedExecutor() {
return orderedExecutor;
}
......
......@@ -16,80 +16,18 @@
package com.yahoo.pulsar.broker.loadbalance;
import java.io.IOException;
import java.io.StringWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.utils.CmdUtility;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
/**
* Class that will return the broker host usage.
*
*
*/
public class BrokerHostUsage {
// The interval for host usage check command
private final int hostUsageCheckInterval;
// Path to the pulsar-broker-host-usage script
private final String usageScriptPath;
private static final Logger LOG = LoggerFactory.getLogger(BrokerHostUsage.class);
public BrokerHostUsage(PulsarService pulsar) {
this.usageScriptPath = pulsar.getConfiguration().getLoadBalancerHostUsageScriptPath();
this.hostUsageCheckInterval = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes();
}
public interface BrokerHostUsage {
/**
* Returns the host usage information in the following format -
*
* <pre>
* {
* "bandwidthIn" : {
* "usage" : "100",
* "limit" : "1000",
* },
* "bandwidthOut" : {
* "usage" : "659",
* "limit" : "1000",
* },
* "memory" : {
* "usage" : "16.0",
* "limit" : "16070",
* }
* "cpu-utilization" : {
* "usage" : "160.0"
* "limit" : "1600"
* }
* }
* </pre>
*
* @return Broker host usage in the json string format
*
* @throws IOException
*/
public String getBrokerHostUsage() throws IOException {
StringWriter writer = new StringWriter();
try {
/**
* Spawns a python process and runs the usage exporter script. The script return the machine information in
* the json format.
*/
int exitCode = CmdUtility.exec(writer, usageScriptPath, "--host-usage-check-interval",
Integer.toString(hostUsageCheckInterval));
if (exitCode != 0) {
LOG.warn("Process exited with non-zero exit code - [{}], stderr - [{}] ", exitCode, writer.toString());
throw new IOException(writer.toString());
}
} catch (IOException e) {
e.printStackTrace();
LOG.warn("Error running the usage script {}", e.getMessage());
throw e;
}
return writer.toString();
}
SystemResourceUsage getBrokerHostUsage();
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import com.sun.management.OperatingSystemMXBean;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
/**
* Class that will return the broker host usage.
*/
public class GenericBrokerHostUsageImpl implements BrokerHostUsage {
// The interval for host usage check command
private static final int CPU_CHECK_MILLIS = 1000;
private static final Logger LOG = LoggerFactory.getLogger(GenericBrokerHostUsageImpl.class);
private final int hostUsageCheckIntervalMin;
private long lastCollection;
private double totalCpuLimit;
private double cpuUsageSum = 0d;
private int cpuUsageCount = 0;
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
public GenericBrokerHostUsageImpl(PulsarService pulsar) {
this.hostUsageCheckIntervalMin = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes();
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.totalCpuLimit = getTotalCpuLimit();
pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::checkCpuLoad, 0, CPU_CHECK_MILLIS, TimeUnit.MILLISECONDS);
pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES);
}
@Override
public SystemResourceUsage getBrokerHostUsage() {
return usage;
}
private void checkCpuLoad() {
cpuUsageSum += systemBean.getSystemCpuLoad();
cpuUsageCount++;
}
private void calculateBrokerHostUsage() {
SystemResourceUsage usage = new SystemResourceUsage();
usage.setCpu(getCpuUsage());
usage.setMemory(getMemUsage());
this.usage = usage;
}
private double getTotalCpuLimit() {
return (double) (100 * Runtime.getRuntime().availableProcessors());
}
private double getTotalCpuUsage() {
double cpuUsage = cpuUsageSum / cpuUsageCount;
cpuUsageSum = 0d;
cpuUsageCount = 0;
return cpuUsage;
}
private ResourceUsage getCpuUsage() {
if (cpuUsageCount == 0) {
return new ResourceUsage(0, totalCpuLimit);
}
return new ResourceUsage(getTotalCpuUsage() * totalCpuLimit, totalCpuLimit);
}
private ResourceUsage getMemUsage() {
double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024);
double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024);
return new ResourceUsage(total - free, total);
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import com.sun.management.OperatingSystemMXBean;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Class that will return the broker host usage.
*
*
*/
public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
// The interval for host usage check command
private final int hostUsageCheckInterval;
private long lastCollection;
private double lastTotalNicUsageTx;
private double lastTotalNicUsageRx;
private CpuStat lastCpuStat;
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class);
public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
this.hostUsageCheckInterval = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes();
this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckInterval, TimeUnit.SECONDS);
}
@Override
public SystemResourceUsage getBrokerHostUsage() {
return usage;
}
private void calculateBrokerHostUsage() {
List<String> nics = getNics();
double totalNicLimit = getTotalNicLimitKbps(nics);
double totalNicUsageTx = getTotalNicUsageTxKb(nics);
double totalNicUsageRx = getTotalNicUsageRxKb(nics);
double totalCpuLimit = getTotalCpuLimit();
CpuStat cpuStat = getTotalCpuUsage();
SystemResourceUsage usage = new SystemResourceUsage();
long now = System.currentTimeMillis();
if (lastCollection == 0L) {
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
usage.setCpu(new ResourceUsage(0d, totalCpuLimit));
} else {
double elapsedSeconds = (now - lastCollection) / 1000d;
double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds;
double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds;
if (cpuStat != null && lastCpuStat != null) {
// we need two non null stats to get a usage report
long cpuTimeDiff = cpuStat.getTotalTime() - lastCpuStat.getTotalTime();
long cpuUsageDiff = cpuStat.getUsage() - lastCpuStat.getUsage();
double cpuUsage = ((double) cpuUsageDiff / (double) cpuTimeDiff) * totalCpuLimit;
usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
}
usage.setMemory(getMemUsage());
usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
}
lastTotalNicUsageTx = totalNicUsageTx;
lastTotalNicUsageRx = totalNicUsageRx;
lastCpuStat = cpuStat;
lastCollection = System.currentTimeMillis();
this.usage = usage;
}
private double getTotalCpuLimit() {
return (double) (100 * Runtime.getRuntime().availableProcessors());
}
/**
* Reads first line of /proc/stat to get total cpu usage.
* <pre>
* cpu user nice system idle iowait irq softirq steal guest guest_nice
* cpu 317808 128 58637 2503692 7634 0 13472 0 0 0
* </pre>
* Line is split in "words", filtering the first.
* The sum of all numbers give the amount of cpu cycles used this far.
* Real CPU usage should equal the sum substracting the idle cycles,
* this would include iowait, irq and steal.
*/
private CpuStat getTotalCpuUsage() {
try {
String[] words = Files.lines(Paths.get("/proc/stat"))
.findFirst()
.get().split("\\s+");
long total = Arrays.stream(words)
.filter(s -> !s.contains("cpu"))
.mapToLong(Long::parseLong)
.sum();
long idle = Long.parseLong(words[4]);
return new CpuStat(total, total - idle);
} catch (IOException e) {
LOG.error("Failed to read CPU usage from /proc/stat", e);
return null;
}
}
private ResourceUsage getMemUsage() {
double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024);
double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024);
return new ResourceUsage(total - free, total);
}
private List<String> getNics() {
try {
return Files.list(Paths.get("/sys/class/net/"))
.filter(this::isPhysicalNic)
.map(path -> path.getFileName().toString())
.collect(Collectors.toList());
} catch (IOException e) {
LOG.error("Failed to find NICs", e);
return Collections.emptyList();
}
}
private boolean isPhysicalNic(Path path) {
try {
if (!Files.readSymbolicLink(path).toString().contains("/virtual/")) {
try {
Files.readAllBytes(path.resolve("speed"));
return true;
} catch (Exception e) {
// wireless nics don't report speed, ignore them.
return false;
}
}
return false;
} catch (IOException e) {
LOG.error("Failed to read link target for NIC " + path, e);
return false;
}
}
private Path getNicSpeedPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/speed", nic));
}
private double getTotalNicLimitKbps(List<String> nics) {
// Nic speed is in Mbits/s, return kbits/s
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s))));
} catch (IOException e) {
LOG.error("Failed to read speed for nic " + s, e);
return 0d;
}
}).sum() * 1024;
}
private Path getNicTxPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic));
}
private Path getNicRxPath(String nic) {
return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic));
}
private double getTotalNicUsageRxKb(List<String> nics) {
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s))));
} catch (IOException e) {
LOG.error("Failed to read rx_bytes for NIC " + s, e);
return 0d;
}
}).sum() * 8 / 1024;
}
private double getTotalNicUsageTxKb(List<String> nics) {
return nics.stream().mapToDouble(s -> {
try {
return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s))));
} catch (IOException e) {
LOG.error("Failed to read tx_bytes for NIC " + s, e);
return 0d;
}
}).sum() * 8 / 1024;
}
private class CpuStat {
private long totalTime;
private long usage;
CpuStat(long totalTime, long usage) {
this.totalTime = totalTime;
this.usage = usage;
}
long getTotalTime() {
return totalTime;
}
long getUsage() {
return usage;
}
}
}
......@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
......@@ -176,8 +177,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
this.realtimeAvgResourceQuota = new ResourceQuota();
placementStrategy = new WRRPlacementStrategy();
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
brokerHostUsage = new BrokerHostUsage(pulsar);
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
if (SystemUtils.IS_OS_LINUX) {
brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
} else {
brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
}
loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
......@@ -200,17 +205,17 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
public PulsarAdmin load(String key) throws Exception {
// key - broker name already is valid URL, has prefix "http://"
return new PulsarAdmin(new URL(key), pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
});
int entryExpiryTime = (int) pulsar.getConfiguration().getLoadBalancerSheddingGracePeriodMinutes();
unloadedHotNamespaceCache = CacheBuilder.newBuilder().expireAfterWrite(entryExpiryTime, TimeUnit.MINUTES)
.build(new CacheLoader<String, Long>() {
@Override
public Long load(String key) throws Exception {
return System.currentTimeMillis();
}
});
.build(new CacheLoader<String, Long>() {
@Override
public Long load(String key) throws Exception {
return System.currentTimeMillis();
}
});
availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
@Override
......@@ -233,7 +238,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (pulsar.getZkClient().exists(LOADBALANCE_BROKERS_ROOT, false) == null) {
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), LOADBALANCE_BROKERS_ROOT, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// ignore the exception, node might be present already
}
......@@ -254,7 +259,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
// Catching excption here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
......@@ -268,11 +273,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
this.realtimeAvgResourceQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota();
this.lastResourceQuotaUpdateTimestamp = System.currentTimeMillis();
this.realtimeCpuLoadFactor = getDynamicConfigurationDouble(
LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, SETTING_NAME_LOAD_FACTOR_CPU,
this.realtimeCpuLoadFactor);
LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, SETTING_NAME_LOAD_FACTOR_CPU,
this.realtimeCpuLoadFactor);
this.realtimeMemoryLoadFactor = getDynamicConfigurationDouble(
LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, SETTING_NAME_LOAD_FACTOR_MEM,
this.realtimeMemoryLoadFactor);
LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, SETTING_NAME_LOAD_FACTOR_MEM,
this.realtimeMemoryLoadFactor);
} catch (Exception e) {
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
throw new PulsarServerException(e);
......@@ -301,7 +306,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
pulsar.getZkClient().setData(zkPath, settingBytes, -1);
} else {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), zkPath, settingBytes, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
CreateMode.PERSISTENT);
}
} catch (Exception e) {
log.warn("Got exception when writing to ZooKeeper path [{}]:", zkPath, e);
......@@ -346,7 +351,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private String getLoadBalancerPlacementStrategy() {
String strategy = this.getDynamicConfigurationFromZK(LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
SETTING_NAME_STRATEGY, pulsar.getConfiguration().getLoadBalancerPlacementStrategy());
SETTING_NAME_STRATEGY, pulsar.getConfiguration().getLoadBalancerPlacementStrategy());
if (!LOADBALANCER_STRATEGY_LLS.equals(strategy) && !LOADBALANCER_STRATEGY_RAND.equals(strategy)) {
strategy = LOADBALANCER_STRATEGY_RAND;
}
......@@ -355,12 +360,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private double getCpuLoadFactorFromZK(double defaultValue) {
return getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH,
SETTING_NAME_LOAD_FACTOR_CPU, defaultValue);
SETTING_NAME_LOAD_FACTOR_CPU, defaultValue);
}
private double getMemoryLoadFactorFromZK(double defaultValue) {
return getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH,
SETTING_NAME_LOAD_FACTOR_MEM, defaultValue);
SETTING_NAME_LOAD_FACTOR_MEM, defaultValue);
}
@Override
......@@ -371,26 +376,26 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private long getLoadBalancerBrokerUnderloadedThresholdPercentage() {
return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_UNDERLOAD_THRESHOLD_ZPATH,
SETTING_NAME_UNDERLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerUnderloadedThresholdPercentage());
SETTING_NAME_UNDERLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerUnderloadedThresholdPercentage());
}
private long getLoadBalancerBrokerOverloadedThresholdPercentage() {
return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_OVERLOAD_THRESHOLD_ZPATH,
SETTING_NAME_OVERLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerOverloadedThresholdPercentage());
SETTING_NAME_OVERLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerOverloadedThresholdPercentage());
}
private long getLoadBalancerBrokerComfortLoadThresholdPercentage() {
return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_COMFORT_LOAD_THRESHOLD_ZPATH,
SETTING_NAME_COMFORTLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerComfortLoadLevelPercentage());
SETTING_NAME_COMFORTLOAD_THRESHOLD,
pulsar.getConfiguration().getLoadBalancerBrokerComfortLoadLevelPercentage());
}
private boolean getLoadBalancerAutoBundleSplitEnabled() {
return this.getDynamicConfigurationBoolean(LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED,
SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED,
pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled());
SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED,
pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled());
}
/*
......@@ -451,19 +456,19 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
private ResourceQuota timeSmoothQuota(ResourceQuota oldQuota, double msgRateIn, double msgRateOut,
double bandwidthIn, double bandwidthOut, double memory, long timePast) {
double bandwidthIn, double bandwidthOut, double memory, long timePast) {
if (oldQuota.getDynamic()) {
ResourceQuota newQuota = new ResourceQuota();
newQuota.setMsgRateIn(timeSmoothValue(oldQuota.getMsgRateIn(), msgRateIn, RESOURCE_QUOTA_MIN_MSGRATE_IN,
RESOURCE_QUOTA_MAX_MSGRATE_IN, timePast));
RESOURCE_QUOTA_MAX_MSGRATE_IN, timePast));
newQuota.setMsgRateOut(timeSmoothValue(oldQuota.getMsgRateOut(), msgRateOut, RESOURCE_QUOTA_MIN_MSGRATE_OUT,
RESOURCE_QUOTA_MAX_MSGRATE_OUT, timePast));
RESOURCE_QUOTA_MAX_MSGRATE_OUT, timePast));
newQuota.setBandwidthIn(timeSmoothValue(oldQuota.getBandwidthIn(), bandwidthIn,
RESOURCE_QUOTA_MIN_BANDWIDTH_IN, RESOURCE_QUOTA_MAX_BANDWIDTH_IN, timePast));
RESOURCE_QUOTA_MIN_BANDWIDTH_IN, RESOURCE_QUOTA_MAX_BANDWIDTH_IN, timePast));
newQuota.setBandwidthOut(timeSmoothValue(oldQuota.getBandwidthOut(), bandwidthOut,
RESOURCE_QUOTA_MIN_BANDWIDTH_OUT, RESOURCE_QUOTA_MAX_BANDWIDTH_OUT, timePast));
RESOURCE_QUOTA_MIN_BANDWIDTH_OUT, RESOURCE_QUOTA_MAX_BANDWIDTH_OUT, timePast));
newQuota.setMemory(timeSmoothValue(oldQuota.getMemory(), memory, RESOURCE_QUOTA_MIN_MEMORY,
RESOURCE_QUOTA_MAX_MEMORY, timePast));
RESOURCE_QUOTA_MAX_MEMORY, timePast));
return newQuota;
} else {
return oldQuota;
......@@ -500,7 +505,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
totalBundles++;
NamespaceBundleStats stats = statsEntry.getValue();
totalMemGroups += (1
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
totalBandwidthIn += stats.msgThroughputIn;
totalBandwidthOut += stats.msgThroughputOut;
}
......@@ -517,18 +522,18 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
this.lastResourceQuotaUpdateTimestamp = loadReportTimestamp;
if (totalMsgRate > 1000 && totalMemGroups > 30) {
this.realtimeCpuLoadFactor = timeSmoothValue(this.realtimeCpuLoadFactor, totalCpuUsage / totalMsgRate,
RESOURCE_QUOTA_MIN_CPU_FACTOR, RESOURCE_QUOTA_MAX_CPU_FACTOR, timePast);
RESOURCE_QUOTA_MIN_CPU_FACTOR, RESOURCE_QUOTA_MAX_CPU_FACTOR, timePast);
this.realtimeMemoryLoadFactor = timeSmoothValue(this.realtimeMemoryLoadFactor,
totalMemoryUsage / totalMemGroups, RESOURCE_QUOTA_MIN_MEM_FACTOR, RESOURCE_QUOTA_MAX_MEM_FACTOR,
timePast);
totalMemoryUsage / totalMemGroups, RESOURCE_QUOTA_MIN_MEM_FACTOR, RESOURCE_QUOTA_MAX_MEM_FACTOR,
timePast);
}
// calculate average bundle
if (totalBundles > 30 && this.realtimeAvgResourceQuota.getDynamic()) {
ResourceQuota oldQuota = this.realtimeAvgResourceQuota;
ResourceQuota newQuota = timeSmoothQuota(oldQuota, totalMsgRateIn / totalBundles,
totalMsgRateOut / totalBundles, totalBandwidthIn / totalBundles,
totalBandwidthOut / totalBundles, totalMemoryUsage / totalBundles, timePast);
totalMsgRateOut / totalBundles, totalBandwidthIn / totalBundles,
totalBandwidthOut / totalBundles, totalMemoryUsage / totalBundles, timePast);
this.realtimeAvgResourceQuota = newQuota;
}
......@@ -546,12 +551,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
String bundle = statsEntry.getKey();
NamespaceBundleStats stats = statsEntry.getValue();
long memGroupCount = (1
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
double newMemoryQuota = memGroupCount * this.realtimeMemoryLoadFactor;
ResourceQuota oldQuota = getResourceQuota(bundle);
ResourceQuota newQuota = timeSmoothQuota(oldQuota, stats.msgRateIn, stats.msgRateOut,
stats.msgThroughputIn, stats.msgThroughputOut, newMemoryQuota, timePast);
stats.msgThroughputIn, stats.msgThroughputOut, newMemoryQuota, timePast);
newQuotas.put(bundle, newQuota);
}
}
......@@ -562,20 +567,20 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private void compareAndWriteQuota(String bundle, ResourceQuota oldQuota, ResourceQuota newQuota) throws Exception {
boolean needUpdate = true;
if (!oldQuota.getDynamic() || (Math
.abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN
&& Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT
&& Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN
&& Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT
&& Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY)) {
.abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN
&& Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT
&& Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN
&& Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT
&& Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY)) {
needUpdate = false;
}
if (needUpdate) {
if (log.isDebugEnabled()) {
log.debug(String.format(
"Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f, bandwidthOut: %.1f, memory: %.1f",
(bundle == null) ? "default" : bundle, newQuota.getMsgRateIn(), newQuota.getMsgRateOut(),
newQuota.getBandwidthIn(), newQuota.getBandwidthOut(), newQuota.getMemory()));
"Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f, bandwidthOut: %.1f, memory: %.1f",
(bundle == null) ? "default" : bundle, newQuota.getMsgRateIn(), newQuota.getMsgRateOut(),
newQuota.getBandwidthIn(), newQuota.getBandwidthOut(), newQuota.getMemory()));
}
if (bundle == null) {
......@@ -656,13 +661,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
ResourceQuota allocatedQuota = getTotalAllocatedQuota(loadedBundles);
ResourceQuota preAllocatedQuota = getTotalAllocatedQuota(preAllocatedBundles);
ResourceUnitRanking ranking = new ResourceUnitRanking(loadReport.getSystemResourceUsage(),
loadedBundles, allocatedQuota, preAllocatedBundles, preAllocatedQuota);
loadedBundles, allocatedQuota, preAllocatedBundles, preAllocatedQuota);
newResourceUnitRankings.put(resourceUnit, ranking);
// generated sorted ranking
double loadPercentage = ranking.getEstimatedLoadPercentage();
long maxCapacity = ranking.estimateMaxCapacity(
pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota());
pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota());
long finalRank = 0;
if (strategy.equals(LOADBALANCER_STRATEGY_LLS)) {
finalRank = (long) loadPercentage;
......@@ -689,7 +694,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
} else {
log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
pulsar.getWebServiceAddress());
pulsar.getWebServiceAddress());
}
}
......@@ -834,15 +839,15 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
private Multimap<Long, ResourceUnit> getFinalCandidatesWithPolicy(NamespaceName namespace,
Multimap<Long, ResourceUnit> primaries, Multimap<Long, ResourceUnit> shared) {
Multimap<Long, ResourceUnit> primaries, Multimap<Long, ResourceUnit> shared) {
Multimap<Long, ResourceUnit> finalCandidates = TreeMultimap.create();
// if not enough primary then it should be union of primaries and secondaries
finalCandidates.putAll(primaries);
if (policies.shouldFailoverToSecondaries(namespace, primaries.size())) {
log.debug(
"Not enough of primaries [{}] available for namespace - [{}], "
+ "adding shared [{}] as possible candidate owners",
primaries.size(), namespace.toString(), shared.size());
"Not enough of primaries [{}] available for namespace - [{}], "
+ "adding shared [{}] as possible candidate owners",
primaries.size(), namespace.toString(), shared.size());
finalCandidates.putAll(shared);
}
return finalCandidates;
......@@ -856,7 +861,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
private Multimap<Long, ResourceUnit> getFinalCandidates(ServiceUnitId serviceUnit,
Map<Long, Set<ResourceUnit>> availableBrokers) {
Map<Long, Set<ResourceUnit>> availableBrokers) {
// need multimap or at least set of RUs
Multimap<Long, ResourceUnit> matchedPrimaries = TreeMultimap.create();
Multimap<Long, ResourceUnit> matchedShared = TreeMultimap.create();
......@@ -869,7 +874,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
for (Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
for (ResourceUnit ru : entry.getValue()) {
log.debug("Considering Resource Unit [{}] with Rank [{}] for serviceUnit [{}]", ru.getResourceId(),
entry.getKey(), serviceUnit);
entry.getKey(), serviceUnit);
URL brokerUrl = null;
try {
brokerUrl = new URL(String.format(ru.getResourceId()));
......@@ -884,22 +889,22 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
matchedPrimaries.put(entry.getKey(), ru);
if (log.isDebugEnabled()) {
log.debug(
"Added Primary Broker - [{}] as possible Candidates for"
+ " namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
"Added Primary Broker - [{}] as possible Candidates for"
+ " namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
matchedShared.put(entry.getKey(), ru);
if (log.isDebugEnabled()) {
log.debug(
"Added Shared Broker - [{}] as possible "
+ "Candidates for namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
"Added Shared Broker - [{}] as possible "
+ "Candidates for namespace - [{}] with policies",
brokerUrl.getHost(), namespace.toString());
}
} else {
if (log.isDebugEnabled()) {
log.debug("Skipping Broker - [{}] not primary broker and not shared"
+ " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString());
+ " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString());
}
}
......@@ -907,7 +912,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (policies.isSharedBroker(brokerUrl.getHost())) {
matchedShared.put(entry.getKey(), ru);
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
brokerUrl.getHost(), namespace.toString());
}
}
}
......@@ -916,9 +921,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
return getFinalCandidatesWithPolicy(namespace, matchedPrimaries, matchedShared);
} else {
log.debug(
"Policies not present for namespace - [{}] so only "
+ "considering shared [{}] brokers for possible owner",
namespace.toString(), matchedShared.size());
"Policies not present for namespace - [{}] so only "
+ "considering shared [{}] brokers for possible owner",
namespace.toString(), matchedShared.size());
return getFinalCandidatesNoPolicy(matchedShared);
}
}
......@@ -946,7 +951,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
availableBrokers = Maps.newTreeMap();
for (String broker : activeBrokers) {
ResourceUnit resourceUnit = new SimpleResourceUnit(String.format("http://%s", broker),
new PulsarResourceDescription());
new PulsarResourceDescription());
availableBrokers.computeIfAbsent(0L, key -> Sets.newTreeSet()).add(resourceUnit);
}
log.info("Choosing at random from broker list: [{}]", availableBrokers.values());
......@@ -955,7 +960,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
private ResourceUnit getLeastLoadedBroker(ServiceUnitId serviceUnit,
Map<Long, Set<ResourceUnit>> availableBrokers) {
Map<Long, Set<ResourceUnit>> availableBrokers) {
ResourceUnit selectedBroker = null;
Multimap<Long, ResourceUnit> finalCandidates = getFinalCandidates(serviceUnit, availableBrokers);
// Remove candidates that point to inactive brokers
......@@ -982,7 +987,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
selectedBroker = placementStrategy.findBrokerForPlacement(finalCandidates);
}
log.debug("Selected : [{}] for ServiceUnit : [{}]", selectedBroker.getResourceId(),
serviceUnit.getNamespaceObject().toString());
serviceUnit.getNamespaceObject().toString());
return selectedBroker;
} else {
// No available broker found
......@@ -1012,9 +1017,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
try {
String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
LoadReport lr = loadReportCacheZk.get(key)
.orElseThrow(() -> new KeeperException.NoNodeException());
.orElseThrow(() -> new KeeperException.NoNodeException());
ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()),
fromLoadReport(lr));
fromLoadReport(lr));
this.currentLoadReports.put(ru, lr);
} catch (Exception e) {
log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e);
......@@ -1030,14 +1035,14 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
public static boolean isAboveLoadLevel(SystemResourceUsage usage, float thresholdPercentage) {
return (usage.bandwidthOut.percentUsage() > thresholdPercentage
|| usage.bandwidthIn.percentUsage() > thresholdPercentage
|| usage.cpu.percentUsage() > thresholdPercentage || usage.memory.percentUsage() > thresholdPercentage);
|| usage.bandwidthIn.percentUsage() > thresholdPercentage
|| usage.cpu.percentUsage() > thresholdPercentage || usage.memory.percentUsage() > thresholdPercentage);
}
public static boolean isBelowLoadLevel(SystemResourceUsage usage, float thresholdPercentage) {
return (usage.bandwidthOut.percentUsage() < thresholdPercentage
&& usage.bandwidthIn.percentUsage() < thresholdPercentage
&& usage.cpu.percentUsage() < thresholdPercentage && usage.memory.percentUsage() < thresholdPercentage);
&& usage.bandwidthIn.percentUsage() < thresholdPercentage
&& usage.cpu.percentUsage() < thresholdPercentage && usage.memory.percentUsage() < thresholdPercentage);
}
private static long getRealtimeJvmHeapUsageMBytes() {
......@@ -1060,22 +1065,18 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
private SystemResourceUsage getSystemResourceUsage() throws IOException {
SystemResourceUsage systemResourceUsage = new SystemResourceUsage();
if (isNotEmpty(pulsar.getConfiguration().getLoadBalancerHostUsageScriptPath())) {
systemResourceUsage = ObjectMapperFactory.getThreadLocal().readValue(brokerHostUsage.getBrokerHostUsage(),
SystemResourceUsage.class);
// Override System memory usage and limit with JVM heap usage and limit
long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
systemResourceUsage.memory.limit = (double) (maxHeapMemoryInBytes) / MBytes;
// Collect JVM direct memory
systemResourceUsage.directMemory.usage = (double) (sun.misc.SharedSecrets.getJavaNioAccess()
.getDirectBufferPool().getMemoryUsed() / MBytes);
systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MBytes);
}
SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();
// Override System memory usage and limit with JVM heap usage and limit
long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
systemResourceUsage.memory.limit = (double) (maxHeapMemoryInBytes) / MBytes;
// Collect JVM direct memory
systemResourceUsage.directMemory.usage = (double) (sun.misc.SharedSecrets.getJavaNioAccess()
.getDirectBufferPool().getMemoryUsed() / MBytes);
systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MBytes);
return systemResourceUsage;
}
......@@ -1089,13 +1090,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort()));
SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
loadReport.setOverLoaded(
isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage()));
isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage()));
loadReport.setUnderLoaded(
isBelowLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerUnderloadedThresholdPercentage()));
isBelowLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerUnderloadedThresholdPercentage()));
loadReport.setSystemResourceUsage(systemResourceUsage);
loadReport.setBundleStats(pulsar.getBrokerService().getBundleStats());
......@@ -1144,49 +1145,49 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
long bundleCountChange = Math.abs(oldBundleCount - newBundleCount);
long maxCapacity = ResourceUnitRanking.calculateBrokerMaxCapacity(
lastLoadReport.getSystemResourceUsage(),
pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota());
lastLoadReport.getSystemResourceUsage(),
pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota());
double bundlePercentageChange = (maxCapacity > 0) ? (bundleCountChange * 100 / maxCapacity) : 0;
if (newBundleCount < oldBundleCount || bundlePercentageChange > pulsar.getConfiguration()
.getLoadBalancerReportUpdateThresholdPercentage()) {
.getLoadBalancerReportUpdateThresholdPercentage()) {
needUpdate = true;
}
// check resource usage comparing with last LoadReport
if (!needUpdate && timestampNow - this.lastResourceUsageTimestamp > TimeUnit.MINUTES
.toMillis(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes())) {
.toMillis(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes())) {
SystemResourceUsage oldUsage = lastLoadReport.getSystemResourceUsage();
SystemResourceUsage newUsage = this.getSystemResourceUsage();
this.lastResourceUsageTimestamp = timestampNow;
// calculate percentage of change
double cpuChange = (newUsage.cpu.limit > 0)
? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0;
? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0;
double memChange = (newUsage.memory.limit > 0)
? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0;
? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0;
double directMemChange = (newUsage.directMemory.limit > 0)
? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
/ newUsage.directMemory.limit)
: 0;
? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
/ newUsage.directMemory.limit)
: 0;
double bandwidthOutChange = (newUsage.bandwidthOut.limit > 0)
? ((newUsage.bandwidthOut.usage - oldUsage.bandwidthOut.usage) * 100
/ newUsage.bandwidthOut.limit)
: 0;
? ((newUsage.bandwidthOut.usage - oldUsage.bandwidthOut.usage) * 100
/ newUsage.bandwidthOut.limit)
: 0;
double bandwidthInChange = (newUsage.bandwidthIn.limit > 0)
? ((newUsage.bandwidthIn.usage - oldUsage.bandwidthIn.usage) * 100
/ newUsage.bandwidthIn.limit)
: 0;
? ((newUsage.bandwidthIn.usage - oldUsage.bandwidthIn.usage) * 100
/ newUsage.bandwidthIn.limit)
: 0;
long resourceChange = (long) Math.min(100.0,
Math.max(Math.abs(cpuChange),
Math.max(Math.abs(directMemChange), Math.max(Math.abs(memChange),
Math.max(Math.abs(bandwidthOutChange), Math.abs(bandwidthInChange))))));
Math.max(Math.abs(cpuChange),
Math.max(Math.abs(directMemChange), Math.max(Math.abs(memChange),
Math.max(Math.abs(bandwidthOutChange), Math.abs(bandwidthInChange))))));
if (resourceChange > pulsar.getConfiguration().getLoadBalancerReportUpdateThresholdPercentage()) {
needUpdate = true;
log.info("LoadReport update triggered by change on resource usage, detal ({}).",
String.format(
"cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
String.format(
"cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)",
cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
}
}
}
......@@ -1195,7 +1196,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (needUpdate) {
LoadReport lr = generateLoadReport();
pulsar.getZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr),
-1);
-1);
this.lastLoadReport = lr;
this.lastResourceUsageTimestamp = lr.getTimestamp();
}
......@@ -1247,10 +1248,10 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
boolean unloadDisabledInLoadShedding = false;
try {
unloadDisabledInLoadShedding = pulsar.getGlobalZkCache()
.exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH);
.exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH);
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper",
AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e);
AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e);
}
return unloadDisabledInLoadShedding;
}
......@@ -1264,17 +1265,17 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (!isUnloadDisabledInLoadShedding()) {
log.info("Unloading namespace {} from overloaded broker {}", bundleName, brokerName);
adminCache.get(brokerName).namespaces().unloadNamespaceBundle(
getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName));
getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName));
log.info("Successfully unloaded namespace {} from broker {}", bundleName, brokerName);
} else {
log.info("DRY RUN: Unload in Load Shedding is disabled. Namespace {} would have been "
+ "unloaded from overloaded broker {} otherwise.", bundleName, brokerName);
+ "unloaded from overloaded broker {} otherwise.", bundleName, brokerName);
}
unloadedHotNamespaceCache.put(bundleName, System.currentTimeMillis());
} else {
// we can't unload this namespace so move to next one
log.info("Can't unload Namespace {} because it was unloaded last at {} and unload interval has "
+ "not exceeded.", bundleName, LocalDateTime.now());
+ "not exceeded.", bundleName, LocalDateTime.now());
}
} catch (Exception e) {
log.warn("ERROR failed to unload the bundle {} from overloaded broker {}", bundleName, brokerName, e);
......@@ -1287,7 +1288,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
long overloadThreshold = this.getLoadBalancerBrokerOverloadedThresholdPercentage();
long comfortLoadLevel = this.getLoadBalancerBrokerComfortLoadThresholdPercentage();
log.info("Running load shedding task as leader broker, overload threshold {}, comfort loadlevel {}",
overloadThreshold, comfortLoadLevel);
overloadThreshold, comfortLoadLevel);
// overloadedRU --> bundleName
Map<ResourceUnit, String> namespaceBundlesToBeUnloaded = new HashMap<>();
synchronized (currentLoadReports) {
......@@ -1302,9 +1303,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// can't unload one namespace, just issue a warning message
String bundleName = lr.getBundleStats().keySet().iterator().next();
log.warn(
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
bundleName, overloadedRU.getResourceId());
"HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
+ "No Load Shedding will be done on this broker",
bundleName, overloadedRU.getResourceId());
continue;
}
for (Map.Entry<String, NamespaceBundleStats> bundleStat : bundleStats.entrySet()) {
......@@ -1313,14 +1314,14 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
// We need at least one underloaded RU from list of candidates that can host this bundle
if (isBrokerAvailableForRebalancing(bundleStat.getKey(), comfortLoadLevel)) {
log.info(
"Namespace bundle {} will be unloaded from overloaded broker {}, bundle stats (topics: {}, producers {}, "
+ "consumers {}, bandwidthIn {}, bandwidthOut {})",
bundleName, overloadedRU.getResourceId(), stats.topics, stats.producerCount,
stats.consumerCount, stats.msgThroughputIn, stats.msgThroughputOut);
"Namespace bundle {} will be unloaded from overloaded broker {}, bundle stats (topics: {}, producers {}, "
+ "consumers {}, bandwidthIn {}, bandwidthOut {})",
bundleName, overloadedRU.getResourceId(), stats.topics, stats.producerCount,
stats.consumerCount, stats.msgThroughputIn, stats.msgThroughputOut);
namespaceBundlesToBeUnloaded.put(overloadedRU, bundleName);
} else {
log.info("Unable to shed load from broker {}, no brokers with enough capacity available "
+ "for re-balancing {}", overloadedRU.getResourceId(), bundleName);
+ "for re-balancing {}", overloadedRU.getResourceId(), bundleName);
}
break;
}
......@@ -1342,8 +1343,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
long maxBundleBandwidth = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
log.info(
"Running namespace bundle split with thresholds: topics {}, sessions {}, msgRate {}, bandwidth {}, maxBundles {}",
maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount);
"Running namespace bundle split with thresholds: topics {}, sessions {}, msgRate {}, bandwidth {}, maxBundles {}",
maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount);
if (this.lastLoadReport == null || this.lastLoadReport.getBundleStats() == null) {
return;
}
......@@ -1360,7 +1361,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
boolean needSplit = false;
if (stats.topics > maxBundleTopics || totalSessions > maxBundleSessions || totalMsgRate > maxBundleMsgRate
|| totalBandwidth > maxBundleBandwidth) {
|| totalBandwidth > maxBundleBandwidth) {
if (stats.topics <= 1) {
log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName);
} else {
......@@ -1368,7 +1369,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
int numBundles = pulsar.getNamespaceService().getBundleCount(namespaceName);
if (numBundles >= maxBundleCount) {
log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.",
bundleName);
bundleName);
} else {
needSplit = true;
}
......@@ -1378,13 +1379,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
if (needSplit) {
if (this.getLoadBalancerAutoBundleSplitEnabled()) {
log.info(
"Will split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
"Will split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
bundlesToBeSplit.add(bundleName);
} else {
log.info(
"DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
"DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
}
}
}
......@@ -1393,7 +1394,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
for (String bundleName : bundlesToBeSplit) {
try {
pulsar.getAdminClient().namespaces().splitNamespaceBundle(
getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName));
getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName));
log.info("Successfully split namespace bundle {}", bundleName);
} catch (Exception e) {
log.error("Failed to split namespace bundle {}", bundleName, e);
......
......@@ -168,7 +168,6 @@ public class PulsarBrokerStarterTest {
printWriter.println("managedLedgerDefaultAckQuorum=1");
printWriter.println("loadBalancerEnabled=false");
printWriter.println("loadBalancerHostUsageScriptPath=/usr/bin/my_pulsar-broker-host-usage");
printWriter.println("loadBalancerHostUsageCheckIntervalMinutes=4");
printWriter.println("loadBalancerReportUpdateThresholdPercentage=15");
printWriter.println("loadBalancerReportUpdateMaxIntervalMinutes=20");
......@@ -186,7 +185,6 @@ public class PulsarBrokerStarterTest {
Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue));
ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue;
Assert.assertEquals(serviceConfig.isLoadBalancerEnabled(), false);
Assert.assertEquals(serviceConfig.getLoadBalancerHostUsageScriptPath(), "/usr/bin/my_pulsar-broker-host-usage");
Assert.assertEquals(serviceConfig.getLoadBalancerHostUsageCheckIntervalMinutes(), 4);
Assert.assertEquals(serviceConfig.getLoadBalancerReportUpdateThresholdPercentage(), 15);
Assert.assertEquals(serviceConfig.getLoadBalancerReportUpdateMaxIntervalMinutes(), 20);
......
......@@ -211,9 +211,11 @@ public class LoadBalancerTest {
printSortedRanking(sortedRanking);
// all brokers have same rank to it would be 0 --> set-of-all-the-brokers
assertEquals(sortedRanking.get().size(), 1);
assertTrue(sortedRanking.get().get(0L) != null);
assertEquals(sortedRanking.get().get(0L).size(), BROKER_COUNT);
int brokerCount = 0;
for (Map.Entry<Long, Set<ResourceUnit>> entry : sortedRanking.get().entrySet()) {
brokerCount += entry.getValue().size();
}
assertEquals(brokerCount, BROKER_COUNT);
DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns/test-topic");
ResourceUnit found = pulsarServices[i].getLoadManager()
.getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn));
......
......@@ -27,6 +27,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.commons.lang3.SystemUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
......@@ -41,6 +42,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.yahoo.pulsar.broker.loadbalance.impl.*;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
......@@ -57,12 +59,6 @@ import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadCalculatorImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
......@@ -443,14 +439,14 @@ public class SimpleLoadManagerImplTest {
@Test
public void testBrokerHostUsage() {
when(pulsar1.getConfiguration().getLoadBalancerHostUsageScriptPath()).thenReturn("usageScript");
BrokerHostUsage brokerUsage = new BrokerHostUsage(pulsar1);
try {
brokerUsage.getBrokerHostUsage();
fail();
} catch (IOException e) {
// Ok
BrokerHostUsage brokerUsage;
if (SystemUtils.IS_OS_LINUX) {
brokerUsage = new LinuxBrokerHostUsageImpl(pulsar1);
} else {
brokerUsage = new GenericBrokerHostUsageImpl(pulsar1);
}
brokerUsage.getBrokerHostUsage();
// Ok
}
@Test
......
......@@ -52,7 +52,6 @@ managedLedgerCursorRolloverTimeInSeconds=14400
loadBalancerEnabled=false
loadBalancerReportUpdateThresholdPercentage=10
loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageScriptPath=/usr/bin/pulsar-broker-host-usage
loadBalancerHostUsageCheckIntervalMinutes=1
loadBalancerSheddingIntervalMinutes=30
loadBalancerSheddingGracePeriodMinutes=30
......
#!/bin/bash
# response to test the load-report API : BrokerHostUsage execs a script that returns system resource usage;
# Since the original script is system dependent and also requires /home/y location, this script is the dummy one
# returning a fake/mock response, so we can test the load-report API end-to-end including the path executing the script
echo "{\"bandwidthIn\":{\"usage\": 0,\"limit\": 0},\"bandwidthOut\":{\"usage\": 0,\"limit\": 0},\"memory\":{\"usage\":3240,\"limit\": 12829},\"cpu\": {\"usage\": 10.0,\"limit\": 16}, \"threads\":{\"usage\":10,\"limit\":100}}"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册