From 19fb42f57c463c7bcf0747ca0d8432567a570662 Mon Sep 17 00:00:00 2001 From: zhangxu16 Date: Thu, 25 Feb 2021 20:54:29 +0800 Subject: [PATCH] Add config enableScheduleMessageStats --- .../rocketmq/store/DefaultMessageStore.java | 2 +- .../store/config/MessageStoreConfig.java | 10 ++++++++++ .../schedule/ScheduleMessageService.java | 20 +++++-------------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 8c3a2768..89e2cc02 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore { } this.reputMessageService = new ReputMessageService(); - this.scheduleMessageService = new ScheduleMessageService(this, brokerStatsManager); + this.scheduleMessageService = new ScheduleMessageService(this); this.transientStorePool = new TransientStorePool(messageStoreConfig); diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index c13ad4cf..dd520f48 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -152,6 +152,8 @@ public class MessageStoreConfig { private boolean isEnableBatchPush = false; + private boolean enableScheduleMessageStats = true; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -722,4 +724,12 @@ public class MessageStoreConfig { public void setEnableBatchPush(boolean enableBatchPush) { isEnableBatchPush = enableBatchPush; } + + public boolean isEnableScheduleMessageStats() { + return enableScheduleMessageStats; + } + + public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) { + this.enableScheduleMessageStats = enableScheduleMessageStats; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index ef5fac25..a78c5f8a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -44,7 +44,6 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -59,7 +58,6 @@ public class ScheduleMessageService extends ConfigManager { private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); private final DefaultMessageStore defaultMessageStore; - private final BrokerStatsManager brokerStatsManager; private final AtomicBoolean started = new AtomicBoolean(false); private Timer timer; private MessageStore writeMessageStore; @@ -68,13 +66,6 @@ public class ScheduleMessageService extends ConfigManager { public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; this.writeMessageStore = defaultMessageStore; - this.brokerStatsManager = null; - } - - public ScheduleMessageService(final DefaultMessageStore defaultMessageStore, final BrokerStatsManager brokerStatsManager) { - this.defaultMessageStore = defaultMessageStore; - this.writeMessageStore = defaultMessageStore; - this.brokerStatsManager = brokerStatsManager; } public static int queueId2DelayLevel(final int queueId) { @@ -325,13 +316,12 @@ public class ScheduleMessageService extends ConfigManager { if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { - if (ScheduleMessageService.this.brokerStatsManager == null) { - continue; + if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); } - ScheduleMessageService.this.brokerStatsManager.incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); - ScheduleMessageService.this.brokerStatsManager.incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.brokerStatsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); continue; } else { // XXX: warn and notify me -- GitLab