diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 154e6ed4dac017a953fc46c22ce33f2d40e68ec7..7d26509d2bc2dc8bc393f789c7147009b38fc8ea 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.example.benchmark; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -155,20 +156,20 @@ public class Consumer { MessageExt msg = msgs.get(0); long now = System.currentTimeMillis(); - statsBenchmarkConsumer.getReceiveMessageTotalCount().incrementAndGet(); + statsBenchmarkConsumer.getReceiveMessageTotalCount().increment(); long born2ConsumerRT = now - msg.getBornTimestamp(); - statsBenchmarkConsumer.getBorn2ConsumerTotalRT().addAndGet(born2ConsumerRT); + statsBenchmarkConsumer.getBorn2ConsumerTotalRT().add(born2ConsumerRT); long store2ConsumerRT = now - msg.getStoreTimestamp(); - statsBenchmarkConsumer.getStore2ConsumerTotalRT().addAndGet(store2ConsumerRT); + statsBenchmarkConsumer.getStore2ConsumerTotalRT().add(store2ConsumerRT); compareAndSetMax(statsBenchmarkConsumer.getBorn2ConsumerMaxRT(), born2ConsumerRT); compareAndSetMax(statsBenchmarkConsumer.getStore2ConsumerMaxRT(), store2ConsumerRT); if (ThreadLocalRandom.current().nextDouble() < failRate) { - statsBenchmarkConsumer.getFailCount().incrementAndGet(); + statsBenchmarkConsumer.getFailCount().increment(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; @@ -233,39 +234,39 @@ public class Consumer { } class StatsBenchmarkConsumer { - private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L); + private final LongAdder receiveMessageTotalCount = new LongAdder(); - private final AtomicLong born2ConsumerTotalRT = new AtomicLong(0L); + private final LongAdder born2ConsumerTotalRT = new LongAdder(); - private final AtomicLong store2ConsumerTotalRT = new AtomicLong(0L); + private final LongAdder store2ConsumerTotalRT = new LongAdder(); private final AtomicLong born2ConsumerMaxRT = new AtomicLong(0L); private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); - private final AtomicLong failCount = new AtomicLong(0L); + private final LongAdder failCount = new LongAdder(); public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), - this.receiveMessageTotalCount.get(), - this.born2ConsumerTotalRT.get(), - this.store2ConsumerTotalRT.get(), - this.failCount.get() + this.receiveMessageTotalCount.longValue(), + this.born2ConsumerTotalRT.longValue(), + this.store2ConsumerTotalRT.longValue(), + this.failCount.longValue() }; return snap; } - public AtomicLong getReceiveMessageTotalCount() { + public LongAdder getReceiveMessageTotalCount() { return receiveMessageTotalCount; } - public AtomicLong getBorn2ConsumerTotalRT() { + public LongAdder getBorn2ConsumerTotalRT() { return born2ConsumerTotalRT; } - public AtomicLong getStore2ConsumerTotalRT() { + public LongAdder getStore2ConsumerTotalRT() { return store2ConsumerTotalRT; } @@ -277,7 +278,7 @@ class StatsBenchmarkConsumer { return store2ConsumerMaxRT; } - public AtomicLong getFailCount() { + public LongAdder getFailCount() { return failCount; } } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index b198a0fc99808cf0eeac4a81e005e004ec760cad..cc2999485a4f17ed5d61a48d7e9250ab79f48a81 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.benchmark; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -156,8 +157,7 @@ public class Producer { msg.setDelayTimeLevel(delayLevel); } if (tagCount > 0) { - long sendSucCount = statsBenchmark.getReceiveResponseSuccessCount().get(); - msg.setTags(String.format("tag%d", sendSucCount % tagCount)); + msg.setTags(String.format("tag%d", System.currentTimeMillis() % tagCount)); } if (propertySize > 0) { if (msg.getProperties() != null) { @@ -180,20 +180,20 @@ public class Producer { } } producer.send(msg); - statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); - statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); + statsBenchmark.getSendRequestSuccessCount().increment(); + statsBenchmark.getReceiveResponseSuccessCount().increment(); final long currentRT = System.currentTimeMillis() - beginTimestamp; - statsBenchmark.getSendMessageSuccessTimeTotal().addAndGet(currentRT); - long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + statsBenchmark.getSendMessageSuccessTimeTotal().add(currentRT); + long prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue(); while (currentRT > prevMaxRT) { boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); if (updated) break; - prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); + prevMaxRT = statsBenchmark.getSendMessageMaxRT().longValue(); } } catch (RemotingException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + statsBenchmark.getSendRequestFailedCount().increment(); log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { @@ -201,16 +201,16 @@ public class Producer { } catch (InterruptedException ignored) { } } catch (InterruptedException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + statsBenchmark.getSendRequestFailedCount().increment(); try { Thread.sleep(3000); } catch (InterruptedException e1) { } } catch (MQClientException e) { - statsBenchmark.getSendRequestFailedCount().incrementAndGet(); + statsBenchmark.getSendRequestFailedCount().increment(); log.error("[BENCHMARK_PRODUCER] Send Exception", e); } catch (MQBrokerException e) { - statsBenchmark.getReceiveResponseFailedCount().incrementAndGet(); + statsBenchmark.getReceiveResponseFailedCount().increment(); log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { Thread.sleep(3000); @@ -237,8 +237,8 @@ public class Producer { doPrintStats(snapshotList, statsBenchmark, true); } else { System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n", - statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), - statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get()); + statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(), + statsBenchmark.getSendRequestFailedCount().longValue(), statsBenchmark.getReceiveResponseFailedCount().longValue()); } producer.shutdown(); } catch (InterruptedException e) { @@ -294,7 +294,7 @@ public class Producer { Message msg = new Message(); msg.setTopic(topic); - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(messageSize); for (int i = 0; i < messageSize; i += 10) { sb.append("hello baby"); } @@ -313,58 +313,58 @@ public class Producer { if (done) { System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", - statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + statsBenchmark.getSendRequestSuccessCount().longValue() + statsBenchmark.getSendRequestFailedCount().longValue(), + sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]); } else { System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", - System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().longValue(), averageRT, end[2], end[4]); } } } class StatsBenchmarkProducer { - private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); + private final LongAdder sendRequestSuccessCount = new LongAdder(); - private final AtomicLong sendRequestFailedCount = new AtomicLong(0L); + private final LongAdder sendRequestFailedCount = new LongAdder(); - private final AtomicLong receiveResponseSuccessCount = new AtomicLong(0L); + private final LongAdder receiveResponseSuccessCount = new LongAdder(); - private final AtomicLong receiveResponseFailedCount = new AtomicLong(0L); + private final LongAdder receiveResponseFailedCount = new LongAdder(); - private final AtomicLong sendMessageSuccessTimeTotal = new AtomicLong(0L); + private final LongAdder sendMessageSuccessTimeTotal = new LongAdder(); private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); public Long[] createSnapshot() { Long[] snap = new Long[] { System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), + this.sendRequestSuccessCount.longValue(), + this.sendRequestFailedCount.longValue(), + this.receiveResponseSuccessCount.longValue(), + this.receiveResponseFailedCount.longValue(), + this.sendMessageSuccessTimeTotal.longValue(), }; return snap; } - public AtomicLong getSendRequestSuccessCount() { + public LongAdder getSendRequestSuccessCount() { return sendRequestSuccessCount; } - public AtomicLong getSendRequestFailedCount() { + public LongAdder getSendRequestFailedCount() { return sendRequestFailedCount; } - public AtomicLong getReceiveResponseSuccessCount() { + public LongAdder getReceiveResponseSuccessCount() { return receiveResponseSuccessCount; } - public AtomicLong getReceiveResponseFailedCount() { + public LongAdder getReceiveResponseFailedCount() { return receiveResponseFailedCount; } - public AtomicLong getSendMessageSuccessTimeTotal() { + public LongAdder getSendMessageSuccessTimeTotal() { return sendMessageSuccessTimeTotal; }