From f9993942932f67549fda74a79f90d7b736a15127 Mon Sep 17 00:00:00 2001 From: sschepens Date: Wed, 16 Nov 2016 20:24:35 +0200 Subject: [PATCH] Implement BrokerHostUsage using java (#88) --- conf/broker.conf | 5 +- conf/standalone.conf | 5 +- .../pulsar/broker/ServiceConfiguration.java | 13 +- .../yahoo/pulsar/broker/PulsarService.java | 14 +- .../broker/loadbalance/BrokerHostUsage.java | 68 +---- .../impl/GenericBrokerHostUsageImpl.java | 96 ++++++ .../impl/LinuxBrokerHostUsageImpl.java | 241 +++++++++++++++ .../impl/SimpleLoadManagerImpl.java | 285 +++++++++--------- .../yahoo/pulsar/PulsarBrokerStarterTest.java | 2 - .../broker/loadbalance/LoadBalancerTest.java | 8 +- .../SimpleLoadManagerImplTest.java | 22 +- .../configurations/pulsar_broker_test.conf | 1 - .../test-script-pulsar-broker-host-usage | 7 - 13 files changed, 507 insertions(+), 260 deletions(-) create mode 100644 pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/GenericBrokerHostUsageImpl.java create mode 100644 pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java delete mode 100644 pulsar-broker/src/test/resources/test-script-pulsar-broker-host-usage diff --git a/conf/broker.conf b/conf/broker.conf index b532478e56d..5318fea9199 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -212,10 +212,7 @@ loadBalancerReportUpdateThresholdPercentage=10 # maximum interval to update load report loadBalancerReportUpdateMaxIntervalMinutes=15 -# Path for the script used to retrieve system usage -loadBalancerHostUsageScriptPath= - -# Frequency of sar report to collect +# Frequency of report to collect loadBalancerHostUsageCheckIntervalMinutes=1 # Load shedding interval. Broker periodically checks whether some traffic should be offload from diff --git a/conf/standalone.conf b/conf/standalone.conf index bc42c985223..3bf62d54885 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -185,10 +185,7 @@ loadBalancerReportUpdateThresholdPercentage=10 # maximum interval to update load report loadBalancerReportUpdateMaxIntervalMinutes=15 -# Path for the script used to retrieve system usage -loadBalancerHostUsageScriptPath= - -# Frequency of sar report to collect +# Frequency of report to collect loadBalancerHostUsageCheckIntervalMinutes=1 # Load shedding interval. Broker periodically checks whether some traffic should be offload from diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java index 80ce45a0743..11c22201017 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java @@ -186,10 +186,7 @@ public class ServiceConfiguration { private int loadBalancerReportUpdateThresholdPercentage = 10; // maximum interval to update load report private int loadBalancerReportUpdateMaxIntervalMinutes = 15; - // Path for the script used to retrieve system usage - @FieldContext(required = false) - private String loadBalancerHostUsageScriptPath; - // Frequency of sar report to collect + // Frequency of report to collect private int loadBalancerHostUsageCheckIntervalMinutes = 1; // Load shedding interval. Broker periodically checks whether some traffic // should be offload from @@ -715,14 +712,6 @@ public class ServiceConfiguration { this.loadBalancerReportUpdateMaxIntervalMinutes = loadBalancerReportUpdateMaxIntervalMinutes; } - public String getLoadBalancerHostUsageScriptPath() { - return loadBalancerHostUsageScriptPath; - } - - public void setLoadBalancerHostUsageScriptPath(String loadBalancerHostUsageScriptPath) { - this.loadBalancerHostUsageScriptPath = loadBalancerHostUsageScriptPath; - } - public int getLoadBalancerHostUsageCheckIntervalMinutes() { return loadBalancerHostUsageCheckIntervalMinutes; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index 01b753e7742..3d722789419 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -128,6 +128,7 @@ public class PulsarService implements AutoCloseable { this.brokerServiceUrlTls = brokerUrlTls(config); this.config = config; this.shutdownService = new MessagingServiceShutdownHook(this); + loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(); } /** @@ -233,11 +234,11 @@ public class PulsarService implements AutoCloseable { managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), getBookKeeperClientFactory()); + this.brokerService = new BrokerService(this); + // Start load management service (even if load balancing is disabled) this.loadManager = new SimpleLoadManagerImpl(this); - this.brokerService = new BrokerService(this); - this.startLoadManagementService(); // needs load management service @@ -400,11 +401,6 @@ public class PulsarService implements AutoCloseable { this.loadManager.start(); if (config.isLoadBalancerEnabled()) { - if (loadManagerExecutor == null) { - loadManagerExecutor = Executors.newSingleThreadScheduledExecutor(); - ; - } - LOG.info("Starting load balancer"); if (this.loadReportTask == null) { long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; @@ -533,6 +529,10 @@ public class PulsarService implements AutoCloseable { return executor; } + public ScheduledExecutorService getLoadManagerExecutor() { + return loadManagerExecutor; + } + public OrderedSafeExecutor getOrderedExecutor() { return orderedExecutor; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerHostUsage.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerHostUsage.java index 85378ff8170..530c9a4ec69 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerHostUsage.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerHostUsage.java @@ -16,80 +16,18 @@ package com.yahoo.pulsar.broker.loadbalance; import java.io.IOException; -import java.io.StringWriter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.pulsar.broker.PulsarService; -import com.yahoo.pulsar.utils.CmdUtility; +import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; /** * Class that will return the broker host usage. * * */ -public class BrokerHostUsage { - // The interval for host usage check command - private final int hostUsageCheckInterval; - - // Path to the pulsar-broker-host-usage script - private final String usageScriptPath; - - private static final Logger LOG = LoggerFactory.getLogger(BrokerHostUsage.class); - - public BrokerHostUsage(PulsarService pulsar) { - this.usageScriptPath = pulsar.getConfiguration().getLoadBalancerHostUsageScriptPath(); - this.hostUsageCheckInterval = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(); - } - +public interface BrokerHostUsage { /** * Returns the host usage information in the following format - * - *
-     *    {
-     *         "bandwidthIn" : {
-     *           "usage" : "100",
-     *           "limit" : "1000",
-     *        },
-     *        "bandwidthOut" : {
-     *           "usage" : "659",
-     *           "limit" : "1000",
-     *        },
-     *        "memory" : {
-     *           "usage" : "16.0",
-     *           "limit" : "16070",
-     *       }
-     *       "cpu-utilization" : {
-     *           "usage"    : "160.0"
-     *           "limit"    : "1600"
-     *       }
-     *     }
-     * 
- * * @return Broker host usage in the json string format - * - * @throws IOException */ - public String getBrokerHostUsage() throws IOException { - StringWriter writer = new StringWriter(); - try { - /** - * Spawns a python process and runs the usage exporter script. The script return the machine information in - * the json format. - */ - - int exitCode = CmdUtility.exec(writer, usageScriptPath, "--host-usage-check-interval", - Integer.toString(hostUsageCheckInterval)); - if (exitCode != 0) { - LOG.warn("Process exited with non-zero exit code - [{}], stderr - [{}] ", exitCode, writer.toString()); - throw new IOException(writer.toString()); - } - } catch (IOException e) { - e.printStackTrace(); - LOG.warn("Error running the usage script {}", e.getMessage()); - throw e; - } - return writer.toString(); - } + SystemResourceUsage getBrokerHostUsage(); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/GenericBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/GenericBrokerHostUsageImpl.java new file mode 100644 index 00000000000..1837404a2e5 --- /dev/null +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/GenericBrokerHostUsageImpl.java @@ -0,0 +1,96 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.loadbalance.impl; + +import com.sun.management.OperatingSystemMXBean; +import com.yahoo.pulsar.broker.PulsarService; +import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage; +import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage; +import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeUnit; + +/** + * Class that will return the broker host usage. + */ +public class GenericBrokerHostUsageImpl implements BrokerHostUsage { + // The interval for host usage check command + private static final int CPU_CHECK_MILLIS = 1000; + private static final Logger LOG = LoggerFactory.getLogger(GenericBrokerHostUsageImpl.class); + private final int hostUsageCheckIntervalMin; + private long lastCollection; + private double totalCpuLimit; + private double cpuUsageSum = 0d; + private int cpuUsageCount = 0; + private OperatingSystemMXBean systemBean; + private SystemResourceUsage usage; + + public GenericBrokerHostUsageImpl(PulsarService pulsar) { + this.hostUsageCheckIntervalMin = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(); + this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + this.lastCollection = 0L; + this.usage = new SystemResourceUsage(); + this.totalCpuLimit = getTotalCpuLimit(); + pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::checkCpuLoad, 0, CPU_CHECK_MILLIS, TimeUnit.MILLISECONDS); + pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES); + } + + @Override + public SystemResourceUsage getBrokerHostUsage() { + return usage; + } + + private void checkCpuLoad() { + cpuUsageSum += systemBean.getSystemCpuLoad(); + cpuUsageCount++; + } + + private void calculateBrokerHostUsage() { + SystemResourceUsage usage = new SystemResourceUsage(); + usage.setCpu(getCpuUsage()); + usage.setMemory(getMemUsage()); + + this.usage = usage; + } + + private double getTotalCpuLimit() { + return (double) (100 * Runtime.getRuntime().availableProcessors()); + } + + private double getTotalCpuUsage() { + double cpuUsage = cpuUsageSum / cpuUsageCount; + cpuUsageSum = 0d; + cpuUsageCount = 0; + return cpuUsage; + } + + private ResourceUsage getCpuUsage() { + if (cpuUsageCount == 0) { + return new ResourceUsage(0, totalCpuLimit); + } + return new ResourceUsage(getTotalCpuUsage() * totalCpuLimit, totalCpuLimit); + } + + private ResourceUsage getMemUsage() { + double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024); + double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024); + return new ResourceUsage(total - free, total); + } +} diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java new file mode 100644 index 00000000000..fe353da92a1 --- /dev/null +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -0,0 +1,241 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.loadbalance.impl; + +import com.sun.management.OperatingSystemMXBean; +import com.yahoo.pulsar.broker.PulsarService; +import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage; +import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage; +import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Class that will return the broker host usage. + * + * + */ +public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { + // The interval for host usage check command + private final int hostUsageCheckInterval; + private long lastCollection; + private double lastTotalNicUsageTx; + private double lastTotalNicUsageRx; + private CpuStat lastCpuStat; + private OperatingSystemMXBean systemBean; + private SystemResourceUsage usage; + + private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class); + + public LinuxBrokerHostUsageImpl(PulsarService pulsar) { + this.hostUsageCheckInterval = pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(); + this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + this.lastCollection = 0L; + this.usage = new SystemResourceUsage(); + pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckInterval, TimeUnit.SECONDS); + } + + @Override + public SystemResourceUsage getBrokerHostUsage() { + return usage; + } + + private void calculateBrokerHostUsage() { + List nics = getNics(); + double totalNicLimit = getTotalNicLimitKbps(nics); + double totalNicUsageTx = getTotalNicUsageTxKb(nics); + double totalNicUsageRx = getTotalNicUsageRxKb(nics); + double totalCpuLimit = getTotalCpuLimit(); + CpuStat cpuStat = getTotalCpuUsage(); + SystemResourceUsage usage = new SystemResourceUsage(); + long now = System.currentTimeMillis(); + + if (lastCollection == 0L) { + usage.setMemory(getMemUsage()); + usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit)); + usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit)); + usage.setCpu(new ResourceUsage(0d, totalCpuLimit)); + } else { + double elapsedSeconds = (now - lastCollection) / 1000d; + double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds; + double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds; + + if (cpuStat != null && lastCpuStat != null) { + // we need two non null stats to get a usage report + long cpuTimeDiff = cpuStat.getTotalTime() - lastCpuStat.getTotalTime(); + long cpuUsageDiff = cpuStat.getUsage() - lastCpuStat.getUsage(); + double cpuUsage = ((double) cpuUsageDiff / (double) cpuTimeDiff) * totalCpuLimit; + usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); + } + + usage.setMemory(getMemUsage()); + usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit)); + usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit)); + } + + lastTotalNicUsageTx = totalNicUsageTx; + lastTotalNicUsageRx = totalNicUsageRx; + lastCpuStat = cpuStat; + lastCollection = System.currentTimeMillis(); + this.usage = usage; + } + + private double getTotalCpuLimit() { + return (double) (100 * Runtime.getRuntime().availableProcessors()); + } + + /** + * Reads first line of /proc/stat to get total cpu usage. + *
+     *     cpu  user   nice system idle    iowait irq softirq steal guest guest_nice
+     *     cpu  317808 128  58637  2503692 7634   0   13472   0     0     0
+     * 
+ * Line is split in "words", filtering the first. + * The sum of all numbers give the amount of cpu cycles used this far. + * Real CPU usage should equal the sum substracting the idle cycles, + * this would include iowait, irq and steal. + */ + private CpuStat getTotalCpuUsage() { + try { + String[] words = Files.lines(Paths.get("/proc/stat")) + .findFirst() + .get().split("\\s+"); + + long total = Arrays.stream(words) + .filter(s -> !s.contains("cpu")) + .mapToLong(Long::parseLong) + .sum(); + + long idle = Long.parseLong(words[4]); + + return new CpuStat(total, total - idle); + } catch (IOException e) { + LOG.error("Failed to read CPU usage from /proc/stat", e); + return null; + } + } + + private ResourceUsage getMemUsage() { + double total = ((double) systemBean.getTotalPhysicalMemorySize()) / (1024 * 1024); + double free = ((double) systemBean.getFreePhysicalMemorySize()) / (1024 * 1024); + return new ResourceUsage(total - free, total); + } + + private List getNics() { + try { + return Files.list(Paths.get("/sys/class/net/")) + .filter(this::isPhysicalNic) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toList()); + } catch (IOException e) { + LOG.error("Failed to find NICs", e); + return Collections.emptyList(); + } + } + + private boolean isPhysicalNic(Path path) { + try { + if (!Files.readSymbolicLink(path).toString().contains("/virtual/")) { + try { + Files.readAllBytes(path.resolve("speed")); + return true; + } catch (Exception e) { + // wireless nics don't report speed, ignore them. + return false; + } + } + return false; + } catch (IOException e) { + LOG.error("Failed to read link target for NIC " + path, e); + return false; + } + } + + private Path getNicSpeedPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/speed", nic)); + } + + private double getTotalNicLimitKbps(List nics) { + // Nic speed is in Mbits/s, return kbits/s + return nics.stream().mapToDouble(s -> { + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s)))); + } catch (IOException e) { + LOG.error("Failed to read speed for nic " + s, e); + return 0d; + } + }).sum() * 1024; + } + + private Path getNicTxPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic)); + } + + private Path getNicRxPath(String nic) { + return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic)); + } + + private double getTotalNicUsageRxKb(List nics) { + return nics.stream().mapToDouble(s -> { + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s)))); + } catch (IOException e) { + LOG.error("Failed to read rx_bytes for NIC " + s, e); + return 0d; + } + }).sum() * 8 / 1024; + } + + private double getTotalNicUsageTxKb(List nics) { + return nics.stream().mapToDouble(s -> { + try { + return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s)))); + } catch (IOException e) { + LOG.error("Failed to read tx_bytes for NIC " + s, e); + return 0d; + } + }).sum() * 8 / 1024; + } + + private class CpuStat { + private long totalTime; + private long usage; + + CpuStat(long totalTime, long usage) { + this.totalTime = totalTime; + this.usage = usage; + } + + long getTotalTime() { + return totalTime; + } + + long getUsage() { + return usage; + } + } +} diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index cbb9476d50f..9a2443d5641 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; @@ -176,8 +177,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene this.realtimeAvgResourceQuota = new ResourceQuota(); placementStrategy = new WRRPlacementStrategy(); lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), - pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); - brokerHostUsage = new BrokerHostUsage(pulsar); + pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); + if (SystemUtils.IS_OS_LINUX) { + brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar); + } else { + brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar); + } loadReportCacheZk = new ZooKeeperDataCache(pulsar.getLocalZkCache()) { @Override public LoadReport deserialize(String key, byte[] content) throws Exception { @@ -200,17 +205,17 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene public PulsarAdmin load(String key) throws Exception { // key - broker name already is valid URL, has prefix "http://" return new PulsarAdmin(new URL(key), pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), - pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); + pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); } }); int entryExpiryTime = (int) pulsar.getConfiguration().getLoadBalancerSheddingGracePeriodMinutes(); unloadedHotNamespaceCache = CacheBuilder.newBuilder().expireAfterWrite(entryExpiryTime, TimeUnit.MINUTES) - .build(new CacheLoader() { - @Override - public Long load(String key) throws Exception { - return System.currentTimeMillis(); - } - }); + .build(new CacheLoader() { + @Override + public Long load(String key) throws Exception { + return System.currentTimeMillis(); + } + }); availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(), LOADBALANCE_BROKERS_ROOT); availableActiveBrokers.registerListener(new ZooKeeperCacheListener>() { @Override @@ -233,7 +238,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene if (pulsar.getZkClient().exists(LOADBALANCE_BROKERS_ROOT, false) == null) { try { ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), LOADBALANCE_BROKERS_ROOT, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException.NodeExistsException e) { // ignore the exception, node might be present already } @@ -254,7 +259,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } try { ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, - loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e) { // Catching excption here to print the right error message log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e); @@ -268,11 +273,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene this.realtimeAvgResourceQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota(); this.lastResourceQuotaUpdateTimestamp = System.currentTimeMillis(); this.realtimeCpuLoadFactor = getDynamicConfigurationDouble( - LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, SETTING_NAME_LOAD_FACTOR_CPU, - this.realtimeCpuLoadFactor); + LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, SETTING_NAME_LOAD_FACTOR_CPU, + this.realtimeCpuLoadFactor); this.realtimeMemoryLoadFactor = getDynamicConfigurationDouble( - LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, SETTING_NAME_LOAD_FACTOR_MEM, - this.realtimeMemoryLoadFactor); + LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, SETTING_NAME_LOAD_FACTOR_MEM, + this.realtimeMemoryLoadFactor); } catch (Exception e) { log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e); throw new PulsarServerException(e); @@ -301,7 +306,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene pulsar.getZkClient().setData(zkPath, settingBytes, -1); } else { ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), zkPath, settingBytes, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + CreateMode.PERSISTENT); } } catch (Exception e) { log.warn("Got exception when writing to ZooKeeper path [{}]:", zkPath, e); @@ -346,7 +351,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private String getLoadBalancerPlacementStrategy() { String strategy = this.getDynamicConfigurationFromZK(LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH, - SETTING_NAME_STRATEGY, pulsar.getConfiguration().getLoadBalancerPlacementStrategy()); + SETTING_NAME_STRATEGY, pulsar.getConfiguration().getLoadBalancerPlacementStrategy()); if (!LOADBALANCER_STRATEGY_LLS.equals(strategy) && !LOADBALANCER_STRATEGY_RAND.equals(strategy)) { strategy = LOADBALANCER_STRATEGY_RAND; } @@ -355,12 +360,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private double getCpuLoadFactorFromZK(double defaultValue) { return getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, - SETTING_NAME_LOAD_FACTOR_CPU, defaultValue); + SETTING_NAME_LOAD_FACTOR_CPU, defaultValue); } private double getMemoryLoadFactorFromZK(double defaultValue) { return getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, - SETTING_NAME_LOAD_FACTOR_MEM, defaultValue); + SETTING_NAME_LOAD_FACTOR_MEM, defaultValue); } @Override @@ -371,26 +376,26 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private long getLoadBalancerBrokerUnderloadedThresholdPercentage() { return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_UNDERLOAD_THRESHOLD_ZPATH, - SETTING_NAME_UNDERLOAD_THRESHOLD, - pulsar.getConfiguration().getLoadBalancerBrokerUnderloadedThresholdPercentage()); + SETTING_NAME_UNDERLOAD_THRESHOLD, + pulsar.getConfiguration().getLoadBalancerBrokerUnderloadedThresholdPercentage()); } private long getLoadBalancerBrokerOverloadedThresholdPercentage() { return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_OVERLOAD_THRESHOLD_ZPATH, - SETTING_NAME_OVERLOAD_THRESHOLD, - pulsar.getConfiguration().getLoadBalancerBrokerOverloadedThresholdPercentage()); + SETTING_NAME_OVERLOAD_THRESHOLD, + pulsar.getConfiguration().getLoadBalancerBrokerOverloadedThresholdPercentage()); } private long getLoadBalancerBrokerComfortLoadThresholdPercentage() { return (long) this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_COMFORT_LOAD_THRESHOLD_ZPATH, - SETTING_NAME_COMFORTLOAD_THRESHOLD, - pulsar.getConfiguration().getLoadBalancerBrokerComfortLoadLevelPercentage()); + SETTING_NAME_COMFORTLOAD_THRESHOLD, + pulsar.getConfiguration().getLoadBalancerBrokerComfortLoadLevelPercentage()); } private boolean getLoadBalancerAutoBundleSplitEnabled() { return this.getDynamicConfigurationBoolean(LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED, - SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED, - pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled()); + SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED, + pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled()); } /* @@ -451,19 +456,19 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } private ResourceQuota timeSmoothQuota(ResourceQuota oldQuota, double msgRateIn, double msgRateOut, - double bandwidthIn, double bandwidthOut, double memory, long timePast) { + double bandwidthIn, double bandwidthOut, double memory, long timePast) { if (oldQuota.getDynamic()) { ResourceQuota newQuota = new ResourceQuota(); newQuota.setMsgRateIn(timeSmoothValue(oldQuota.getMsgRateIn(), msgRateIn, RESOURCE_QUOTA_MIN_MSGRATE_IN, - RESOURCE_QUOTA_MAX_MSGRATE_IN, timePast)); + RESOURCE_QUOTA_MAX_MSGRATE_IN, timePast)); newQuota.setMsgRateOut(timeSmoothValue(oldQuota.getMsgRateOut(), msgRateOut, RESOURCE_QUOTA_MIN_MSGRATE_OUT, - RESOURCE_QUOTA_MAX_MSGRATE_OUT, timePast)); + RESOURCE_QUOTA_MAX_MSGRATE_OUT, timePast)); newQuota.setBandwidthIn(timeSmoothValue(oldQuota.getBandwidthIn(), bandwidthIn, - RESOURCE_QUOTA_MIN_BANDWIDTH_IN, RESOURCE_QUOTA_MAX_BANDWIDTH_IN, timePast)); + RESOURCE_QUOTA_MIN_BANDWIDTH_IN, RESOURCE_QUOTA_MAX_BANDWIDTH_IN, timePast)); newQuota.setBandwidthOut(timeSmoothValue(oldQuota.getBandwidthOut(), bandwidthOut, - RESOURCE_QUOTA_MIN_BANDWIDTH_OUT, RESOURCE_QUOTA_MAX_BANDWIDTH_OUT, timePast)); + RESOURCE_QUOTA_MIN_BANDWIDTH_OUT, RESOURCE_QUOTA_MAX_BANDWIDTH_OUT, timePast)); newQuota.setMemory(timeSmoothValue(oldQuota.getMemory(), memory, RESOURCE_QUOTA_MIN_MEMORY, - RESOURCE_QUOTA_MAX_MEMORY, timePast)); + RESOURCE_QUOTA_MAX_MEMORY, timePast)); return newQuota; } else { return oldQuota; @@ -500,7 +505,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene totalBundles++; NamespaceBundleStats stats = statsEntry.getValue(); totalMemGroups += (1 - + (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize); + + (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize); totalBandwidthIn += stats.msgThroughputIn; totalBandwidthOut += stats.msgThroughputOut; } @@ -517,18 +522,18 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene this.lastResourceQuotaUpdateTimestamp = loadReportTimestamp; if (totalMsgRate > 1000 && totalMemGroups > 30) { this.realtimeCpuLoadFactor = timeSmoothValue(this.realtimeCpuLoadFactor, totalCpuUsage / totalMsgRate, - RESOURCE_QUOTA_MIN_CPU_FACTOR, RESOURCE_QUOTA_MAX_CPU_FACTOR, timePast); + RESOURCE_QUOTA_MIN_CPU_FACTOR, RESOURCE_QUOTA_MAX_CPU_FACTOR, timePast); this.realtimeMemoryLoadFactor = timeSmoothValue(this.realtimeMemoryLoadFactor, - totalMemoryUsage / totalMemGroups, RESOURCE_QUOTA_MIN_MEM_FACTOR, RESOURCE_QUOTA_MAX_MEM_FACTOR, - timePast); + totalMemoryUsage / totalMemGroups, RESOURCE_QUOTA_MIN_MEM_FACTOR, RESOURCE_QUOTA_MAX_MEM_FACTOR, + timePast); } // calculate average bundle if (totalBundles > 30 && this.realtimeAvgResourceQuota.getDynamic()) { ResourceQuota oldQuota = this.realtimeAvgResourceQuota; ResourceQuota newQuota = timeSmoothQuota(oldQuota, totalMsgRateIn / totalBundles, - totalMsgRateOut / totalBundles, totalBandwidthIn / totalBundles, - totalBandwidthOut / totalBundles, totalMemoryUsage / totalBundles, timePast); + totalMsgRateOut / totalBundles, totalBandwidthIn / totalBundles, + totalBandwidthOut / totalBundles, totalMemoryUsage / totalBundles, timePast); this.realtimeAvgResourceQuota = newQuota; } @@ -546,12 +551,12 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene String bundle = statsEntry.getKey(); NamespaceBundleStats stats = statsEntry.getValue(); long memGroupCount = (1 - + (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize); + + (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize); double newMemoryQuota = memGroupCount * this.realtimeMemoryLoadFactor; ResourceQuota oldQuota = getResourceQuota(bundle); ResourceQuota newQuota = timeSmoothQuota(oldQuota, stats.msgRateIn, stats.msgRateOut, - stats.msgThroughputIn, stats.msgThroughputOut, newMemoryQuota, timePast); + stats.msgThroughputIn, stats.msgThroughputOut, newMemoryQuota, timePast); newQuotas.put(bundle, newQuota); } } @@ -562,20 +567,20 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene private void compareAndWriteQuota(String bundle, ResourceQuota oldQuota, ResourceQuota newQuota) throws Exception { boolean needUpdate = true; if (!oldQuota.getDynamic() || (Math - .abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN - && Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT - && Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN - && Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT - && Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY)) { + .abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN + && Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT + && Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN + && Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT + && Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY)) { needUpdate = false; } if (needUpdate) { if (log.isDebugEnabled()) { log.debug(String.format( - "Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f, bandwidthOut: %.1f, memory: %.1f", - (bundle == null) ? "default" : bundle, newQuota.getMsgRateIn(), newQuota.getMsgRateOut(), - newQuota.getBandwidthIn(), newQuota.getBandwidthOut(), newQuota.getMemory())); + "Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f, bandwidthOut: %.1f, memory: %.1f", + (bundle == null) ? "default" : bundle, newQuota.getMsgRateIn(), newQuota.getMsgRateOut(), + newQuota.getBandwidthIn(), newQuota.getBandwidthOut(), newQuota.getMemory())); } if (bundle == null) { @@ -656,13 +661,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene ResourceQuota allocatedQuota = getTotalAllocatedQuota(loadedBundles); ResourceQuota preAllocatedQuota = getTotalAllocatedQuota(preAllocatedBundles); ResourceUnitRanking ranking = new ResourceUnitRanking(loadReport.getSystemResourceUsage(), - loadedBundles, allocatedQuota, preAllocatedBundles, preAllocatedQuota); + loadedBundles, allocatedQuota, preAllocatedBundles, preAllocatedQuota); newResourceUnitRankings.put(resourceUnit, ranking); // generated sorted ranking double loadPercentage = ranking.getEstimatedLoadPercentage(); long maxCapacity = ranking.estimateMaxCapacity( - pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota()); + pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota()); long finalRank = 0; if (strategy.equals(LOADBALANCER_STRATEGY_LLS)) { finalRank = (long) loadPercentage; @@ -689,7 +694,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } } else { log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking", - pulsar.getWebServiceAddress()); + pulsar.getWebServiceAddress()); } } @@ -834,15 +839,15 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } private Multimap getFinalCandidatesWithPolicy(NamespaceName namespace, - Multimap primaries, Multimap shared) { + Multimap primaries, Multimap shared) { Multimap finalCandidates = TreeMultimap.create(); // if not enough primary then it should be union of primaries and secondaries finalCandidates.putAll(primaries); if (policies.shouldFailoverToSecondaries(namespace, primaries.size())) { log.debug( - "Not enough of primaries [{}] available for namespace - [{}], " - + "adding shared [{}] as possible candidate owners", - primaries.size(), namespace.toString(), shared.size()); + "Not enough of primaries [{}] available for namespace - [{}], " + + "adding shared [{}] as possible candidate owners", + primaries.size(), namespace.toString(), shared.size()); finalCandidates.putAll(shared); } return finalCandidates; @@ -856,7 +861,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } private Multimap getFinalCandidates(ServiceUnitId serviceUnit, - Map> availableBrokers) { + Map> availableBrokers) { // need multimap or at least set of RUs Multimap matchedPrimaries = TreeMultimap.create(); Multimap matchedShared = TreeMultimap.create(); @@ -869,7 +874,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene for (Map.Entry> entry : availableBrokers.entrySet()) { for (ResourceUnit ru : entry.getValue()) { log.debug("Considering Resource Unit [{}] with Rank [{}] for serviceUnit [{}]", ru.getResourceId(), - entry.getKey(), serviceUnit); + entry.getKey(), serviceUnit); URL brokerUrl = null; try { brokerUrl = new URL(String.format(ru.getResourceId())); @@ -884,22 +889,22 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene matchedPrimaries.put(entry.getKey(), ru); if (log.isDebugEnabled()) { log.debug( - "Added Primary Broker - [{}] as possible Candidates for" - + " namespace - [{}] with policies", - brokerUrl.getHost(), namespace.toString()); + "Added Primary Broker - [{}] as possible Candidates for" + + " namespace - [{}] with policies", + brokerUrl.getHost(), namespace.toString()); } } else if (policies.isSharedBroker(brokerUrl.getHost())) { matchedShared.put(entry.getKey(), ru); if (log.isDebugEnabled()) { log.debug( - "Added Shared Broker - [{}] as possible " - + "Candidates for namespace - [{}] with policies", - brokerUrl.getHost(), namespace.toString()); + "Added Shared Broker - [{}] as possible " + + "Candidates for namespace - [{}] with policies", + brokerUrl.getHost(), namespace.toString()); } } else { if (log.isDebugEnabled()) { log.debug("Skipping Broker - [{}] not primary broker and not shared" - + " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString()); + + " for namespace - [{}] ", brokerUrl.getHost(), namespace.toString()); } } @@ -907,7 +912,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene if (policies.isSharedBroker(brokerUrl.getHost())) { matchedShared.put(entry.getKey(), ru); log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]", - brokerUrl.getHost(), namespace.toString()); + brokerUrl.getHost(), namespace.toString()); } } } @@ -916,9 +921,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene return getFinalCandidatesWithPolicy(namespace, matchedPrimaries, matchedShared); } else { log.debug( - "Policies not present for namespace - [{}] so only " - + "considering shared [{}] brokers for possible owner", - namespace.toString(), matchedShared.size()); + "Policies not present for namespace - [{}] so only " + + "considering shared [{}] brokers for possible owner", + namespace.toString(), matchedShared.size()); return getFinalCandidatesNoPolicy(matchedShared); } } @@ -946,7 +951,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene availableBrokers = Maps.newTreeMap(); for (String broker : activeBrokers) { ResourceUnit resourceUnit = new SimpleResourceUnit(String.format("http://%s", broker), - new PulsarResourceDescription()); + new PulsarResourceDescription()); availableBrokers.computeIfAbsent(0L, key -> Sets.newTreeSet()).add(resourceUnit); } log.info("Choosing at random from broker list: [{}]", availableBrokers.values()); @@ -955,7 +960,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } private ResourceUnit getLeastLoadedBroker(ServiceUnitId serviceUnit, - Map> availableBrokers) { + Map> availableBrokers) { ResourceUnit selectedBroker = null; Multimap finalCandidates = getFinalCandidates(serviceUnit, availableBrokers); // Remove candidates that point to inactive brokers @@ -982,7 +987,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene selectedBroker = placementStrategy.findBrokerForPlacement(finalCandidates); } log.debug("Selected : [{}] for ServiceUnit : [{}]", selectedBroker.getResourceId(), - serviceUnit.getNamespaceObject().toString()); + serviceUnit.getNamespaceObject().toString()); return selectedBroker; } else { // No available broker found @@ -1012,9 +1017,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene try { String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker); LoadReport lr = loadReportCacheZk.get(key) - .orElseThrow(() -> new KeeperException.NoNodeException()); + .orElseThrow(() -> new KeeperException.NoNodeException()); ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()), - fromLoadReport(lr)); + fromLoadReport(lr)); this.currentLoadReports.put(ru, lr); } catch (Exception e) { log.warn("Error reading load report from Cache for broker - [{}], [{}]", broker, e); @@ -1030,14 +1035,14 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene public static boolean isAboveLoadLevel(SystemResourceUsage usage, float thresholdPercentage) { return (usage.bandwidthOut.percentUsage() > thresholdPercentage - || usage.bandwidthIn.percentUsage() > thresholdPercentage - || usage.cpu.percentUsage() > thresholdPercentage || usage.memory.percentUsage() > thresholdPercentage); + || usage.bandwidthIn.percentUsage() > thresholdPercentage + || usage.cpu.percentUsage() > thresholdPercentage || usage.memory.percentUsage() > thresholdPercentage); } public static boolean isBelowLoadLevel(SystemResourceUsage usage, float thresholdPercentage) { return (usage.bandwidthOut.percentUsage() < thresholdPercentage - && usage.bandwidthIn.percentUsage() < thresholdPercentage - && usage.cpu.percentUsage() < thresholdPercentage && usage.memory.percentUsage() < thresholdPercentage); + && usage.bandwidthIn.percentUsage() < thresholdPercentage + && usage.cpu.percentUsage() < thresholdPercentage && usage.memory.percentUsage() < thresholdPercentage); } private static long getRealtimeJvmHeapUsageMBytes() { @@ -1060,22 +1065,18 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene } private SystemResourceUsage getSystemResourceUsage() throws IOException { - SystemResourceUsage systemResourceUsage = new SystemResourceUsage(); - if (isNotEmpty(pulsar.getConfiguration().getLoadBalancerHostUsageScriptPath())) { - systemResourceUsage = ObjectMapperFactory.getThreadLocal().readValue(brokerHostUsage.getBrokerHostUsage(), - SystemResourceUsage.class); - - // Override System memory usage and limit with JVM heap usage and limit - long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory(); - long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes(); - systemResourceUsage.memory.usage = (double) memoryUsageInMBytes; - systemResourceUsage.memory.limit = (double) (maxHeapMemoryInBytes) / MBytes; - - // Collect JVM direct memory - systemResourceUsage.directMemory.usage = (double) (sun.misc.SharedSecrets.getJavaNioAccess() - .getDirectBufferPool().getMemoryUsed() / MBytes); - systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MBytes); - } + SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage(); + + // Override System memory usage and limit with JVM heap usage and limit + long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory(); + long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes(); + systemResourceUsage.memory.usage = (double) memoryUsageInMBytes; + systemResourceUsage.memory.limit = (double) (maxHeapMemoryInBytes) / MBytes; + + // Collect JVM direct memory + systemResourceUsage.directMemory.usage = (double) (sun.misc.SharedSecrets.getJavaNioAccess() + .getDirectBufferPool().getMemoryUsed() / MBytes); + systemResourceUsage.directMemory.limit = (double) (sun.misc.VM.maxDirectMemory() / MBytes); return systemResourceUsage; } @@ -1089,13 +1090,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene try { LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(), - pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); + pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()); loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(), pulsar.getConfiguration().getWebServicePort())); SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage(); loadReport.setOverLoaded( - isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage())); + isAboveLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerOverloadedThresholdPercentage())); loadReport.setUnderLoaded( - isBelowLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerUnderloadedThresholdPercentage())); + isBelowLoadLevel(systemResourceUsage, this.getLoadBalancerBrokerUnderloadedThresholdPercentage())); loadReport.setSystemResourceUsage(systemResourceUsage); loadReport.setBundleStats(pulsar.getBrokerService().getBundleStats()); @@ -1144,49 +1145,49 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles(); long bundleCountChange = Math.abs(oldBundleCount - newBundleCount); long maxCapacity = ResourceUnitRanking.calculateBrokerMaxCapacity( - lastLoadReport.getSystemResourceUsage(), - pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota()); + lastLoadReport.getSystemResourceUsage(), + pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota()); double bundlePercentageChange = (maxCapacity > 0) ? (bundleCountChange * 100 / maxCapacity) : 0; if (newBundleCount < oldBundleCount || bundlePercentageChange > pulsar.getConfiguration() - .getLoadBalancerReportUpdateThresholdPercentage()) { + .getLoadBalancerReportUpdateThresholdPercentage()) { needUpdate = true; } // check resource usage comparing with last LoadReport if (!needUpdate && timestampNow - this.lastResourceUsageTimestamp > TimeUnit.MINUTES - .toMillis(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes())) { + .toMillis(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes())) { SystemResourceUsage oldUsage = lastLoadReport.getSystemResourceUsage(); SystemResourceUsage newUsage = this.getSystemResourceUsage(); this.lastResourceUsageTimestamp = timestampNow; // calculate percentage of change double cpuChange = (newUsage.cpu.limit > 0) - ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0; + ? ((newUsage.cpu.usage - oldUsage.cpu.usage) * 100 / newUsage.cpu.limit) : 0; double memChange = (newUsage.memory.limit > 0) - ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0; + ? ((newUsage.memory.usage - oldUsage.memory.usage) * 100 / newUsage.memory.limit) : 0; double directMemChange = (newUsage.directMemory.limit > 0) - ? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100 - / newUsage.directMemory.limit) - : 0; + ? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100 + / newUsage.directMemory.limit) + : 0; double bandwidthOutChange = (newUsage.bandwidthOut.limit > 0) - ? ((newUsage.bandwidthOut.usage - oldUsage.bandwidthOut.usage) * 100 - / newUsage.bandwidthOut.limit) - : 0; + ? ((newUsage.bandwidthOut.usage - oldUsage.bandwidthOut.usage) * 100 + / newUsage.bandwidthOut.limit) + : 0; double bandwidthInChange = (newUsage.bandwidthIn.limit > 0) - ? ((newUsage.bandwidthIn.usage - oldUsage.bandwidthIn.usage) * 100 - / newUsage.bandwidthIn.limit) - : 0; + ? ((newUsage.bandwidthIn.usage - oldUsage.bandwidthIn.usage) * 100 + / newUsage.bandwidthIn.limit) + : 0; long resourceChange = (long) Math.min(100.0, - Math.max(Math.abs(cpuChange), - Math.max(Math.abs(directMemChange), Math.max(Math.abs(memChange), - Math.max(Math.abs(bandwidthOutChange), Math.abs(bandwidthInChange)))))); + Math.max(Math.abs(cpuChange), + Math.max(Math.abs(directMemChange), Math.max(Math.abs(memChange), + Math.max(Math.abs(bandwidthOutChange), Math.abs(bandwidthInChange)))))); if (resourceChange > pulsar.getConfiguration().getLoadBalancerReportUpdateThresholdPercentage()) { needUpdate = true; log.info("LoadReport update triggered by change on resource usage, detal ({}).", - String.format( - "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)", - cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange)); + String.format( + "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%)", + cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange)); } } } @@ -1195,7 +1196,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene if (needUpdate) { LoadReport lr = generateLoadReport(); pulsar.getZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), - -1); + -1); this.lastLoadReport = lr; this.lastResourceUsageTimestamp = lr.getTimestamp(); } @@ -1247,10 +1248,10 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene boolean unloadDisabledInLoadShedding = false; try { unloadDisabledInLoadShedding = pulsar.getGlobalZkCache() - .exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH); + .exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH); } catch (Exception e) { log.warn("Unable to fetch contents of [{}] from global zookeeper", - AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e); + AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e); } return unloadDisabledInLoadShedding; } @@ -1264,17 +1265,17 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene if (!isUnloadDisabledInLoadShedding()) { log.info("Unloading namespace {} from overloaded broker {}", bundleName, brokerName); adminCache.get(brokerName).namespaces().unloadNamespaceBundle( - getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName)); + getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName)); log.info("Successfully unloaded namespace {} from broker {}", bundleName, brokerName); } else { log.info("DRY RUN: Unload in Load Shedding is disabled. Namespace {} would have been " - + "unloaded from overloaded broker {} otherwise.", bundleName, brokerName); + + "unloaded from overloaded broker {} otherwise.", bundleName, brokerName); } unloadedHotNamespaceCache.put(bundleName, System.currentTimeMillis()); } else { // we can't unload this namespace so move to next one log.info("Can't unload Namespace {} because it was unloaded last at {} and unload interval has " - + "not exceeded.", bundleName, LocalDateTime.now()); + + "not exceeded.", bundleName, LocalDateTime.now()); } } catch (Exception e) { log.warn("ERROR failed to unload the bundle {} from overloaded broker {}", bundleName, brokerName, e); @@ -1287,7 +1288,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene long overloadThreshold = this.getLoadBalancerBrokerOverloadedThresholdPercentage(); long comfortLoadLevel = this.getLoadBalancerBrokerComfortLoadThresholdPercentage(); log.info("Running load shedding task as leader broker, overload threshold {}, comfort loadlevel {}", - overloadThreshold, comfortLoadLevel); + overloadThreshold, comfortLoadLevel); // overloadedRU --> bundleName Map namespaceBundlesToBeUnloaded = new HashMap<>(); synchronized (currentLoadReports) { @@ -1302,9 +1303,9 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene // can't unload one namespace, just issue a warning message String bundleName = lr.getBundleStats().keySet().iterator().next(); log.warn( - "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " - + "No Load Shedding will be done on this broker", - bundleName, overloadedRU.getResourceId()); + "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " + + "No Load Shedding will be done on this broker", + bundleName, overloadedRU.getResourceId()); continue; } for (Map.Entry bundleStat : bundleStats.entrySet()) { @@ -1313,14 +1314,14 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene // We need at least one underloaded RU from list of candidates that can host this bundle if (isBrokerAvailableForRebalancing(bundleStat.getKey(), comfortLoadLevel)) { log.info( - "Namespace bundle {} will be unloaded from overloaded broker {}, bundle stats (topics: {}, producers {}, " - + "consumers {}, bandwidthIn {}, bandwidthOut {})", - bundleName, overloadedRU.getResourceId(), stats.topics, stats.producerCount, - stats.consumerCount, stats.msgThroughputIn, stats.msgThroughputOut); + "Namespace bundle {} will be unloaded from overloaded broker {}, bundle stats (topics: {}, producers {}, " + + "consumers {}, bandwidthIn {}, bandwidthOut {})", + bundleName, overloadedRU.getResourceId(), stats.topics, stats.producerCount, + stats.consumerCount, stats.msgThroughputIn, stats.msgThroughputOut); namespaceBundlesToBeUnloaded.put(overloadedRU, bundleName); } else { log.info("Unable to shed load from broker {}, no brokers with enough capacity available " - + "for re-balancing {}", overloadedRU.getResourceId(), bundleName); + + "for re-balancing {}", overloadedRU.getResourceId(), bundleName); } break; } @@ -1342,8 +1343,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene long maxBundleBandwidth = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes; log.info( - "Running namespace bundle split with thresholds: topics {}, sessions {}, msgRate {}, bandwidth {}, maxBundles {}", - maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount); + "Running namespace bundle split with thresholds: topics {}, sessions {}, msgRate {}, bandwidth {}, maxBundles {}", + maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount); if (this.lastLoadReport == null || this.lastLoadReport.getBundleStats() == null) { return; } @@ -1360,7 +1361,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene boolean needSplit = false; if (stats.topics > maxBundleTopics || totalSessions > maxBundleSessions || totalMsgRate > maxBundleMsgRate - || totalBandwidth > maxBundleBandwidth) { + || totalBandwidth > maxBundleBandwidth) { if (stats.topics <= 1) { log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName); } else { @@ -1368,7 +1369,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene int numBundles = pulsar.getNamespaceService().getBundleCount(namespaceName); if (numBundles >= maxBundleCount) { log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.", - bundleName); + bundleName); } else { needSplit = true; } @@ -1378,13 +1379,13 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene if (needSplit) { if (this.getLoadBalancerAutoBundleSplitEnabled()) { log.info( - "Will split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}", - bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth); + "Will split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}", + bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth); bundlesToBeSplit.add(bundleName); } else { log.info( - "DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}", - bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth); + "DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}", + bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth); } } } @@ -1393,7 +1394,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene for (String bundleName : bundlesToBeSplit) { try { pulsar.getAdminClient().namespaces().splitNamespaceBundle( - getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName)); + getNamespaceNameFromBundleName(bundleName), getBundleRangeFromBundleName(bundleName)); log.info("Successfully split namespace bundle {}", bundleName); } catch (Exception e) { log.error("Failed to split namespace bundle {}", bundleName, e); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/PulsarBrokerStarterTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/PulsarBrokerStarterTest.java index 27ab45081bd..bae5dd4f1d9 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/PulsarBrokerStarterTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/PulsarBrokerStarterTest.java @@ -168,7 +168,6 @@ public class PulsarBrokerStarterTest { printWriter.println("managedLedgerDefaultAckQuorum=1"); printWriter.println("loadBalancerEnabled=false"); - printWriter.println("loadBalancerHostUsageScriptPath=/usr/bin/my_pulsar-broker-host-usage"); printWriter.println("loadBalancerHostUsageCheckIntervalMinutes=4"); printWriter.println("loadBalancerReportUpdateThresholdPercentage=15"); printWriter.println("loadBalancerReportUpdateMaxIntervalMinutes=20"); @@ -186,7 +185,6 @@ public class PulsarBrokerStarterTest { Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue)); ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue; Assert.assertEquals(serviceConfig.isLoadBalancerEnabled(), false); - Assert.assertEquals(serviceConfig.getLoadBalancerHostUsageScriptPath(), "/usr/bin/my_pulsar-broker-host-usage"); Assert.assertEquals(serviceConfig.getLoadBalancerHostUsageCheckIntervalMinutes(), 4); Assert.assertEquals(serviceConfig.getLoadBalancerReportUpdateThresholdPercentage(), 15); Assert.assertEquals(serviceConfig.getLoadBalancerReportUpdateMaxIntervalMinutes(), 20); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java index c0e173f070a..45f591c7e62 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -211,9 +211,11 @@ public class LoadBalancerTest { printSortedRanking(sortedRanking); // all brokers have same rank to it would be 0 --> set-of-all-the-brokers - assertEquals(sortedRanking.get().size(), 1); - assertTrue(sortedRanking.get().get(0L) != null); - assertEquals(sortedRanking.get().get(0L).size(), BROKER_COUNT); + int brokerCount = 0; + for (Map.Entry> entry : sortedRanking.get().entrySet()) { + brokerCount += entry.getValue().size(); + } + assertEquals(brokerCount, BROKER_COUNT); DestinationName fqdn = DestinationName.get("persistent://pulsar/use/primary-ns/test-topic"); ResourceUnit found = pulsarServices[i].getLoadManager() .getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(fqdn)); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 68b644cd6ab..564837b41e7 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import org.apache.commons.lang3.SystemUtils; import java.io.IOException; import java.lang.reflect.Field; import java.net.URL; @@ -41,6 +42,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.yahoo.pulsar.broker.loadbalance.impl.*; import org.apache.bookkeeper.test.PortManager; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.CreateMode; @@ -57,12 +59,6 @@ import com.google.common.collect.Sets; import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.broker.admin.AdminResource; -import com.yahoo.pulsar.broker.loadbalance.impl.PulsarLoadReportImpl; -import com.yahoo.pulsar.broker.loadbalance.impl.PulsarResourceDescription; -import com.yahoo.pulsar.broker.loadbalance.impl.ResourceAvailabilityRanker; -import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadCalculatorImpl; -import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; -import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import com.yahoo.pulsar.client.admin.BrokerStats; import com.yahoo.pulsar.client.admin.PulsarAdmin; import com.yahoo.pulsar.client.api.Authentication; @@ -443,14 +439,14 @@ public class SimpleLoadManagerImplTest { @Test public void testBrokerHostUsage() { - when(pulsar1.getConfiguration().getLoadBalancerHostUsageScriptPath()).thenReturn("usageScript"); - BrokerHostUsage brokerUsage = new BrokerHostUsage(pulsar1); - try { - brokerUsage.getBrokerHostUsage(); - fail(); - } catch (IOException e) { - // Ok + BrokerHostUsage brokerUsage; + if (SystemUtils.IS_OS_LINUX) { + brokerUsage = new LinuxBrokerHostUsageImpl(pulsar1); + } else { + brokerUsage = new GenericBrokerHostUsageImpl(pulsar1); } + brokerUsage.getBrokerHostUsage(); + // Ok } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index e31a429e6a6..413160a5423 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -52,7 +52,6 @@ managedLedgerCursorRolloverTimeInSeconds=14400 loadBalancerEnabled=false loadBalancerReportUpdateThresholdPercentage=10 loadBalancerReportUpdateMaxIntervalMinutes=15 -loadBalancerHostUsageScriptPath=/usr/bin/pulsar-broker-host-usage loadBalancerHostUsageCheckIntervalMinutes=1 loadBalancerSheddingIntervalMinutes=30 loadBalancerSheddingGracePeriodMinutes=30 diff --git a/pulsar-broker/src/test/resources/test-script-pulsar-broker-host-usage b/pulsar-broker/src/test/resources/test-script-pulsar-broker-host-usage deleted file mode 100644 index a53466f290f..00000000000 --- a/pulsar-broker/src/test/resources/test-script-pulsar-broker-host-usage +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -# response to test the load-report API : BrokerHostUsage execs a script that returns system resource usage; -# Since the original script is system dependent and also requires /home/y location, this script is the dummy one -# returning a fake/mock response, so we can test the load-report API end-to-end including the path executing the script - -echo "{\"bandwidthIn\":{\"usage\": 0,\"limit\": 0},\"bandwidthOut\":{\"usage\": 0,\"limit\": 0},\"memory\":{\"usage\":3240,\"limit\": 12829},\"cpu\": {\"usage\": 10.0,\"limit\": 16}, \"threads\":{\"usage\":10,\"limit\":100}}" -- GitLab