diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 1b1cf4ba53ad6778aaafc3967b0c969f3756f71b..2cd142fb3761095af21d1d0c08b3c58afc8edd27 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; @@ -515,6 +516,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); if (sendOK) { + if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) { + this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); + this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes()); + } + this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index 9d95ecb5ea4cfaa7d445318f079afcbb658654a9..ec1e1f0245f43f1ac0bad7eaa80fc68088e69002 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -58,6 +58,7 @@ public class MixAll { public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; + public static final String SCHEDULE_CONSUMER_GROUP = "SCHEDULE_CONSUMER"; public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER"; public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER"; public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index bacae1e80bcfb6d7455a88ed493efb270cd595d1..1164ab8f1a53b5063f8dfc609997c113259c8b9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.topic.TopicValidator; @@ -318,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager { if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) { + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum()); + ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes()); ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 7bb6a8b4f5a653801c82d52b93e38cfabfcf09d6..3e643e32a744dce2ddbfccc8694160a14aa2fcd0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -29,6 +29,10 @@ import org.apache.rocketmq.common.stats.StatsItemSet; public class BrokerStatsManager { + public static final String QUEUE_PUT_NUMS = "QUEUE_PUT_NUMS"; + public static final String QUEUE_PUT_SIZE = "QUEUE_PUT_SIZE"; + public static final String QUEUE_GET_NUMS = "QUEUE_GET_NUMS"; + public static final String QUEUE_GET_SIZE = "QUEUE_GET_SIZE"; public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS"; public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE"; public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS"; @@ -74,6 +78,10 @@ public class BrokerStatsManager { public BrokerStatsManager(String clusterName) { this.clusterName = clusterName; + this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log)); + this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log)); this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log)); this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log)); this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log)); @@ -124,8 +132,12 @@ public class BrokerStatsManager { public void onTopicDeleted(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic); this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic); + this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@"); this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@"); + this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@"); + this.statsTable.get(QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@"); this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@"); this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@"); this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@"); @@ -135,12 +147,36 @@ public class BrokerStatsManager { public void onGroupDeleted(final String group) { this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@"); + this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@"); + this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@"); this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@"); this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@"); this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@"); this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@"); } + public void incQueuePutNums(final String topic, final Integer queueId) { + this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), 1, 1); + } + + public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) { + this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), num, times); + } + + public void incQueuePutSize(final String topic, final Integer queueId, final int size) { + this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, String.valueOf(queueId)), size, 1); + } + + public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) { + final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group); + this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1); + } + + public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) { + final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group); + this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1); + } + public void incTopicPutNums(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } @@ -158,11 +194,11 @@ public class BrokerStatsManager { this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1); } - public String buildStatsKey(String topic, String group) { - StringBuilder strBuilder = new StringBuilder(); - strBuilder.append(topic); + public String buildStatsKey(String prefix, String suffix) { + StringBuffer strBuilder = new StringBuffer(); + strBuilder.append(prefix); strBuilder.append("@"); - strBuilder.append(group); + strBuilder.append(suffix); return strBuilder.toString(); } diff --git a/store/src/test/java/stats/BrokerStatsManagerTest.java b/store/src/test/java/stats/BrokerStatsManagerTest.java index 17020729a63f89b80656f8d92f7bcdaf7cc12fc9..2b6d0f8b68d6f31619dfe47c1f651b1c59f81b86 100644 --- a/store/src/test/java/stats/BrokerStatsManagerTest.java +++ b/store/src/test/java/stats/BrokerStatsManagerTest.java @@ -29,6 +29,10 @@ import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_ import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY; import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS; import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_SIZE; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_NUMS; +import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_SIZE; import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS; import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS; import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE; @@ -38,6 +42,7 @@ public class BrokerStatsManagerTest { private BrokerStatsManager brokerStatsManager; private String TOPIC = "TOPIC_TEST"; + private Integer QUEUE_ID = 0; private String GROUP_NAME = "GROUP_TEST"; @Before @@ -56,6 +61,36 @@ public class BrokerStatsManagerTest { assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull(); } + @Test + public void testIncQueuePutNums() { + brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID); + String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)); + assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getTimes().doubleValue()).isEqualTo(1L); + brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID, 2, 2); + assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getValue().doubleValue()).isEqualTo(3L); + } + + @Test + public void testIncQueuePutSize() { + brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 2); + String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)); + assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, statsKey).getValue().doubleValue()).isEqualTo(2L); + } + + @Test + public void testIncQueueGetNums() { + brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1); + final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME); + assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L); + } + + @Test + public void testIncQueueGetSize() { + brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 1); + final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME); + assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L); + } + @Test public void testIncTopicPutNums() { brokerStatsManager.incTopicPutNums(TOPIC); @@ -101,8 +136,12 @@ public class BrokerStatsManagerTest { public void testOnTopicDeleted() { brokerStatsManager.incTopicPutNums(TOPIC); brokerStatsManager.incTopicPutSize(TOPIC, 100); + brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID); + brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100); brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100); + brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1); + brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100); brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC); brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L); @@ -112,8 +151,12 @@ public class BrokerStatsManagerTest { Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC)); Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, TOPIC + "@" + QUEUE_ID)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, TOPIC + "@" + QUEUE_ID)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME)); @@ -124,6 +167,8 @@ public class BrokerStatsManagerTest { public void testOnGroupDeleted(){ brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1); brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100); + brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1); + brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100); brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC); brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1); brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L); @@ -133,6 +178,8 @@ public class BrokerStatsManagerTest { Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME)); + Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME)); Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));