提交 19fb42f5 编写于 作者: Z zhangxu16

Add config enableScheduleMessageStats

上级 a8606d41
...@@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -144,7 +144,7 @@ public class DefaultMessageStore implements MessageStore {
} }
this.reputMessageService = new ReputMessageService(); this.reputMessageService = new ReputMessageService();
this.scheduleMessageService = new ScheduleMessageService(this, brokerStatsManager); this.scheduleMessageService = new ScheduleMessageService(this);
this.transientStorePool = new TransientStorePool(messageStoreConfig); this.transientStorePool = new TransientStorePool(messageStoreConfig);
......
...@@ -152,6 +152,8 @@ public class MessageStoreConfig { ...@@ -152,6 +152,8 @@ public class MessageStoreConfig {
private boolean isEnableBatchPush = false; private boolean isEnableBatchPush = false;
private boolean enableScheduleMessageStats = true;
public boolean isDebugLockEnable() { public boolean isDebugLockEnable() {
return debugLockEnable; return debugLockEnable;
} }
...@@ -722,4 +724,12 @@ public class MessageStoreConfig { ...@@ -722,4 +724,12 @@ public class MessageStoreConfig {
public void setEnableBatchPush(boolean enableBatchPush) { public void setEnableBatchPush(boolean enableBatchPush) {
isEnableBatchPush = enableBatchPush; isEnableBatchPush = enableBatchPush;
} }
public boolean isEnableScheduleMessageStats() {
return enableScheduleMessageStats;
}
public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) {
this.enableScheduleMessageStats = enableScheduleMessageStats;
}
} }
...@@ -44,7 +44,6 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -44,7 +44,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ScheduleMessageService extends ConfigManager { public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
...@@ -59,7 +58,6 @@ public class ScheduleMessageService extends ConfigManager { ...@@ -59,7 +58,6 @@ public class ScheduleMessageService extends ConfigManager {
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32); new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore; private final DefaultMessageStore defaultMessageStore;
private final BrokerStatsManager brokerStatsManager;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer; private Timer timer;
private MessageStore writeMessageStore; private MessageStore writeMessageStore;
...@@ -68,13 +66,6 @@ public class ScheduleMessageService extends ConfigManager { ...@@ -68,13 +66,6 @@ public class ScheduleMessageService extends ConfigManager {
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) { public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore; this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore; this.writeMessageStore = defaultMessageStore;
this.brokerStatsManager = null;
}
public ScheduleMessageService(final DefaultMessageStore defaultMessageStore, final BrokerStatsManager brokerStatsManager) {
this.defaultMessageStore = defaultMessageStore;
this.writeMessageStore = defaultMessageStore;
this.brokerStatsManager = brokerStatsManager;
} }
public static int queueId2DelayLevel(final int queueId) { public static int queueId2DelayLevel(final int queueId) {
...@@ -325,13 +316,12 @@ public class ScheduleMessageService extends ConfigManager { ...@@ -325,13 +316,12 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.brokerStatsManager == null) { if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
continue; ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
} }
ScheduleMessageService.this.brokerStatsManager.incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.brokerStatsManager.incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.brokerStatsManager.incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
continue; continue;
} else { } else {
// XXX: warn and notify me // XXX: warn and notify me
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册