diff --git a/distribution/benchmark/shutdown.sh b/distribution/benchmark/shutdown.sh new file mode 100644 index 0000000000000000000000000000000000000000..9ecd32600e3c08bc1743708f4edd83c4c2a9ce0e --- /dev/null +++ b/distribution/benchmark/shutdown.sh @@ -0,0 +1,63 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +case $1 in + producer) + + pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Producer' |grep java | grep -v grep | awk '{print $1}'` + if [ -z "$pid" ] ; then + echo "No benchmark producer running." + exit -1; + fi + + echo "The benchmkar producer(${pid}) is running..." + + kill ${pid} + + echo "Send shutdown request to benchmark producer(${pid}) OK" + ;; + consumer) + + pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Consumer' |grep java | grep -v grep | awk '{print $1}'` + if [ -z "$pid" ] ; then + echo "No benchmark consumer running." + exit -1; + fi + + echo "The benchmark consumer(${pid}) is running..." + + kill ${pid} + + echo "Send shutdown request to benchmark consumer(${pid}) OK" + ;; + tproducer) + + pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.TransactionProducer' |grep java | grep -v grep | awk '{print $1}'` + if [ -z "$pid" ] ; then + echo "No benchmark transaction producer running." + exit -1; + fi + + echo "The benchmkar transaction producer(${pid}) is running..." + + kill ${pid} + + echo "Send shutdown request to benchmark transaction producer(${pid}) OK" + ;; + *) + echo "Useage: shutdown producer | consumer | tproducer" +esac diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index c0a2a8b53a6ef34df63273bfb090c864bac14999..b6c6ae410f61afc4c9dd249ec5b56a0e74068c10 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -52,6 +52,7 @@ public class Consumer { } final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20; final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; @@ -65,8 +66,8 @@ public class Consumer { group = groupPrefix + "_" + (System.currentTimeMillis() % 100); } - System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n", - topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable); + System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n", + topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); @@ -101,8 +102,8 @@ public class Consumer { statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0); statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0); - System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", - consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax + System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n", + System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax ); } } @@ -123,6 +124,8 @@ public class Consumer { String ns = commandLine.getOptionValue('n'); consumer.setNamesrvAddr(ns); } + consumer.setConsumeThreadMin(threadCount); + consumer.setConsumeThreadMax(threadCount); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); if (filterType == null || expression == null) { @@ -179,6 +182,10 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); + opt = new Option("w", "threadCount", true, "Thread count, Default: 20"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer"); opt.setRequired(false); options.addOption(opt); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index dbad169221aac6c5c1e0dfdd22fe3fc7014284ef..93271cbb20547b1ee00d37a3cce2dd482f0ec986 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -89,8 +89,8 @@ public class Producer { final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); - System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", + System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 951b718d1ed925506dbd788605f87fa9390fba50..85af04eabb6c9a9a91b50b6b8988e9b38ecad096 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -105,8 +105,8 @@ public class TransactionProducer { final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck; System.out.printf( - "Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount, + "Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n", + System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount, unexpectedCheck, dupCheck); statsBenchmark.getSendMessageMaxRT().set(0); }