diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index c13ad4cfa22be8a5829075523b2d4e96510ca558..dd520f4859c8ba971aacc73bb74eacd073be4a53 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -152,6 +152,8 @@ public class MessageStoreConfig { private boolean isEnableBatchPush = false; + private boolean enableScheduleMessageStats = true; + public boolean isDebugLockEnable() { return debugLockEnable; } @@ -722,4 +724,12 @@ public class MessageStoreConfig { public void setEnableBatchPush(boolean enableBatchPush) { isEnableBatchPush = enableBatchPush; } + + public boolean isEnableScheduleMessageStats() { + return enableScheduleMessageStats; + } + + public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) { + this.enableScheduleMessageStats = enableScheduleMessageStats; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 3b19a16fd4b7fc238b9e5697a32c2672fd270e19..a78c5f8a82e42d42d8f95581fe7174fe43c78a2a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -316,6 +316,12 @@ public class ScheduleMessageService extends ConfigManager { if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { + if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); + } continue; } else { // XXX: warn and notify me diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java index fd860e6b9d7ea5d8a3d2c6e3a4fee714cda35d57..fa3c6bfcd8b9fd08035d5c7c409a10337f32b8d0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java @@ -40,6 +40,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE; import static org.assertj.core.api.Assertions.assertThat; @@ -112,6 +115,10 @@ public class ScheduleMessageServiceTest { @Test public void deliverDelayedMessageTimerTaskTest() throws Exception { + assertThat(messageStore.getMessageStoreConfig().isEnableScheduleMessageStats()).isTrue(); + + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic)).isNull(); + MessageExtBrokerInner msg = buildMessage(); int realQueueId = msg.getQueueId(); // set delayLevel,and send delay message @@ -141,6 +148,10 @@ public class ScheduleMessageServiceTest { // now,found the message assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND); + // get the stats change + assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L); + assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize()); // get the message body ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());