未验证 提交 3e1d5015 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2684 from maixiaohai/add_delay_message_stats_to_stats_manager

[ISSUE #2683] Add delay message stats to brokerStatsManager
......@@ -152,6 +152,8 @@ public class MessageStoreConfig {
private boolean isEnableBatchPush = false;
private boolean enableScheduleMessageStats = true;
public boolean isDebugLockEnable() {
return debugLockEnable;
}
......@@ -722,4 +724,12 @@ public class MessageStoreConfig {
public void setEnableBatchPush(boolean enableBatchPush) {
isEnableBatchPush = enableBatchPush;
}
public boolean isEnableScheduleMessageStats() {
return enableScheduleMessageStats;
}
public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) {
this.enableScheduleMessageStats = enableScheduleMessageStats;
}
}
......@@ -316,6 +316,12 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
}
continue;
} else {
// XXX: warn and notify me
......
......@@ -40,6 +40,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -112,6 +115,10 @@ public class ScheduleMessageServiceTest {
@Test
public void deliverDelayedMessageTimerTaskTest() throws Exception {
assertThat(messageStore.getMessageStoreConfig().isEnableScheduleMessageStats()).isTrue();
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic)).isNull();
MessageExtBrokerInner msg = buildMessage();
int realQueueId = msg.getQueueId();
// set delayLevel,and send delay message
......@@ -141,6 +148,10 @@ public class ScheduleMessageServiceTest {
// now,found the message
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
// get the stats change
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize());
// get the message body
ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册