提交 b38da884 编写于 作者: B bobbeyreese 提交者: Matteo Merli

Improve Simulation Tools and Add Documentation (#328)

* Add documentation, improve simulation tools.

* Add global summary for monitor

* monitor -> monitor-brokers

* Don't modify performance producer/consumer

* Use property/cluster instead of cluster/property for simulation

* Fix regex
上级 8080e88b
......@@ -73,12 +73,11 @@ pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
produce Run a producer
consume Run a consumer
monitor-simple-load-manager Continuously receive broker data when using SimpleLoadManagerImpl
monitor-modular-load-manager Continuously receive broker data when using ModularLoadManagerImpl
simulation-client Run a simulation server acting as a Pulsar client
simulation-controller Run a simulation controller to give commands to servers
produce Run a producer
consume Run a consumer
monitor-brokers Continuously receive broker data and/or load reports
simulation-client Run a simulation server acting as a Pulsar client
simulation-controller Run a simulation controller to give commands to servers
help This help message
......@@ -142,10 +141,8 @@ if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "monitor-simple-load-manager" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.SimpleLoadManagerBrokerMonitor "$@"
elif [ "$COMMAND" == "monitor-modular-load-manager" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.ModularLoadManagerBrokerMonitor "$@"
elif [ "$COMMAND" == "monitor-brokers" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@"
elif [ "$COMMAND" == "simulation-client" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationClient "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
......
......@@ -14,6 +14,7 @@
- [Apache Storm adaptor](PulsarStorm.md)
- [Spark Streaming Pulsar Receiver](PulsarSpark.md)
- [Modular Load Manager](ModularLoadManager.md)
- [Simulation Tools](Simulation.md)
* Internal Docs
- [Binary protocol specification](BinaryProtocol.md)
* Other Languages
......
# Simulation Tools
It is sometimes necessary create an test environment and incur artificial load to observe how well load managers
handle the load. The load simulation controller, the load simulation client, and the broker monitor were created as an
effort to make create this load and observe the effects on the managers more easily.
## Simulation Client
The simulation client is a machine which will create and subscribe to topics with configurable message rates and sizes.
Because it is sometimes necessary in simulating large load to use multiple client machines, the user does not interact
with the simulation client directly, but instead delegates their requests to the simulation controller, which will then
send signals to clients to start incurring load. The client implementation is in the class
`com.yahoo.pulsar.testclient.LoadSimulationClient`.
### Usage
To Start a simulation client, use the `pulsar-perf` script with the command `simulation-client` as follows:
```
pulsar-perf simulation-client --port <listen port> --service-url <pulsar service url>
```
The client will then be ready to receive controller commands.
## Simulation Controller
The simulation controller send signals to the simulation clients, requesting them to create new topics, stop old
topics, change the load incurred by topics, as well as several other tasks. It is implemented in the class
`com.yahoo.pulsar.testclient.LoadSimulationController` and presents a shell to the user as an interface to send
command with.
### Usage
To start a simulation controller, use the `pulsar-perf` script with the command `simulation-controller` as follows:
```
pulsar-perf simulation-controller --cluster <cluster to simulate on> --client-port <listen port for clients>
--clients <comma-separated list of client host names>
```
The clients should already be started before the controller is started. You will then be presented with a simple prompt,
where you can issue commands to simulation clients. Arguments often refer to tenant names, namespace names, and topic
names. In all cases, the BASE name of the tenants, namespaces, and topics are used. For example, for the topic
`persistent://my_cluster/my_tenant/my_namespace/my_topic`, the tenant name is `my_tenant`, the namespace name is
`my_namespace`, and the topic name is `my_topic`. The controller can perform the following actions:
* Create a topic with a producer and a consumer
* `trade <tenant> <namespace> <topic> [--rate <message rate per second>]
[--rand-rate <lower bound>,<upper bound>]
[--size <message size in bytes>]`
* Create a group of topics with a producer and a consumer
* `trade_group <tenant> <group> <num_namespaces> [--rate <message rate per second>]
[--rand-rate <lower bound>,<upper bound>]
[--separation <separation between creating topics in ms>] [--size <message size in bytes>]
[--topics-per-namespace <number of topics to create per namespace>]`
* Change the configuration of an existing topic
* `change <tenant> <namespace> <topic> [--rate <message rate per second>]
[--rand-rate <lower bound>,<upper bound>]
[--size <message size in bytes>]`
* Change the configuration of a group of topics
* `change_group <tenant> <group> [--rate <message rate per second>] [--rand-rate <lower bound>,<upper bound>]
[--size <message size in bytes>] [--topics-per-namespace <number of topics to create per namespace>]`
* Shutdown a previously created topic
* `stop <tenant> <namespace> <topic>`
* Shutdown a previously created group of topics
* `stop_group <tenant> <group>`
* Copy the historical data from one ZooKeeper to another and simulate based on the message rates and sizes in that
history
* `copy <tenant> <source zookeeper> <target zookeeper> [--rate-multiplier value]`
* Simulate the load of the historical data on the current ZooKeeper (should be same ZooKeeper being simulated on)
* `simulate <tenant> <zookeeper> [--rate-multiplier value]`
* Stream the latest data from the given active ZooKeeper to simulate the real-time load of that ZooKeeper.
* `stream <tenant> <zookeeper> [--rate-multiplier value]`
The "group" arguments in these commands allow the user to create or affect multiple topics at once. Groups are created
when calling the `trade_group` command, and all topics from these groups may be subsequently modified or stopped
with the `change_group` and `stop_group` commands respectively. All ZooKeeper arguments are of the form
`zookeeper_host:port`.
#### Difference Between Copy, Simulate, and Stream
The commands `copy`, `simulate`, and `stream` are very similar but have significant differences. `copy` is used when
you want to simulate the load of a static, external ZooKeeper on the ZooKeeper you are simulating on. Thus,
`source zookeeper` should be the ZooKeeper you want to copy and `target zookeeper` should be the ZooKeeper you are
simulating on, and then it will get the full benefit of the historical data of the source in both load manager
implementations. `simulate` on the other hand takes in only one ZooKeeper, the one you are simulating on. It assumes
that you are simulating on a ZooKeeper that has historical data for `SimpleLoadManagerImpl` and creates equivalent
historical data for `ModularLoadManagerImpl`. Then, the load according to the historical data is simulated by the
clients. Finally, `stream` takes in an active ZooKeeper different than the ZooKeeper being simulated on and streams
load data from it and simulates the real-time load. In all cases, the optional `rate-multiplier` argument allows the
user to simulate some proportion of the load. For instance, using `--rate-multiplier 0.05` will cause messages to
be sent at only `5%` of the rate of the load that is being simulated.
## Broker Monitor
To observe the behavior of the load manager in these simulations, one may utilize the broker monitor, which is
implemented in `com.yahoo.pulsar.testclient.BrokerMonitor`. The broker monitor will print tabular load data to the
console as it is updated using watchers.
### Usage
To start a broker monitor, use the `monitor-brokers` command in the `pulsar-perf` script:
```
pulsar-perf monitor-brokers --connect-string <zookeeper host:port>
```
The console will then continuously print load data until it is interrupted.
......@@ -17,6 +17,9 @@ package com.yahoo.pulsar.broker.loadbalance;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
......@@ -25,8 +28,6 @@ import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* LoadManager runs though set of load reports collected from different brokers and generates a recommendation of
......
......@@ -15,12 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.net.MalformedURLException;
......@@ -28,7 +23,13 @@ import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
/**
* This class contains code which in shared between the two load manager implementations.
......
......@@ -15,10 +15,7 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -31,9 +28,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
......@@ -57,14 +51,17 @@ import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.BrokerFilter;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.broker.loadbalance.LoadData;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager;
import com.yahoo.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
......
......@@ -15,11 +15,9 @@
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
......
......@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
......@@ -49,8 +51,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* LoadSimulationClient is used to simulate client load by maintaining producers and consumers for topics. Instances of
......@@ -59,22 +59,24 @@ import org.slf4j.LoggerFactory;
public class LoadSimulationClient {
private final static Logger log = LoggerFactory.getLogger(LoadSimulationClient.class);
// Values for command responses.
public static final byte FOUND_TOPIC = 0;
public static final byte NO_SUCH_TOPIC = 1;
public static final byte REDUNDANT_COMMAND = 2;
// Values for command encodings.
public static final byte CHANGE_COMMAND = 0;
public static final byte STOP_COMMAND = 1;
public static final byte TRADE_COMMAND = 2;
public static final byte CHANGE_GROUP_COMMAND = 3;
public static final byte STOP_GROUP_COMMAND = 4;
public static final byte FIND_COMMAND = 5;
private final ExecutorService executor;
// Map from a message size to a cached byte[] of that size.
private final Map<Integer, byte[]> payloadCache;
// Map from a full topic name to the TradeUnit created for that topic.
private final Map<String, TradeUnit> topicsToTradeUnits;
// Pulsar client to create producers and consumers with.
private final PulsarClient client;
private final ProducerConfiguration producerConf;
private final ConsumerConfiguration consumerConf;
private final ClientConfiguration clientConf;
......@@ -100,8 +102,8 @@ public class LoadSimulationClient {
final Map<Integer, byte[]> payloadCache;
public TradeUnit(final TradeConfiguration tradeConf, final PulsarClient client,
final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf,
final Map<Integer, byte[]> payloadCache) throws Exception {
final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf,
final Map<Integer, byte[]> payloadCache) throws Exception {
consumerFuture = client.subscribeAsync(tradeConf.topic, "Subscriber-" + tradeConf.topic, consumerConf);
producerFuture = client.createProducerAsync(tradeConf.topic, producerConf);
this.payload = new AtomicReference<>();
......@@ -227,95 +229,86 @@ public class LoadSimulationClient {
final TradeConfiguration tradeConf = new TradeConfiguration();
tradeConf.command = command;
switch (command) {
case CHANGE_COMMAND:
// Change the topic's settings if it exists. Report whether the
// topic was found on this server.
decodeProducerOptions(tradeConf, inputStream);
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
outputStream.write(FOUND_TOPIC);
} else {
outputStream.write(NO_SUCH_TOPIC);
}
break;
case STOP_COMMAND:
// Stop the topic if it exists. Report whether the topic was found,
// and whether it was already stopped.
tradeConf.topic = inputStream.readUTF();
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
final boolean wasStopped = topicsToTradeUnits.get(tradeConf.topic).stop.getAndSet(true);
outputStream.write(wasStopped ? REDUNDANT_COMMAND : FOUND_TOPIC);
} else {
outputStream.write(NO_SUCH_TOPIC);
}
break;
case TRADE_COMMAND:
// Create the topic. It is assumed that the topic does not already
// exist.
decodeProducerOptions(tradeConf, inputStream);
final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache);
topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
executor.submit(() -> {
try {
tradeUnit.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
case CHANGE_COMMAND:
// Change the topic's settings if it exists.
decodeProducerOptions(tradeConf, inputStream);
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
}
});
// Tell controller topic creation is finished.
outputStream.write(NO_SUCH_TOPIC);
break;
case CHANGE_GROUP_COMMAND:
// Change the settings of all topics belonging to a group. Report
// the number of topics changed.
decodeGroupOptions(tradeConf, inputStream);
tradeConf.size = inputStream.readInt();
tradeConf.rate = inputStream.readDouble();
// See if a topic belongs to this tenant and group using this regex.
final String groupRegex = ".*://.*/" + tradeConf.tenant + "/" + tradeConf.group + "-.*/.*";
int numFound = 0;
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(groupRegex)) {
++numFound;
unit.change(tradeConf);
break;
case STOP_COMMAND:
// Stop the topic if it exists.
tradeConf.topic = inputStream.readUTF();
if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
topicsToTradeUnits.get(tradeConf.topic).stop.set(true);
}
}
outputStream.writeInt(numFound);
break;
case STOP_GROUP_COMMAND:
// Stop all topics belonging to a group. Report the number of topics
// stopped.
decodeGroupOptions(tradeConf, inputStream);
// See if a topic belongs to this tenant and group using this regex.
final String regex = ".*://.*/" + tradeConf.tenant + "/" + tradeConf.group + "-.*/.*";
int numStopped = 0;
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(regex) && !unit.stop.getAndSet(true)) {
++numStopped;
break;
case TRADE_COMMAND:
// Create the topic. It is assumed that the topic does not already exist.
decodeProducerOptions(tradeConf, inputStream);
final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache);
topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
executor.submit(() -> {
try {
tradeUnit.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
break;
case CHANGE_GROUP_COMMAND:
// Change the settings of all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
tradeConf.size = inputStream.readInt();
tradeConf.rate = inputStream.readDouble();
// See if a topic belongs to this tenant and group using this regex.
final String groupRegex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(groupRegex)) {
unit.change(tradeConf);
}
}
}
outputStream.writeInt(numStopped);
break;
default:
throw new IllegalArgumentException("Unrecognized command code received: " + command);
break;
case STOP_GROUP_COMMAND:
// Stop all topics belonging to a group.
decodeGroupOptions(tradeConf, inputStream);
// See if a topic belongs to this tenant and group using this regex.
final String regex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*";
for (Map.Entry<String, TradeUnit> entry : topicsToTradeUnits.entrySet()) {
final String destination = entry.getKey();
final TradeUnit unit = entry.getValue();
if (destination.matches(regex)) {
unit.stop.set(true);
}
}
break;
case FIND_COMMAND:
// Write a single boolean indicating if the topic was found.
outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF()));
outputStream.flush();
break;
default:
throw new IllegalArgumentException("Unrecognized command code received: " + command);
}
outputStream.flush();
}
// Make listener as lightweight as possible.
private static final MessageListener ackListener = Consumer::acknowledgeAsync;
/**
* Create a LoadSimulationClient with the given JCommander arguments.
* @param arguments Arguments to configure this from.
*/
public LoadSimulationClient(final MainArguments arguments) throws Exception {
payloadCache = new ConcurrentHashMap<>();
topicsToTradeUnits = new ConcurrentHashMap<>();
final EventLoopGroup eventLoopGroup = SystemUtils.IS_OS_LINUX
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"))
new DefaultThreadFactory("pulsar-test-client"))
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("pulsar-test-client"));
new DefaultThreadFactory("pulsar-test-client"));
clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(4);
......@@ -340,6 +333,10 @@ public class LoadSimulationClient {
executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client"));
}
/**
* Start a client with command line arguments.
* @param args Command line arguments to pass in.
*/
public static void main(String[] args) throws Exception {
final MainArguments mainArguments = new MainArguments();
final JCommander jc = new JCommander(mainArguments);
......@@ -352,6 +349,9 @@ public class LoadSimulationClient {
(new LoadSimulationClient(mainArguments)).run();
}
/**
* Start listening for controller commands to create producers and consumers.
*/
public void run() throws Exception {
final ServerSocket serverSocket = new ServerSocket(port);
......@@ -361,7 +361,7 @@ public class LoadSimulationClient {
// has not been tested or considered and is not recommended.
log.info("Listening for controller command...");
final Socket socket = serverSocket.accept();
log.info("Connected to {}\n", socket.getInetAddress().getHostName());
log.info("Connected to {}", socket.getInetAddress().getHostName());
executor.submit(() -> {
try {
handle(socket);
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.testclient;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ModularLoadManagerBrokerMonitor {
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
private static final String BROKER_ROOT = "/loadbalance/brokers";
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 5000;
private final ZooKeeper zkClient;
private static final Gson gson = new Gson();
private static class BrokerWatcher implements Watcher {
public final ZooKeeper zkClient;
public Set<String> brokers;
public BrokerWatcher(final ZooKeeper zkClient) {
this.zkClient = zkClient;
this.brokers = Collections.emptySet();
}
public synchronized void process(final WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
updateBrokers(event.getPath());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public synchronized void updateBrokers(final String path) {
final Set<String> newBrokers = new HashSet<>();
try {
newBrokers.addAll(zkClient.getChildren(path, this));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
for (String oldBroker : brokers) {
if (!newBrokers.contains(oldBroker)) {
log.info("Lost broker: " + oldBroker);
}
}
for (String newBroker : newBrokers) {
if (!brokers.contains(newBroker)) {
log.info("Gained broker: " + newBroker);
final BrokerDataWatcher brokerDataWatcher = new BrokerDataWatcher(zkClient);
brokerDataWatcher.printBrokerData(path + "/" + newBroker);
}
}
this.brokers = newBrokers;
}
}
private static class BrokerDataWatcher implements Watcher {
private final ZooKeeper zkClient;
public BrokerDataWatcher(final ZooKeeper zkClient) {
this.zkClient = zkClient;
}
public static String brokerNameFromPath(final String path) {
return path.substring(path.lastIndexOf('/') + 1);
}
public synchronized void process(final WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
final String broker = brokerNameFromPath(event.getPath());
printBrokerData(event.getPath());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
private static void printMessageData(final double msgThroughputIn, final double msgThroughputOut,
final double msgRateIn, final double msgRateOut) {
log.info(String.format("Message Throughput In: %.2f KB/s", msgThroughputIn / 1024));
log.info(String.format("Message Throughput Out: %.2f KB/s", msgThroughputOut / 1024));
log.info(String.format("Message Rate In: %.2f msgs/s", msgRateIn));
log.info(String.format("Message Rate Out: %.2f msgs/s", msgRateOut));
}
public synchronized void printBrokerData(final String brokerPath) {
final String broker = brokerNameFromPath(brokerPath);
final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
LocalBrokerData localBrokerData;
try {
localBrokerData = gson.fromJson(new String(zkClient.getData(brokerPath, this, null)),
LocalBrokerData.class);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
log.info("Broker Data for " + broker + ":");
log.info("---------------");
log.info("Num Topics: " + localBrokerData.getNumTopics());
log.info("Num Bundles: " + localBrokerData.getNumBundles());
log.info("Num Consumers: " + localBrokerData.getNumConsumers());
log.info("Num Producers: " + localBrokerData.getNumProducers());
log.info(String.format("CPU: %.2f%%", localBrokerData.getCpu().percentUsage()));
log.info(String.format("Memory: %.2f%%", localBrokerData.getMemory().percentUsage()));
log.info(String.format("Direct Memory: %.2f%%", localBrokerData.getDirectMemory().percentUsage()));
log.info("Latest Data:");
printMessageData(localBrokerData.getMsgThroughputIn(), localBrokerData.getMsgThroughputOut(),
localBrokerData.getMsgRateIn(), localBrokerData.getMsgRateOut());
TimeAverageBrokerData timeAverageData;
try {
timeAverageData = gson.fromJson(new String(zkClient.getData(timeAveragePath, null, null)),
TimeAverageBrokerData.class);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
log.info("Short Term Data:");
printMessageData(timeAverageData.getShortTermMsgThroughputIn(),
timeAverageData.getShortTermMsgThroughputOut(), timeAverageData.getShortTermMsgRateIn(),
timeAverageData.getShortTermMsgRateOut());
log.info("Long Term Data:");
printMessageData(timeAverageData.getLongTermMsgThroughputIn(),
timeAverageData.getLongTermMsgThroughputOut(), timeAverageData.getLongTermMsgRateIn(),
timeAverageData.getLongTermMsgRateOut());
if (!localBrokerData.getLastBundleGains().isEmpty()) {
for (String bundle : localBrokerData.getLastBundleGains()) {
log.info("Gained Bundle: " + bundle);
}
}
if (!localBrokerData.getLastBundleLosses().isEmpty()) {
for (String bundle : localBrokerData.getLastBundleLosses()) {
log.info("Lost Bundle: " + bundle);
}
}
}
}
static class Arguments {
@Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true)
public String connectString = null;
}
public ModularLoadManagerBrokerMonitor(final ZooKeeper zkClient) {
this.zkClient = zkClient;
}
private void start() {
try {
final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient);
brokerWatcher.updateBrokers(BROKER_ROOT);
while (true) {
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public static void main(String[] args) {
try {
final Arguments arguments = new Arguments();
final JCommander jc = new JCommander(arguments);
jc.parse(args);
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final ModularLoadManagerBrokerMonitor monitor = new ModularLoadManagerBrokerMonitor(zkClient);
monitor.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.testclient;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* To use the monitor, simply start one via pulsar-perf monitor --connect-string <zk hostname>:<zk port> You will then
* receive updates in LoadReports as they occur.
*/
public class SimpleLoadManagerBrokerMonitor {
private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerBrokerMonitor.class);
private static final String BROKER_ROOT = "/loadbalance/brokers";
private static final int ZOOKEEPER_TIMEOUT_MILLIS = 5000;
private final ZooKeeper zkClient;
private static final Gson gson = new Gson();
private static class BrokerWatcher implements Watcher {
public final ZooKeeper zkClient;
public Set<String> brokers;
public BrokerWatcher(final ZooKeeper zkClient) {
this.zkClient = zkClient;
this.brokers = Collections.emptySet();
}
public synchronized void process(final WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
updateBrokers(event.getPath());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public synchronized void updateBrokers(final String path) {
final Set<String> newBrokers = new HashSet<>();
try {
newBrokers.addAll(zkClient.getChildren(path, this));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
for (String oldBroker : brokers) {
if (!newBrokers.contains(oldBroker)) {
log.info("Lost broker: " + oldBroker);
}
}
for (String newBroker : newBrokers) {
if (!brokers.contains(newBroker)) {
log.info("Gained broker: " + newBroker);
final LoadReportWatcher loadReportWatcher = new LoadReportWatcher(zkClient);
loadReportWatcher.printLoadReport(path + "/" + newBroker);
}
}
this.brokers = newBrokers;
}
}
private static class LoadReportWatcher implements Watcher {
private final ZooKeeper zkClient;
public LoadReportWatcher(final ZooKeeper zkClient) {
this.zkClient = zkClient;
}
public synchronized void process(final WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
printLoadReport(event.getPath());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public synchronized void printLoadReport(final String path) {
final String brokerName = path.substring(path.lastIndexOf('/') + 1);
LoadReport loadReport;
try {
loadReport = gson.fromJson(new String(zkClient.getData(path, this, null)), LoadReport.class);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
final SystemResourceUsage resourceUsage = loadReport.getSystemResourceUsage();
log.info("Load Report for " + brokerName + ":");
log.info("---------------");
log.info("Num Topics: " + loadReport.getNumTopics());
log.info("Num Bundles: " + loadReport.getNumBundles());
log.info(String.format("Raw CPU: %.2f%%", resourceUsage.getCpu().percentUsage()));
log.info(String.format("Allocated CPU: %.2f%%",
percentUsage(loadReport.getAllocatedCPU(), resourceUsage.getCpu().limit)));
log.info(String.format("Preallocated CPU: %.2f%%",
percentUsage(loadReport.getPreAllocatedCPU(), resourceUsage.getCpu().limit)));
log.info(String.format("Raw Memory: %.2f%%", resourceUsage.getMemory().percentUsage()));
log.info(String.format("Allocated Memory: %.2f%%",
percentUsage(loadReport.getAllocatedMemory(), resourceUsage.getMemory().limit)));
log.info(String.format("Preallocated Memory: %.2f%%",
percentUsage(loadReport.getPreAllocatedMemory(), resourceUsage.getMemory().limit)));
log.info(String.format("Raw Bandwidth In: %.2f%%", resourceUsage.getBandwidthIn().percentUsage()));
log.info(String.format("Allocated Bandwidth In: %.2f%%",
percentUsage(loadReport.getAllocatedBandwidthIn(), resourceUsage.getBandwidthIn().limit)));
log.info(String.format("Preallocated Bandwidth In: %.2f%%",
percentUsage(loadReport.getPreAllocatedBandwidthIn(), resourceUsage.getBandwidthIn().limit)));
log.info(String.format("Raw Bandwidth Out: %.2f%%", resourceUsage.getBandwidthOut().percentUsage()));
log.info(String.format("Allocated Bandwidth Out: %.2f%%",
percentUsage(loadReport.getAllocatedBandwidthOut(), resourceUsage.getBandwidthOut().limit)));
log.info(String.format("Preallocated Bandwidth Out: %.2f%%",
percentUsage(loadReport.getPreAllocatedBandwidthOut(), resourceUsage.getBandwidthOut().limit)));
log.info(String.format("Direct Memory: %.2f%%", resourceUsage.getDirectMemory().percentUsage()));
log.info(String.format("Messages In Per Second: %.2f", loadReport.getMsgRateIn()));
log.info(String.format("Messages Out Per Second: %.2f", loadReport.getMsgRateOut()));
log.info(String.format("Preallocated Messages In Per Second: %.2f", loadReport.getPreAllocatedMsgRateIn()));
log.info(String.format("Preallocated Out Per Second: %.2f", loadReport.getPreAllocatedMsgRateOut()));
if (!loadReport.getBundleGains().isEmpty()) {
for (String bundle : loadReport.getBundleGains()) {
log.info("Gained Bundle: " + bundle);
}
}
if (!loadReport.getBundleLosses().isEmpty()) {
for (String bundle : loadReport.getBundleLosses()) {
log.info("Lost Bundle: " + bundle);
}
}
}
}
static class Arguments {
@Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true)
public String connectString = null;
}
public SimpleLoadManagerBrokerMonitor(final ZooKeeper zkClient) {
this.zkClient = zkClient;
}
private static double percentUsage(final double usage, final double limit) {
return limit > 0 && usage >= 0 ? 100 * Math.min(1, usage / limit) : 0;
}
private void start() {
try {
final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient);
brokerWatcher.updateBrokers(BROKER_ROOT);
while (true) {
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public static void main(String[] args) {
try {
final Arguments arguments = new Arguments();
final JCommander jc = new JCommander(arguments);
jc.parse(args);
final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null);
final SimpleLoadManagerBrokerMonitor monitor = new SimpleLoadManagerBrokerMonitor(zkClient);
monitor.start();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
package com.yahoo.pulsar.testclient.utils;
import java.util.Objects;
import java.util.function.Function;
/**
* Light-weight utility for creating rows where each column has a fixed length in a command-line setting.
*/
public class FixedColumnLengthTableMaker {
/**
* Character to duplicate to make the bottom border.
*/
public char bottomBorder = '=';
/**
* Format String to apply to decimal entries. If set to null, no special formatting is applied.
*/
public String decimalFormatter = null;
/**
* Length of table elements. Elements whose String representations exceed this length are trimmed down to this
* length.
*/
public int elementLength = 10;
/**
* The border to use to make the left side of the table.
*/
public String leftBorder = "||";
/**
* The amount of spacing to pad left of an element with.
*/
public int leftPadding = 0;
/**
* The border to use to make the right side of the table.
*/
public String rightBorder = "||";
/**
* The amount of spacing to pad right of an element with.
*/
public int rightPadding = 1;
/**
* The String to separate elements with.
*/
public String separator = "|";
/**
* Character to duplicate to make the top border.
*/
public char topBorder = '=';
/**
* If not null, lengthFunction should give the length for the given column index.
*/
public Function<Integer, Integer> lengthFunction = null;
// Helper function to add top and bottom borders.
private void addHorizontalBorder(final int length, final StringBuilder builder, final char borderChar) {
for (int i = 0; i < length; ++i) {
builder.append(borderChar);
}
}
// Helper function to pad with white space.
private void addSpace(final int amount, final StringBuilder builder) {
for (int i = 0; i < amount; ++i) {
builder.append(' ');
}
}
private int lengthFor(final int column) {
return lengthFunction == null ? elementLength : lengthFunction.apply(column);
}
/**
* Make a table using the specified settings.
*
* @param rows
* Rows to construct the table from.
* @return A String version of the table.
*/
public String make(final Object[][] rows) {
final StringBuilder builder = new StringBuilder();
int numColumns = 0;
for (final Object[] row : rows) {
// Take the largest number of columns out of any row to be the total.
numColumns = Math.max(numColumns, row.length);
}
// Total length of the table in characters.
int totalLength = numColumns * (leftPadding + rightPadding + separator.length()) - separator.length()
+ leftBorder.length() + rightBorder.length();
for (int i = 0; i < numColumns; ++i) {
totalLength += lengthFor(i);
}
addHorizontalBorder(totalLength, builder, topBorder);
builder.append('\n');
int i;
for (final Object[] row : rows) {
i = 0;
builder.append(leftBorder);
for (final Object element : row) {
addSpace(leftPadding, builder);
String elementString;
if ((element instanceof Float || element instanceof Double) && decimalFormatter != null) {
elementString = String.format(decimalFormatter, element);
} else {
// Avoid throwing NPE
elementString = Objects.toString(element, "");
}
if (elementString.length() > lengthFor(i)) {
// Trim down to the maximum number of characters.
elementString = elementString.substring(0, lengthFor(i));
}
builder.append(elementString);
// Add the space due to remaining characters and the right padding.
addSpace(lengthFor(i) - elementString.length() + rightPadding, builder);
if (i != numColumns - 1) {
// Don't add separator for the last column.
builder.append(separator);
}
i += 1;
}
// Put empty elements for remaining columns.
for (; i < numColumns; ++i) {
addSpace(leftPadding + rightPadding + lengthFor(i), builder);
if (i != numColumns - 1) {
builder.append(separator);
}
}
builder.append(rightBorder);
builder.append('\n');
}
addHorizontalBorder(totalLength, builder, bottomBorder);
return builder.toString();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册