未验证 提交 564ee290 编写于 作者: G Git_Yang 提交者: GitHub

[ISSUE] Add get stats and single queue stats for schedule topic

Signed-off-by: Nzhangyang21 <zhangyang21@xiaomi.com>
上级 84346587
......@@ -48,6 +48,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
......@@ -515,6 +516,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {
this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
}
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
......
......@@ -58,6 +58,7 @@ public class MixAll {
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
public static final String SCHEDULE_CONSUMER_GROUP = "SCHEDULE_CONSUMER";
public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
......
......@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
......@@ -318,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
......
......@@ -29,6 +29,10 @@ import org.apache.rocketmq.common.stats.StatsItemSet;
public class BrokerStatsManager {
public static final String QUEUE_PUT_NUMS = "QUEUE_PUT_NUMS";
public static final String QUEUE_PUT_SIZE = "QUEUE_PUT_SIZE";
public static final String QUEUE_GET_NUMS = "QUEUE_GET_NUMS";
public static final String QUEUE_GET_SIZE = "QUEUE_GET_SIZE";
public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
......@@ -74,6 +78,10 @@ public class BrokerStatsManager {
public BrokerStatsManager(String clusterName) {
this.clusterName = clusterName;
this.statsTable.put(QUEUE_PUT_NUMS, new StatsItemSet(QUEUE_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(QUEUE_PUT_SIZE, new StatsItemSet(QUEUE_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(QUEUE_GET_NUMS, new StatsItemSet(QUEUE_GET_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(QUEUE_GET_SIZE, new StatsItemSet(QUEUE_GET_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
......@@ -124,8 +132,12 @@ public class BrokerStatsManager {
public void onTopicDeleted(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
this.statsTable.get(QUEUE_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(QUEUE_PUT_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
......@@ -135,12 +147,36 @@ public class BrokerStatsManager {
public void onGroupDeleted(final String group) {
this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
public void incQueuePutNums(final String topic, final Integer queueId) {
this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), 1, 1);
}
public void incQueuePutNums(final String topic, final Integer queueId, int num, int times) {
this.statsTable.get(QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, String.valueOf(queueId)), num, times);
}
public void incQueuePutSize(final String topic, final Integer queueId, final int size) {
this.statsTable.get(QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, String.valueOf(queueId)), size, 1);
}
public void incQueueGetNums(final String group, final String topic, final Integer queueId, final int incValue) {
final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
this.statsTable.get(QUEUE_GET_NUMS).addValue(statsKey, incValue, 1);
}
public void incQueueGetSize(final String group, final String topic, final Integer queueId, final int incValue) {
final String statsKey = buildStatsKey(buildStatsKey(topic, String.valueOf(queueId)), group);
this.statsTable.get(QUEUE_GET_SIZE).addValue(statsKey, incValue, 1);
}
public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
......@@ -158,11 +194,11 @@ public class BrokerStatsManager {
this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
}
public String buildStatsKey(String topic, String group) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(topic);
public String buildStatsKey(String prefix, String suffix) {
StringBuffer strBuilder = new StringBuffer();
strBuilder.append(prefix);
strBuilder.append("@");
strBuilder.append(group);
strBuilder.append(suffix);
return strBuilder.toString();
}
......
......@@ -29,6 +29,10 @@ import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_GET_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.QUEUE_PUT_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
......@@ -38,6 +42,7 @@ public class BrokerStatsManagerTest {
private BrokerStatsManager brokerStatsManager;
private String TOPIC = "TOPIC_TEST";
private Integer QUEUE_ID = 0;
private String GROUP_NAME = "GROUP_TEST";
@Before
......@@ -56,6 +61,36 @@ public class BrokerStatsManagerTest {
assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull();
}
@Test
public void testIncQueuePutNums() {
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getTimes().doubleValue()).isEqualTo(1L);
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID, 2, 2);
assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, statsKey).getValue().doubleValue()).isEqualTo(3L);
}
@Test
public void testIncQueuePutSize() {
brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 2);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID));
assertThat(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, statsKey).getValue().doubleValue()).isEqualTo(2L);
}
@Test
public void testIncQueueGetNums() {
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncQueueGetSize() {
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 1);
final String statsKey = brokerStatsManager.buildStatsKey(brokerStatsManager.buildStatsKey(TOPIC, String.valueOf(QUEUE_ID)), GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncTopicPutNums() {
brokerStatsManager.incTopicPutNums(TOPIC);
......@@ -101,8 +136,12 @@ public class BrokerStatsManagerTest {
public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutNums(TOPIC);
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
......@@ -112,8 +151,12 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_NUMS, TOPIC + "@" + QUEUE_ID));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_PUT_SIZE, TOPIC + "@" + QUEUE_ID));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
......@@ -124,6 +167,8 @@ public class BrokerStatsManagerTest {
public void testOnGroupDeleted(){
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
......@@ -133,6 +178,8 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_SIZE, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(QUEUE_GET_NUMS, TOPIC + "@" + QUEUE_ID + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册