提交 9eca70f5 编写于 作者: Z zhangxu16

Add delay message stats to brokerStatsManager

上级 39bb9386
......@@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore {
}
this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this);
this.scheduleMessageService = new ScheduleMessageService(this, brokerStatsManager);
this.transientStorePool = new TransientStorePool(messageStoreConfig);
......
......@@ -44,6 +44,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -58,14 +59,16 @@ public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final BrokerStatsManager brokerStatsManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore, final BrokerStatsManager brokerStatsManager) {
this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore;
this.brokerStatsManager = brokerStatsManager;
}
public static int queueId2DelayLevel(final int queueId) {
......@@ -316,6 +319,10 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
ScheduleMessageService.this.brokerStatsManager.incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.brokerStatsManager.incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.brokerStatsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
continue;
} else {
// XXX: warn and notify me
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册