From 9eca70f51c9e71970e7e49d8d31d0c453531f7c4 Mon Sep 17 00:00:00 2001 From: zhangxu16 Date: Fri, 19 Feb 2021 20:54:21 +0800 Subject: [PATCH] Add delay message stats to brokerStatsManager --- .../org/apache/rocketmq/store/DefaultMessageStore.java | 2 +- .../rocketmq/store/schedule/ScheduleMessageService.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 89e2cc02..8c3a2768 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore { } this.reputMessageService = new ReputMessageService(); - this.scheduleMessageService = new ScheduleMessageService(this); + this.scheduleMessageService = new ScheduleMessageService(this, brokerStatsManager); this.transientStorePool = new TransientStorePool(messageStoreConfig); diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 3b19a16f..9006ecb0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -44,6 +44,7 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -58,14 +59,16 @@ public class ScheduleMessageService extends ConfigManager { private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); private final DefaultMessageStore defaultMessageStore; + private final BrokerStatsManager brokerStatsManager; private final AtomicBoolean started = new AtomicBoolean(false); private Timer timer; private MessageStore writeMessageStore; private int maxDelayLevel; - public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { + public ScheduleMessageService(final DefaultMessageStore defaultMessageStore, final BrokerStatsManager brokerStatsManager) { this.defaultMessageStore = defaultMessageStore; this.writeMessageStore = defaultMessageStore; + this.brokerStatsManager = brokerStatsManager; } public static int queueId2DelayLevel(final int queueId) { @@ -316,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager { if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + ScheduleMessageService.this.brokerStatsManager.incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + ScheduleMessageService.this.brokerStatsManager.incTopicPutSize(msgInner.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.brokerStatsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); continue; } else { // XXX: warn and notify me -- GitLab