From d7a830cf8f1b5713af7ff643c4e9f847d5331c6c Mon Sep 17 00:00:00 2001 From: huangli Date: Mon, 6 Sep 2021 20:16:11 +0800 Subject: [PATCH] [ISSUE 3194] [PART A] Use LongAdder instead of AtomicLong in BrokerStatsService to improve performance. (#3195) --- .../org/apache/rocketmq/store/CommitLog.java | 8 +- .../rocketmq/store/DefaultMessageStore.java | 14 +-- .../rocketmq/store/StoreStatsService.java | 106 +++++++++--------- .../store/dledger/DLedgerCommitLog.java | 8 +- .../rocketmq/store/stats/BrokerStats.java | 4 +- .../rocketmq/store/StoreStatsServiceTest.java | 18 +-- 6 files changed, 80 insertions(+), 78 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5e92654a..32e9a657 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -685,8 +685,8 @@ public class CommitLog { PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); + storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1); + storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes()); CompletableFuture flushResultFuture = submitFlushRequest(result, msg); CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg); @@ -802,8 +802,8 @@ public class CommitLog { PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum()); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes()); CompletableFuture flushOKFuture = submitFlushRequest(result, messageExtBatch); CompletableFuture replicaOKFuture = submitReplicaRequest(result, messageExtBatch); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 69019c15..5bf68ac5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -438,7 +438,7 @@ public class DefaultMessageStore implements MessageStore { this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + this.storeStatsService.getPutMessageFailedTimes().add(1); } }); @@ -468,7 +468,7 @@ public class DefaultMessageStore implements MessageStore { this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + this.storeStatsService.getPutMessageFailedTimes().add(1); } }); @@ -634,7 +634,7 @@ public class DefaultMessageStore implements MessageStore { continue; } - this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); + this.storeStatsService.getGetMessageTransferedMsgCount().add(1); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; @@ -668,9 +668,9 @@ public class DefaultMessageStore implements MessageStore { } if (GetMessageStatus.FOUND == status) { - this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet(); + this.storeStatsService.getGetMessageTimesTotalFound().add(1); } else { - this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet(); + this.storeStatsService.getGetMessageTimesTotalMiss().add(1); } long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); @@ -1922,10 +1922,10 @@ public class DefaultMessageStore implements MessageStore { readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) - .addAndGet(dispatchRequest.getMsgSize()); + .add(dispatchRequest.getMsgSize()); } } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java index 8372845e..395f5e30 100644 --- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java +++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java @@ -22,7 +22,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; @@ -41,23 +41,23 @@ public class StoreStatsService extends ServiceThread { private static int printTPSInterval = 60 * 1; - private final AtomicLong putMessageFailedTimes = new AtomicLong(0); + private final LongAdder putMessageFailedTimes = new LongAdder(); - private final ConcurrentMap putMessageTopicTimesTotal = - new ConcurrentHashMap(128); - private final ConcurrentMap putMessageTopicSizeTotal = - new ConcurrentHashMap(128); + private final ConcurrentMap putMessageTopicTimesTotal = + new ConcurrentHashMap<>(128); + private final ConcurrentMap putMessageTopicSizeTotal = + new ConcurrentHashMap<>(128); - private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0); - private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0); - private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0); + private final LongAdder getMessageTimesTotalFound = new LongAdder(); + private final LongAdder getMessageTransferedMsgCount = new LongAdder(); + private final LongAdder getMessageTimesTotalMiss = new LongAdder(); private final LinkedList putTimesList = new LinkedList(); private final LinkedList getTimesFoundList = new LinkedList(); private final LinkedList getTimesMissList = new LinkedList(); private final LinkedList transferedMsgCountList = new LinkedList(); - private volatile AtomicLong[] putMessageDistributeTime; - private volatile AtomicLong[] lastPutMessageDistributeTime; + private volatile LongAdder[] putMessageDistributeTime; + private volatile LongAdder[] lastPutMessageDistributeTime; private long messageStoreBootTimestamp = System.currentTimeMillis(); private volatile long putMessageEntireTimeMax = 0; private volatile long getMessageEntireTimeMax = 0; @@ -75,10 +75,10 @@ public class StoreStatsService extends ServiceThread { this.initPutMessageDistributeTime(); } - private AtomicLong[] initPutMessageDistributeTime() { - AtomicLong[] next = new AtomicLong[13]; + private LongAdder[] initPutMessageDistributeTime() { + LongAdder[] next = new LongAdder[13]; for (int i = 0; i < next.length; i++) { - next[i] = new AtomicLong(0); + next[i] = new LongAdder(); } this.lastPutMessageDistributeTime = this.putMessageDistributeTime; @@ -93,48 +93,48 @@ public class StoreStatsService extends ServiceThread { } public void setPutMessageEntireTimeMax(long value) { - final AtomicLong[] times = this.putMessageDistributeTime; + final LongAdder[] times = this.putMessageDistributeTime; if (null == times) return; // us if (value <= 0) { - times[0].incrementAndGet(); + times[0].add(1); } else if (value < 10) { - times[1].incrementAndGet(); + times[1].add(1); } else if (value < 50) { - times[2].incrementAndGet(); + times[2].add(1); } else if (value < 100) { - times[3].incrementAndGet(); + times[3].add(1); } else if (value < 200) { - times[4].incrementAndGet(); + times[4].add(1); } else if (value < 500) { - times[5].incrementAndGet(); + times[5].add(1); } else if (value < 1000) { - times[6].incrementAndGet(); + times[6].add(1); } // 2s else if (value < 2000) { - times[7].incrementAndGet(); + times[7].add(1); } // 3s else if (value < 3000) { - times[8].incrementAndGet(); + times[8].add(1); } // 4s else if (value < 4000) { - times[9].incrementAndGet(); + times[9].add(1); } // 5s else if (value < 5000) { - times[10].incrementAndGet(); + times[10].add(1); } // 10s else if (value < 10000) { - times[11].incrementAndGet(); + times[11].add(1); } else { - times[12].incrementAndGet(); + times[12].add(1); } if (value > this.putMessageEntireTimeMax) { @@ -194,8 +194,8 @@ public class StoreStatsService extends ServiceThread { public long getPutMessageTimesTotal() { long rs = 0; - for (AtomicLong data : putMessageTopicTimesTotal.values()) { - rs += data.get(); + for (LongAdder data : putMessageTopicTimesTotal.values()) { + rs += data.longValue(); } return rs; } @@ -218,8 +218,8 @@ public class StoreStatsService extends ServiceThread { public long getPutMessageSizeTotal() { long rs = 0; - for (AtomicLong data : putMessageTopicSizeTotal.values()) { - rs += data.get(); + for (LongAdder data : putMessageTopicSizeTotal.values()) { + rs += data.longValue(); } return rs; } @@ -299,13 +299,13 @@ public class StoreStatsService extends ServiceThread { } private String putMessageDistributeTimeToString() { - final AtomicLong[] times = this.lastPutMessageDistributeTime; + final LongAdder[] times = this.lastPutMessageDistributeTime; if (null == times) return null; final StringBuilder sb = new StringBuilder(); for (int i = 0; i < times.length; i++) { - long value = times[i].get(); + long value = times[i].longValue(); sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value)); sb.append(" "); } @@ -477,19 +477,19 @@ public class StoreStatsService extends ServiceThread { } this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTimesTotalFound.get())); + this.getMessageTimesTotalFound.longValue())); if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.getTimesFoundList.removeFirst(); } this.getTimesMissList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTimesTotalMiss.get())); + this.getMessageTimesTotalMiss.longValue())); if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.getTimesMissList.removeFirst(); } this.transferedMsgCountList.add(new CallSnapshot(System.currentTimeMillis(), - this.getMessageTransferedMsgCount.get())); + this.getMessageTransferedMsgCount.longValue())); if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) { this.transferedMsgCountList.removeFirst(); } @@ -510,14 +510,14 @@ public class StoreStatsService extends ServiceThread { this.getGetTransferedTps(printTPSInterval) ); - final AtomicLong[] times = this.initPutMessageDistributeTime(); + final LongAdder[] times = this.initPutMessageDistributeTime(); if (null == times) return; final StringBuilder sb = new StringBuilder(); long totalPut = 0; for (int i = 0; i < times.length; i++) { - long value = times[i].get(); + long value = times[i].longValue(); totalPut += value; sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value)); sb.append(" "); @@ -527,27 +527,27 @@ public class StoreStatsService extends ServiceThread { } } - public AtomicLong getGetMessageTimesTotalFound() { + public LongAdder getGetMessageTimesTotalFound() { return getMessageTimesTotalFound; } - public AtomicLong getGetMessageTimesTotalMiss() { + public LongAdder getGetMessageTimesTotalMiss() { return getMessageTimesTotalMiss; } - public AtomicLong getGetMessageTransferedMsgCount() { + public LongAdder getGetMessageTransferedMsgCount() { return getMessageTransferedMsgCount; } - public AtomicLong getPutMessageFailedTimes() { + public LongAdder getPutMessageFailedTimes() { return putMessageFailedTimes; } - public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) { - AtomicLong rs = putMessageTopicSizeTotal.get(topic); + public LongAdder getSinglePutMessageTopicSizeTotal(String topic) { + LongAdder rs = putMessageTopicSizeTotal.get(topic); if (null == rs) { - rs = new AtomicLong(0); - AtomicLong previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs); + rs = new LongAdder(); + LongAdder previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs); if (previous != null) { rs = previous; } @@ -555,11 +555,11 @@ public class StoreStatsService extends ServiceThread { return rs; } - public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) { - AtomicLong rs = putMessageTopicTimesTotal.get(topic); + public LongAdder getSinglePutMessageTopicTimesTotal(String topic) { + LongAdder rs = putMessageTopicTimesTotal.get(topic); if (null == rs) { - rs = new AtomicLong(0); - AtomicLong previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs); + rs = new LongAdder(); + LongAdder previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs); if (previous != null) { rs = previous; } @@ -567,11 +567,11 @@ public class StoreStatsService extends ServiceThread { return rs; } - public Map getPutMessageTopicTimesTotal() { + public Map getPutMessageTopicTimesTotal() { return putMessageTopicTimesTotal; } - public Map getPutMessageTopicSizeTotal() { + public Map getPutMessageTopicSizeTotal() { return putMessageTopicSizeTotal; } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index 011cbe16..49391758 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -502,8 +502,8 @@ public class DLedgerCommitLog extends CommitLog { PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); if (putMessageStatus == PutMessageStatus.PUT_OK) { // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes()); + storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1); + storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes()); } return putMessageResult; }); @@ -629,8 +629,8 @@ public class DLedgerCommitLog extends CommitLog { PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); if (putMessageStatus == PutMessageStatus.PUT_OK) { // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes()); + storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(1); + storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(appendResult.getWroteBytes()); } return putMessageResult; }); diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index 38ace7d3..a7cbdd36 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -45,7 +45,7 @@ public class BrokerStats { this.msgPutTotalTodayMorning = this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal(); this.msgGetTotalTodayMorning = - this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); + this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue(); log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning); log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning); @@ -88,6 +88,6 @@ public class BrokerStats { } public long getMsgGetTotalTodayNow() { - return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get(); + return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue(); } } diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java index b8a99701..6e66a448 100644 --- a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java @@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; + import org.junit.Test; public class StoreStatsServiceTest { @@ -30,7 +32,7 @@ public class StoreStatsServiceTest { final StoreStatsService storeStatsService = new StoreStatsService(); int num = Runtime.getRuntime().availableProcessors() * 2; for (int j = 0; j < 100; j++) { - final AtomicReference reference = new AtomicReference<>(null); + final AtomicReference reference = new AtomicReference<>(null); final CountDownLatch latch = new CountDownLatch(num); final CyclicBarrier barrier = new CyclicBarrier(num); for (int i = 0; i < num; i++) { @@ -39,9 +41,9 @@ public class StoreStatsServiceTest { public void run() { try { barrier.await(); - AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicSizeTotal("test"); - if (reference.compareAndSet(null, atomicLong)) { - } else if (reference.get() != atomicLong) { + LongAdder longAdder = storeStatsService.getSinglePutMessageTopicSizeTotal("test"); + if (reference.compareAndSet(null, longAdder)) { + } else if (reference.get() != longAdder) { throw new RuntimeException("Reference should be same!"); } } catch (InterruptedException | BrokenBarrierException e) { @@ -61,7 +63,7 @@ public class StoreStatsServiceTest { final StoreStatsService storeStatsService = new StoreStatsService(); int num = Runtime.getRuntime().availableProcessors() * 2; for (int j = 0; j < 100; j++) { - final AtomicReference reference = new AtomicReference<>(null); + final AtomicReference reference = new AtomicReference<>(null); final CountDownLatch latch = new CountDownLatch(num); final CyclicBarrier barrier = new CyclicBarrier(num); for (int i = 0; i < num; i++) { @@ -70,9 +72,9 @@ public class StoreStatsServiceTest { public void run() { try { barrier.await(); - AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicTimesTotal("test"); - if (reference.compareAndSet(null, atomicLong)) { - } else if (reference.get() != atomicLong) { + LongAdder longAdder = storeStatsService.getSinglePutMessageTopicTimesTotal("test"); + if (reference.compareAndSet(null, longAdder)) { + } else if (reference.get() != longAdder) { throw new RuntimeException("Reference should be same!"); } } catch (InterruptedException | BrokenBarrierException e) { -- GitLab