From 311d76f9c851107386b08a131909335fffa2a631 Mon Sep 17 00:00:00 2001 From: huangli Date: Tue, 14 Sep 2021 23:48:15 +0800 Subject: [PATCH] [ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance (#3351) --- ...ConsumeMessageConcurrentlyServiceTest.java | 2 +- .../rocketmq/common/stats/StatsItem.java | 22 ++++++++++--------- .../rocketmq/common/stats/StatsItemSet.java | 8 +++---- .../common/stats/StatsItemSetTest.java | 4 +++- .../store/stats/BrokerStatsManager.java | 6 ++--- .../schedule/ScheduleMessageServiceTest.java | 6 ++--- 6 files changed, 26 insertions(+), 22 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index e8feb80d..6fa76e03 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest { StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr); StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName()); - assertThat(item.getValue().get()).isGreaterThan(0L); + assertThat(item.getValue().sum()).isGreaterThan(0L); MessageExt msg = messageAtomic.get(); assertThat(msg).isNotNull(); assertThat(msg.getTopic()).isEqualTo(topic); diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index b078551a..6007cb0a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -21,14 +21,16 @@ import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; public class StatsItem { - private final AtomicLong value = new AtomicLong(0); + private final LongAdder value = new LongAdder(); - private final AtomicLong times = new AtomicLong(0); + private final LongAdder times = new LongAdder(); private final LinkedList csListMinute = new LinkedList(); @@ -157,8 +159,8 @@ public class StatsItem { if (this.csListMinute.size() == 0) { this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0)); } - this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListMinute.size() > 7) { this.csListMinute.removeFirst(); } @@ -170,8 +172,8 @@ public class StatsItem { if (this.csListHour.size() == 0) { this.csListHour.add(new CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0)); } - this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListHour.size() > 7) { this.csListHour.removeFirst(); } @@ -183,8 +185,8 @@ public class StatsItem { if (this.csListDay.size() == 0) { this.csListDay.add(new CallSnapshot(System.currentTimeMillis() - 1 * 60 * 60 * 1000, 0, 0)); } - this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListDay.size() > 25) { this.csListDay.removeFirst(); } @@ -214,7 +216,7 @@ public class StatsItem { ss.getAvgpt()); } - public AtomicLong getValue() { + public LongAdder getValue() { return value; } @@ -226,7 +228,7 @@ public class StatsItem { return statsName; } - public AtomicLong getTimes() { + public LongAdder getTimes() { return times; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index a28d008d..8d5418ef 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -154,14 +154,14 @@ public class StatsItemSet { public void addValue(final String statsKey, final int incValue, final int incTimes) { StatsItem statsItem = this.getAndCreateStatsItem(statsKey); - statsItem.getValue().addAndGet(incValue); - statsItem.getTimes().addAndGet(incTimes); + statsItem.getValue().add(incValue); + statsItem.getTimes().add(incTimes); } public void addRTValue(final String statsKey, final int incValue, final int incTimes) { StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey); - statsItem.getValue().addAndGet(incValue); - statsItem.getTimes().addAndGet(incTimes); + statsItem.getValue().add(incValue); + statsItem.getTimes().add(incTimes); } public void delValue(final String statsKey) { diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java index 5b4c5d82..d834160d 100644 --- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.rocketmq.common.ThreadFactoryImpl; import org.junit.After; import org.junit.Test; @@ -95,7 +97,7 @@ public class StatsItemSetTest { } } - private AtomicLong test_unit() throws InterruptedException { + private LongAdder test_unit() throws InterruptedException { final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null); executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new ThreadFactoryImpl("testMultiThread")); diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 3e643e32..b9e11fd5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -213,15 +213,15 @@ public class BrokerStatsManager { } public void incBrokerPutNums() { - this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1); } public void incBrokerPutNums(final int incValue) { - this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); } public void incBrokerGetNums(final int incValue) { - this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); } public void incSendBackNums(final String group, final String topic) { diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index fa3c6bfc..d375fb0c 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest { assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); // get the stats change - assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1); - assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L); - assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize()); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().sum()).isEqualTo(1L); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize()); // get the message body ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize()); -- GitLab