From 857d28df5e6dfe697fd01d86bcc1a48c0bdef234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=80=90keepal=E3=80=91?= <974567732@qq.com> Date: Sun, 5 Sep 2021 06:23:32 +0800 Subject: [PATCH] [ISSUE #3284]Optimizing benchmark code (#3317) --- .../example/benchmark/BatchProducer.java | 68 ++++++++++--------- .../benchmark/TransactionProducer.java | 57 ++++++++-------- 2 files changed, 65 insertions(+), 60 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java index 843b84bf..f3e8b604 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java @@ -26,6 +26,8 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -100,22 +102,22 @@ public class BatchProducer { try { long beginTimestamp = System.currentTimeMillis(); - long sendSucCount = statsBenchmark.getSendMessageSuccessCount().get(); + long sendSucCount = statsBenchmark.getSendMessageSuccessCount().longValue(); setKeys(keyEnable, msgs, String.valueOf(beginTimestamp / 1000)); setTags(tagCount, msgs, sendSucCount); setProperties(propertySize, msgs); SendResult sendResult = producer.send(msgs); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); - statsBenchmark.getSendMessageSuccessCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestSuccessCount().increment(); + statsBenchmark.getSendMessageSuccessCount().add(msgs.size()); } else { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); } long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); - long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue(); while (currentRT > prevMaxRT) { boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); if (updated) { @@ -125,8 +127,8 @@ public class BatchProducer { prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } } catch (RemotingException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { @@ -134,22 +136,22 @@ public class BatchProducer { } catch (InterruptedException ignored) { } } catch (InterruptedException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); try { Thread.sleep(3000); } catch (InterruptedException e1) { } - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); log.error("[BENCHMARK_PRODUCER] Send Exception", e); } catch (MQClientException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); log.error("[BENCHMARK_PRODUCER] Send Exception", e); } catch (MQBrokerException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); - statsBenchmark.getSendMessageFailedCount().addAndGet(msgs.size()); + statsBenchmark.getSendRequestFailedCount().increment(); + statsBenchmark.getSendMessageFailedCount().add(msgs.size()); log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { Thread.sleep(3000); @@ -313,17 +315,17 @@ public class BatchProducer { class StatsBenchmarkBatchProducer { - private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + private final LongAdder sendRequestSuccessCount = new LongAdder(); - private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + private final LongAdder sendRequestFailedCount = new LongAdder(); - private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + private final LongAdder sendMessageSuccessTimeTotal = new LongAdder(); private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - private final AtomicLong sendMessageSuccessCount = new AtomicLong(0L); + private final LongAdder sendMessageSuccessCount = new LongAdder(); - private final AtomicLong sendMessageFailedCount = new AtomicLong(0L); + private final LongAdder sendMessageFailedCount = new LongAdder(); private final Timer timer = new Timer("BenchmarkTimerThread", true); @@ -332,25 +334,25 @@ class StatsBenchmarkBatchProducer { public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.sendMessageSuccessCount.get(), - this.sendMessageFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), + this.sendRequestSuccessCount.longValue(), + this.sendRequestFailedCount.longValue(), + this.sendMessageSuccessCount.longValue(), + this.sendMessageFailedCount.longValue(), + this.sendMessageSuccessTimeTotal.longValue(), }; return snap; } - public AtomicLong getSendRequestSuccessCount() { + public LongAdder getSendRequestSuccessCount() { return sendRequestSuccessCount; } - public AtomicLong getSendRequestFailedCount() { + public LongAdder getSendRequestFailedCount() { return sendRequestFailedCount; } - public AtomicLong getSendMessageSuccessTimeTotal() { + public LongAdder getSendMessageSuccessTimeTotal() { return sendMessageSuccessTimeTotal; } @@ -358,11 +360,11 @@ class StatsBenchmarkBatchProducer { return sendMessageMaxRT; } - public AtomicLong getSendMessageSuccessCount() { + public LongAdder getSendMessageSuccessCount() { return sendMessageSuccessCount; } - public AtomicLong getSendMessageFailedCount() { + public LongAdder getSendMessageFailedCount() { return sendMessageFailedCount; } @@ -390,7 +392,7 @@ class StatsBenchmarkBatchProducer { final double averageMsgRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); System.out.printf("Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n", - System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().get(), averageRT, averageMsgRT, end[2], end[4]); + System.currentTimeMillis(), sendTps, sendMps, getSendMessageMaxRT().longValue(), averageRT, averageMsgRT, end[2], end[4]); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index c4f14a48..767a96b4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -50,10 +50,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; public class TransactionProducer { private static final long START_TIME = System.currentTimeMillis(); - private static final AtomicLong MSG_COUNT = new AtomicLong(0); + private static final LongAdder MSG_COUNT = new LongAdder(); //broker max check times should less than this value static final int MAX_CHECK_RESULT_IN_MSG = 20; @@ -158,7 +159,7 @@ public class TransactionProducer { success = false; } finally { final long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageTimeTotal().addAndGet(currentRT); + statsBenchmark.getSendMessageTimeTotal().add(currentRT); long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); while (currentRT > prevMaxRT) { boolean updated = statsBenchmark.getSendMessageMaxRT() @@ -169,9 +170,9 @@ public class TransactionProducer { prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } if (success) { - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); + statsBenchmark.getSendRequestSuccessCount().increment(); } else { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + statsBenchmark.getSendRequestFailedCount().increment(); } if (config.sendInterval > 0) { try { @@ -194,7 +195,9 @@ public class TransactionProducer { ByteBuffer buf = ByteBuffer.wrap(bs); buf.putLong(config.batchId); long sendMachineId = START_TIME << 32; - long msgId = sendMachineId | MSG_COUNT.getAndIncrement(); + long count = MSG_COUNT.longValue(); + long msgId = sendMachineId | count; + MSG_COUNT.increment(); buf.putLong(msgId); // save send tx result in message @@ -316,7 +319,7 @@ class TransactionListenerImpl implements TransactionListener { // message not generated in this test return LocalTransactionState.ROLLBACK_MESSAGE; } - statBenchmark.getCheckCount().incrementAndGet(); + statBenchmark.getCheckCount().increment(); int times = 0; try { @@ -339,7 +342,7 @@ class TransactionListenerImpl implements TransactionListener { dup = newCheckLog.equals(oldCheckLog); } if (dup) { - statBenchmark.getDuplicatedCheckCount().incrementAndGet(); + statBenchmark.getDuplicatedCheckCount().increment(); } if (msgMeta.sendResult != LocalTransactionState.UNKNOW) { System.out.printf("%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n", @@ -347,7 +350,7 @@ class TransactionListenerImpl implements TransactionListener { msg.getMsgId(), msg.getTransactionId(), msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), msgMeta.sendResult.toString()); - statBenchmark.getUnexpectedCheckCount().incrementAndGet(); + statBenchmark.getUnexpectedCheckCount().increment(); return msgMeta.sendResult; } @@ -358,7 +361,7 @@ class TransactionListenerImpl implements TransactionListener { new SimpleDateFormat("HH:mm:ss,SSS").format(new Date()), msg.getMsgId(), msg.getTransactionId(), msg.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES), s); - statBenchmark.getUnexpectedCheckCount().incrementAndGet(); + statBenchmark.getUnexpectedCheckCount().increment(); return s; } } @@ -385,42 +388,42 @@ class Snapshot { } class StatsBenchmarkTProducer { - private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + private final LongAdder sendRequestSuccessCount = new LongAdder(); - private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + private final LongAdder sendRequestFailedCount = new LongAdder(); - private final AtomicLong sendMessageTimeTotal = new AtomicLong(0L); + private final LongAdder sendMessageTimeTotal = new LongAdder(); private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - private final AtomicLong checkCount = new AtomicLong(0L); + private final LongAdder checkCount = new LongAdder(); - private final AtomicLong unexpectedCheckCount = new AtomicLong(0L); + private final LongAdder unexpectedCheckCount = new LongAdder(); - private final AtomicLong duplicatedCheckCount = new AtomicLong(0); + private final LongAdder duplicatedCheckCount = new LongAdder(); public Snapshot createSnapshot() { Snapshot s = new Snapshot(); s.endTime = System.currentTimeMillis(); - s.sendRequestSuccessCount = sendRequestSuccessCount.get(); - s.sendRequestFailedCount = sendRequestFailedCount.get(); - s.sendMessageTimeTotal = sendMessageTimeTotal.get(); + s.sendRequestSuccessCount = sendRequestSuccessCount.longValue(); + s.sendRequestFailedCount = sendRequestFailedCount.longValue(); + s.sendMessageTimeTotal = sendMessageTimeTotal.longValue(); s.sendMessageMaxRT = sendMessageMaxRT.get(); - s.checkCount = checkCount.get(); - s.unexpectedCheckCount = unexpectedCheckCount.get(); - s.duplicatedCheck = duplicatedCheckCount.get(); + s.checkCount = checkCount.longValue(); + s.unexpectedCheckCount = unexpectedCheckCount.longValue(); + s.duplicatedCheck = duplicatedCheckCount.longValue(); return s; } - public AtomicLong getSendRequestSuccessCount() { + public LongAdder getSendRequestSuccessCount() { return sendRequestSuccessCount; } - public AtomicLong getSendRequestFailedCount() { + public LongAdder getSendRequestFailedCount() { return sendRequestFailedCount; } - public AtomicLong getSendMessageTimeTotal() { + public LongAdder getSendMessageTimeTotal() { return sendMessageTimeTotal; } @@ -428,15 +431,15 @@ class StatsBenchmarkTProducer { return sendMessageMaxRT; } - public AtomicLong getCheckCount() { + public LongAdder getCheckCount() { return checkCount; } - public AtomicLong getUnexpectedCheckCount() { + public LongAdder getUnexpectedCheckCount() { return unexpectedCheckCount; } - public AtomicLong getDuplicatedCheckCount() { + public LongAdder getDuplicatedCheckCount() { return duplicatedCheckCount; } } -- GitLab