未验证 提交 d7a830cf 编写于 作者: H huangli 提交者: GitHub

[ISSUE 3194] [PART A] Use LongAdder instead of AtomicLong in...

[ISSUE 3194] [PART A] Use LongAdder instead of AtomicLong in BrokerStatsService to improve performance. (#3195)
上级 857d28df
......@@ -685,8 +685,8 @@ public class CommitLog {
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
......@@ -802,8 +802,8 @@ public class CommitLog {
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
......
......@@ -438,7 +438,7 @@ public class DefaultMessageStore implements MessageStore {
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
......@@ -468,7 +468,7 @@ public class DefaultMessageStore implements MessageStore {
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});
......@@ -634,7 +634,7 @@ public class DefaultMessageStore implements MessageStore {
continue;
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
......@@ -668,9 +668,9 @@ public class DefaultMessageStore implements MessageStore {
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
this.storeStatsService.getGetMessageTimesTotalFound().add(1);
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
......@@ -1922,10 +1922,10 @@ public class DefaultMessageStore implements MessageStore {
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
.add(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
......
......@@ -22,7 +22,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -41,23 +41,23 @@ public class StoreStatsService extends ServiceThread {
private static int printTPSInterval = 60 * 1;
private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
private final LongAdder putMessageFailedTimes = new LongAdder();
private final ConcurrentMap<String, AtomicLong> putMessageTopicTimesTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
private final ConcurrentMap<String, AtomicLong> putMessageTopicSizeTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
private final ConcurrentMap<String, LongAdder> putMessageTopicTimesTotal =
new ConcurrentHashMap<>(128);
private final ConcurrentMap<String, LongAdder> putMessageTopicSizeTotal =
new ConcurrentHashMap<>(128);
private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
private final AtomicLong getMessageTransferedMsgCount = new AtomicLong(0);
private final AtomicLong getMessageTimesTotalMiss = new AtomicLong(0);
private final LongAdder getMessageTimesTotalFound = new LongAdder();
private final LongAdder getMessageTransferedMsgCount = new LongAdder();
private final LongAdder getMessageTimesTotalMiss = new LongAdder();
private final LinkedList<CallSnapshot> putTimesList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> getTimesFoundList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();
private volatile AtomicLong[] putMessageDistributeTime;
private volatile AtomicLong[] lastPutMessageDistributeTime;
private volatile LongAdder[] putMessageDistributeTime;
private volatile LongAdder[] lastPutMessageDistributeTime;
private long messageStoreBootTimestamp = System.currentTimeMillis();
private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0;
......@@ -75,10 +75,10 @@ public class StoreStatsService extends ServiceThread {
this.initPutMessageDistributeTime();
}
private AtomicLong[] initPutMessageDistributeTime() {
AtomicLong[] next = new AtomicLong[13];
private LongAdder[] initPutMessageDistributeTime() {
LongAdder[] next = new LongAdder[13];
for (int i = 0; i < next.length; i++) {
next[i] = new AtomicLong(0);
next[i] = new LongAdder();
}
this.lastPutMessageDistributeTime = this.putMessageDistributeTime;
......@@ -93,48 +93,48 @@ public class StoreStatsService extends ServiceThread {
}
public void setPutMessageEntireTimeMax(long value) {
final AtomicLong[] times = this.putMessageDistributeTime;
final LongAdder[] times = this.putMessageDistributeTime;
if (null == times)
return;
// us
if (value <= 0) {
times[0].incrementAndGet();
times[0].add(1);
} else if (value < 10) {
times[1].incrementAndGet();
times[1].add(1);
} else if (value < 50) {
times[2].incrementAndGet();
times[2].add(1);
} else if (value < 100) {
times[3].incrementAndGet();
times[3].add(1);
} else if (value < 200) {
times[4].incrementAndGet();
times[4].add(1);
} else if (value < 500) {
times[5].incrementAndGet();
times[5].add(1);
} else if (value < 1000) {
times[6].incrementAndGet();
times[6].add(1);
}
// 2s
else if (value < 2000) {
times[7].incrementAndGet();
times[7].add(1);
}
// 3s
else if (value < 3000) {
times[8].incrementAndGet();
times[8].add(1);
}
// 4s
else if (value < 4000) {
times[9].incrementAndGet();
times[9].add(1);
}
// 5s
else if (value < 5000) {
times[10].incrementAndGet();
times[10].add(1);
}
// 10s
else if (value < 10000) {
times[11].incrementAndGet();
times[11].add(1);
} else {
times[12].incrementAndGet();
times[12].add(1);
}
if (value > this.putMessageEntireTimeMax) {
......@@ -194,8 +194,8 @@ public class StoreStatsService extends ServiceThread {
public long getPutMessageTimesTotal() {
long rs = 0;
for (AtomicLong data : putMessageTopicTimesTotal.values()) {
rs += data.get();
for (LongAdder data : putMessageTopicTimesTotal.values()) {
rs += data.longValue();
}
return rs;
}
......@@ -218,8 +218,8 @@ public class StoreStatsService extends ServiceThread {
public long getPutMessageSizeTotal() {
long rs = 0;
for (AtomicLong data : putMessageTopicSizeTotal.values()) {
rs += data.get();
for (LongAdder data : putMessageTopicSizeTotal.values()) {
rs += data.longValue();
}
return rs;
}
......@@ -299,13 +299,13 @@ public class StoreStatsService extends ServiceThread {
}
private String putMessageDistributeTimeToString() {
final AtomicLong[] times = this.lastPutMessageDistributeTime;
final LongAdder[] times = this.lastPutMessageDistributeTime;
if (null == times)
return null;
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < times.length; i++) {
long value = times[i].get();
long value = times[i].longValue();
sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
sb.append(" ");
}
......@@ -477,19 +477,19 @@ public class StoreStatsService extends ServiceThread {
}
this.getTimesFoundList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTimesTotalFound.get()));
this.getMessageTimesTotalFound.longValue()));
if (this.getTimesFoundList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.getTimesFoundList.removeFirst();
}
this.getTimesMissList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTimesTotalMiss.get()));
this.getMessageTimesTotalMiss.longValue()));
if (this.getTimesMissList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.getTimesMissList.removeFirst();
}
this.transferedMsgCountList.add(new CallSnapshot(System.currentTimeMillis(),
this.getMessageTransferedMsgCount.get()));
this.getMessageTransferedMsgCount.longValue()));
if (this.transferedMsgCountList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
this.transferedMsgCountList.removeFirst();
}
......@@ -510,14 +510,14 @@ public class StoreStatsService extends ServiceThread {
this.getGetTransferedTps(printTPSInterval)
);
final AtomicLong[] times = this.initPutMessageDistributeTime();
final LongAdder[] times = this.initPutMessageDistributeTime();
if (null == times)
return;
final StringBuilder sb = new StringBuilder();
long totalPut = 0;
for (int i = 0; i < times.length; i++) {
long value = times[i].get();
long value = times[i].longValue();
totalPut += value;
sb.append(String.format("%s:%d", PUT_MESSAGE_ENTIRE_TIME_MAX_DESC[i], value));
sb.append(" ");
......@@ -527,27 +527,27 @@ public class StoreStatsService extends ServiceThread {
}
}
public AtomicLong getGetMessageTimesTotalFound() {
public LongAdder getGetMessageTimesTotalFound() {
return getMessageTimesTotalFound;
}
public AtomicLong getGetMessageTimesTotalMiss() {
public LongAdder getGetMessageTimesTotalMiss() {
return getMessageTimesTotalMiss;
}
public AtomicLong getGetMessageTransferedMsgCount() {
public LongAdder getGetMessageTransferedMsgCount() {
return getMessageTransferedMsgCount;
}
public AtomicLong getPutMessageFailedTimes() {
public LongAdder getPutMessageFailedTimes() {
return putMessageFailedTimes;
}
public AtomicLong getSinglePutMessageTopicSizeTotal(String topic) {
AtomicLong rs = putMessageTopicSizeTotal.get(topic);
public LongAdder getSinglePutMessageTopicSizeTotal(String topic) {
LongAdder rs = putMessageTopicSizeTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
AtomicLong previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
rs = new LongAdder();
LongAdder previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
if (previous != null) {
rs = previous;
}
......@@ -555,11 +555,11 @@ public class StoreStatsService extends ServiceThread {
return rs;
}
public AtomicLong getSinglePutMessageTopicTimesTotal(String topic) {
AtomicLong rs = putMessageTopicTimesTotal.get(topic);
public LongAdder getSinglePutMessageTopicTimesTotal(String topic) {
LongAdder rs = putMessageTopicTimesTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
AtomicLong previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
rs = new LongAdder();
LongAdder previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
if (previous != null) {
rs = previous;
}
......@@ -567,11 +567,11 @@ public class StoreStatsService extends ServiceThread {
return rs;
}
public Map<String, AtomicLong> getPutMessageTopicTimesTotal() {
public Map<String, LongAdder> getPutMessageTopicTimesTotal() {
return putMessageTopicTimesTotal;
}
public Map<String, AtomicLong> getPutMessageTopicSizeTotal() {
public Map<String, LongAdder> getPutMessageTopicSizeTotal() {
return putMessageTopicSizeTotal;
}
......
......@@ -502,8 +502,8 @@ public class DLedgerCommitLog extends CommitLog {
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes());
}
return putMessageResult;
});
......@@ -629,8 +629,8 @@ public class DLedgerCommitLog extends CommitLog {
PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);
if (putMessageStatus == PutMessageStatus.PUT_OK) {
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(appendResult.getWroteBytes());
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(appendResult.getWroteBytes());
}
return putMessageResult;
});
......
......@@ -45,7 +45,7 @@ public class BrokerStats {
this.msgPutTotalTodayMorning =
this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
this.msgGetTotalTodayMorning =
this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue();
log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
......@@ -88,6 +88,6 @@ public class BrokerStats {
}
public long getMsgGetTotalTodayNow() {
return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().longValue();
}
}
......@@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.junit.Test;
public class StoreStatsServiceTest {
......@@ -30,7 +32,7 @@ public class StoreStatsServiceTest {
final StoreStatsService storeStatsService = new StoreStatsService();
int num = Runtime.getRuntime().availableProcessors() * 2;
for (int j = 0; j < 100; j++) {
final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
final AtomicReference<LongAdder> reference = new AtomicReference<>(null);
final CountDownLatch latch = new CountDownLatch(num);
final CyclicBarrier barrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
......@@ -39,9 +41,9 @@ public class StoreStatsServiceTest {
public void run() {
try {
barrier.await();
AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicSizeTotal("test");
if (reference.compareAndSet(null, atomicLong)) {
} else if (reference.get() != atomicLong) {
LongAdder longAdder = storeStatsService.getSinglePutMessageTopicSizeTotal("test");
if (reference.compareAndSet(null, longAdder)) {
} else if (reference.get() != longAdder) {
throw new RuntimeException("Reference should be same!");
}
} catch (InterruptedException | BrokenBarrierException e) {
......@@ -61,7 +63,7 @@ public class StoreStatsServiceTest {
final StoreStatsService storeStatsService = new StoreStatsService();
int num = Runtime.getRuntime().availableProcessors() * 2;
for (int j = 0; j < 100; j++) {
final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
final AtomicReference<LongAdder> reference = new AtomicReference<>(null);
final CountDownLatch latch = new CountDownLatch(num);
final CyclicBarrier barrier = new CyclicBarrier(num);
for (int i = 0; i < num; i++) {
......@@ -70,9 +72,9 @@ public class StoreStatsServiceTest {
public void run() {
try {
barrier.await();
AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicTimesTotal("test");
if (reference.compareAndSet(null, atomicLong)) {
} else if (reference.get() != atomicLong) {
LongAdder longAdder = storeStatsService.getSinglePutMessageTopicTimesTotal("test");
if (reference.compareAndSet(null, longAdder)) {
} else if (reference.get() != longAdder) {
throw new RuntimeException("Reference should be same!");
}
} catch (InterruptedException | BrokenBarrierException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册