diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index d1fa5743ae52164cc096122009f5ee26a8fe5f0e..6975ab58b0d63532ceac8bc8534908dd44af7478 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.example.benchmark; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.LinkedList; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; @@ -58,9 +60,10 @@ public class Producer { final int tagCount = commandLine.hasOption('l') ? Integer.parseInt(commandLine.getOptionValue('l')) : 0; final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); + final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0; - System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", - topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); + System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n", + topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum); final InternalLogger log = ClientLogger.getLog(); @@ -72,6 +75,16 @@ public class Producer { final LinkedList snapshotList = new LinkedList(); + final long[] msgNums = new long[threadCount]; + + if (messageNum > 0) { + Arrays.fill(msgNums, messageNum / threadCount); + long mod = messageNum % threadCount; + if (mod > 0) { + msgNums[0] += mod; + } + } + timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { @@ -85,14 +98,7 @@ public class Producer { timer.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); - - final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); - - System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", - System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + doPrintStats(snapshotList, statsBenchmark, false); } } @@ -120,9 +126,14 @@ public class Producer { producer.start(); for (int i = 0; i < threadCount; i++) { + final long msgNumLimit = msgNums[i]; + if (messageNum > 0 && msgNumLimit == 0) { + break; + } sendThreadPool.execute(new Runnable() { @Override public void run() { + int num = 0; while (true) { try { final Message msg; @@ -198,10 +209,28 @@ public class Producer { } catch (InterruptedException ignored) { } } + if (messageNum > 0 && ++num >= msgNumLimit) { + break; + } } } }); } + try { + sendThreadPool.shutdown(); + sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + timer.cancel(); + if (snapshotList.size() > 1) { + doPrintStats(snapshotList, statsBenchmark, true); + } else { + System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n", + statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), + statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get()); + } + producer.shutdown(); + } catch (InterruptedException e) { + log.error("[Exit] Thread Interrupted Exception", e); + } } public static Options buildCommandlineOptions(final Options options) { @@ -233,6 +262,10 @@ public class Producer { opt.setRequired(false); options.addOption(opt); + opt = new Option("q", "messageQuantity", true, "Send message quantity, Default: 0, running forever"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -249,6 +282,23 @@ public class Producer { return msg; } + + private static void doPrintStats(final LinkedList snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + if (done) { + System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", + statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } else { + System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", + System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } + } } class StatsBenchmarkProducer {