未验证 提交 311d76f9 编写于 作者: H huangli 提交者: GitHub

[ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to...

[ISSUE 3194] [PART C] Replace AtomicLong with LongAdder in StatsItem.java to improve performance (#3351)
上级 147d23e3
...@@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest { ...@@ -178,7 +178,7 @@ public class ConsumeMessageConcurrentlyServiceTest {
StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr); StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName()); StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
assertThat(item.getValue().get()).isGreaterThan(0L); assertThat(item.getValue().sum()).isGreaterThan(0L);
MessageExt msg = messageAtomic.get(); MessageExt msg = messageAtomic.get();
assertThat(msg).isNotNull(); assertThat(msg).isNotNull();
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
......
...@@ -21,14 +21,16 @@ import java.util.LinkedList; ...@@ -21,14 +21,16 @@ import java.util.LinkedList;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
public class StatsItem { public class StatsItem {
private final AtomicLong value = new AtomicLong(0); private final LongAdder value = new LongAdder();
private final AtomicLong times = new AtomicLong(0); private final LongAdder times = new LongAdder();
private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>(); private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
...@@ -157,8 +159,8 @@ public class StatsItem { ...@@ -157,8 +159,8 @@ public class StatsItem {
if (this.csListMinute.size() == 0) { if (this.csListMinute.size() == 0) {
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0)); this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0));
} }
this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value
.get())); .sum()));
if (this.csListMinute.size() > 7) { if (this.csListMinute.size() > 7) {
this.csListMinute.removeFirst(); this.csListMinute.removeFirst();
} }
...@@ -170,8 +172,8 @@ public class StatsItem { ...@@ -170,8 +172,8 @@ public class StatsItem {
if (this.csListHour.size() == 0) { if (this.csListHour.size() == 0) {
this.csListHour.add(new CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0)); this.csListHour.add(new CallSnapshot(System.currentTimeMillis() - 10 * 60 * 1000, 0, 0));
} }
this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value
.get())); .sum()));
if (this.csListHour.size() > 7) { if (this.csListHour.size() > 7) {
this.csListHour.removeFirst(); this.csListHour.removeFirst();
} }
...@@ -183,8 +185,8 @@ public class StatsItem { ...@@ -183,8 +185,8 @@ public class StatsItem {
if (this.csListDay.size() == 0) { if (this.csListDay.size() == 0) {
this.csListDay.add(new CallSnapshot(System.currentTimeMillis() - 1 * 60 * 60 * 1000, 0, 0)); this.csListDay.add(new CallSnapshot(System.currentTimeMillis() - 1 * 60 * 60 * 1000, 0, 0));
} }
this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value
.get())); .sum()));
if (this.csListDay.size() > 25) { if (this.csListDay.size() > 25) {
this.csListDay.removeFirst(); this.csListDay.removeFirst();
} }
...@@ -214,7 +216,7 @@ public class StatsItem { ...@@ -214,7 +216,7 @@ public class StatsItem {
ss.getAvgpt()); ss.getAvgpt());
} }
public AtomicLong getValue() { public LongAdder getValue() {
return value; return value;
} }
...@@ -226,7 +228,7 @@ public class StatsItem { ...@@ -226,7 +228,7 @@ public class StatsItem {
return statsName; return statsName;
} }
public AtomicLong getTimes() { public LongAdder getTimes() {
return times; return times;
} }
} }
......
...@@ -154,14 +154,14 @@ public class StatsItemSet { ...@@ -154,14 +154,14 @@ public class StatsItemSet {
public void addValue(final String statsKey, final int incValue, final int incTimes) { public void addValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey); StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue); statsItem.getValue().add(incValue);
statsItem.getTimes().addAndGet(incTimes); statsItem.getTimes().add(incTimes);
} }
public void addRTValue(final String statsKey, final int incValue, final int incTimes) { public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey); StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue); statsItem.getValue().add(incValue);
statsItem.getTimes().addAndGet(incTimes); statsItem.getTimes().add(incTimes);
} }
public void delValue(final String statsKey) { public void delValue(final String statsKey) {
......
...@@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService; ...@@ -23,6 +23,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
...@@ -95,7 +97,7 @@ public class StatsItemSetTest { ...@@ -95,7 +97,7 @@ public class StatsItemSetTest {
} }
} }
private AtomicLong test_unit() throws InterruptedException { private LongAdder test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null); final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread")); new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
......
...@@ -213,15 +213,15 @@ public class BrokerStatsManager { ...@@ -213,15 +213,15 @@ public class BrokerStatsManager {
} }
public void incBrokerPutNums() { public void incBrokerPutNums() {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
} }
public void incBrokerPutNums(final int incValue) { public void incBrokerPutNums(final int incValue) {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
} }
public void incBrokerGetNums(final int incValue) { public void incBrokerGetNums(final int incValue) {
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
} }
public void incSendBackNums(final String group, final String topic) { public void incSendBackNums(final String group, final String topic) {
......
...@@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest { ...@@ -149,9 +149,9 @@ public class ScheduleMessageServiceTest {
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
// get the stats change // get the stats change
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1); assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L); assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().sum()).isEqualTo(1L);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize()); assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize());
// get the message body // get the message body
ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize()); ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册