diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java index e8feb80dd99f45f4713605f797130fba6e75f9e5..6fa76e0380b471dd654efea292d95670307f4a31 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java @@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest { StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr); StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName()); - assertThat(item.getValue().get()).isGreaterThan(0L); + assertThat(item.getValue().sum()).isGreaterThan(0L); MessageExt msg = messageAtomic.get(); assertThat(msg).isNotNull(); assertThat(msg.getTopic()).isEqualTo(topic); diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index b078551ad2ca006d2b2c5f84589bbb3e059d068f..6007cb0a4251b3c807b6dcb58117799eedde1b04 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -21,14 +21,16 @@ import java.util.LinkedList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; public class StatsItem { - private final AtomicLong value = new AtomicLong(0); + private final LongAdder value = new LongAdder(); - private final AtomicLong times = new AtomicLong(0); + private final LongAdder times = new LongAdder(); private final LinkedList csListMinute = new LinkedList(); @@ -157,8 +159,8 @@ public class StatsItem { if (this.csListMinute.size() == 0) { this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0)); } - this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListMinute.size() > 7) { this.csListMinute.removeFirst(); } @@ -170,8 +172,8 @@ public class StatsItem { if (this.csListHour.size() == 0) { this.csListHour.add(new CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0)); } - this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListHour.size() > 7) { this.csListHour.removeFirst(); } @@ -183,8 +185,8 @@ public class StatsItem { if (this.csListDay.size() == 0) { this.csListDay.add(new CallSnapshot(System.currentTimeMillis() - 1 * 60 * 60 * 1000, 0, 0)); } - this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value - .get())); + this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value + .sum())); if (this.csListDay.size() > 25) { this.csListDay.removeFirst(); } @@ -214,7 +216,7 @@ public class StatsItem { ss.getAvgpt()); } - public AtomicLong getValue() { + public LongAdder getValue() { return value; } @@ -226,7 +228,7 @@ public class StatsItem { return statsName; } - public AtomicLong getTimes() { + public LongAdder getTimes() { return times; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index a28d008d3e2d07dd2be3b1dcda20a0241d8b023c..8d5418ef6ce448e01eae7ec6c00686e0a277da62 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -154,14 +154,14 @@ public class StatsItemSet { public void addValue(final String statsKey, final int incValue, final int incTimes) { StatsItem statsItem = this.getAndCreateStatsItem(statsKey); - statsItem.getValue().addAndGet(incValue); - statsItem.getTimes().addAndGet(incTimes); + statsItem.getValue().add(incValue); + statsItem.getTimes().add(incTimes); } public void addRTValue(final String statsKey, final int incValue, final int incTimes) { StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey); - statsItem.getValue().addAndGet(incValue); - statsItem.getTimes().addAndGet(incTimes); + statsItem.getValue().add(incValue); + statsItem.getTimes().add(incTimes); } public void delValue(final String statsKey) { diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java index 5b4c5d823d3357f38fb98601b2d2fb61f3190ca5..d834160d63c37116ebf5c72ef3fea87bc9641a36 100644 --- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java @@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.rocketmq.common.ThreadFactoryImpl; import org.junit.After; import org.junit.Test; @@ -95,7 +97,7 @@ public class StatsItemSetTest { } } - private AtomicLong test_unit() throws InterruptedException { + private LongAdder test_unit() throws InterruptedException { final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null); executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new ThreadFactoryImpl("testMultiThread")); diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 3e643e32a744dce2ddbfccc8694160a14aa2fcd0..b9e11fd5929b8980a7dab83816e1f621cfc35dc5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -213,15 +213,15 @@ public class BrokerStatsManager { } public void incBrokerPutNums() { - this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1); } public void incBrokerPutNums(final int incValue) { - this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); } public void incBrokerGetNums(final int incValue) { - this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); + this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue); } public void incSendBackNums(final String group, final String topic) { diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index fa3c6bfcd8b9fd08035d5c7c409a10337f32b8d0..d375fb0c89c084d0f74d61373ce0c75af1f7d277 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest { assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); // get the stats change - assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1); - assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L); - assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize()); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().sum()).isEqualTo(1L); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize()); // get the message body ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());