diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 8c3a2768a995c3c5aab88bf13d4fc65669b90aa1..89e2cc028443579c443cf233d6f7f9cc8cdab33a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore { } this.reputMessageService = new ReputMessageService(); - this.scheduleMessageService = new ScheduleMessageService(this, brokerStatsManager); + this.scheduleMessageService = new ScheduleMessageService(this); this.transientStorePool = new TransientStorePool(messageStoreConfig); diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index c13ad4cfa22be8a5829075523b2d4e96510ca558..dd520f4859c8ba971aacc73bb74eacd073be4a53 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -152,6 +152,8 @@ public class MessageStoreConfig { private boolean isEnableBatchPush = false; + private boolean enableScheduleMessageStats = true; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -722,4 +724,12 @@ public class MessageStoreConfig { public void setEnableBatchPush(boolean enableBatchPush) { isEnableBatchPush = enableBatchPush; } + + public boolean isEnableScheduleMessageStats() { + return enableScheduleMessageStats; + } + + public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) { + this.enableScheduleMessageStats = enableScheduleMessageStats; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index ef5fac25cdc6b5d20167f0aeb578374811f0311c..a78c5f8a82e42d42d8f95581fe7174fe43c78a2a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -44,7 +44,6 @@ import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.StorePathConfigHelper; -import org.apache.rocketmq.store.stats.BrokerStatsManager; public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); @@ -59,7 +58,6 @@ public class ScheduleMessageService extends ConfigManager { private final ConcurrentMap offsetTable = new ConcurrentHashMap(32); private final DefaultMessageStore defaultMessageStore; - private final BrokerStatsManager brokerStatsManager; private final AtomicBoolean started = new AtomicBoolean(false); private Timer timer; private MessageStore writeMessageStore; @@ -68,13 +66,6 @@ public class ScheduleMessageService extends ConfigManager { public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { this.defaultMessageStore = defaultMessageStore; this.writeMessageStore = defaultMessageStore; - this.brokerStatsManager = null; - } - - public ScheduleMessageService(final DefaultMessageStore defaultMessageStore, final BrokerStatsManager brokerStatsManager) { - this.defaultMessageStore = defaultMessageStore; - this.writeMessageStore = defaultMessageStore; - this.brokerStatsManager = brokerStatsManager; } public static int queueId2DelayLevel(final int queueId) { @@ -325,13 +316,12 @@ public class ScheduleMessageService extends ConfigManager { if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { - if (ScheduleMessageService.this.brokerStatsManager == null) { - continue; + if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); } - ScheduleMessageService.this.brokerStatsManager.incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); - ScheduleMessageService.this.brokerStatsManager.incTopicPutSize(msgInner.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - ScheduleMessageService.this.brokerStatsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); continue; } else { // XXX: warn and notify me