diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index f6f8a80afb6d48e178f6d27ead85bac5464e820a..e66cead4121c118a92093fe898ed97363b8a49fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.stats.BrokerStatsManager; public abstract class AbstractPluginMessageStore implements MessageStore { protected MessageStore next = null; @@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore { public ConsumeQueue getConsumeQueue(String topic, int queueId) { return next.getConsumeQueue(topic, queueId); } + + @Override + public BrokerStatsManager getBrokerStatsManager() { + return next.getBrokerStatsManager(); + }; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 356aafc46fb0637a7411c5042fc6170a33d2560c..73fe43942709136ec2d94c879feb66b9a0e23286 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); + if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) { + log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress()); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("Delay offset not supported in this messagetore"); + return response; + } + String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { - log.error("get all delay offset from master error.", e); + log.error("Get all delay offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); @@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final ViewBrokerStatsDataRequestHeader requestHeader = (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); + MessageStore messageStore = this.brokerController.getMessageStore(); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); if (null == statsItem) { diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 1ade7c2838e0382eb8223646fd6ad31a74b77def..ff431ed889a8fadabce0c320b1ac1e7e94ea6907 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore { cq.putMessagePositionInfoWrapper(dispatchRequest); } + @Override public BrokerStatsManager getBrokerStatsManager() { return brokerStatsManager; } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 907dfe2093b79d86c5b3cca4cc49e02a425a8f46..0f9b4f0ae6e35ef552c521855078b5638b705313 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -21,6 +21,7 @@ import java.util.LinkedList; import java.util.Set; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.stats.BrokerStatsManager; /** * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. @@ -358,4 +359,11 @@ public interface MessageStore { * @return Consume queue. */ ConsumeQueue getConsumeQueue(String topic, int queueId); + + /** + * Get BrokerStatsManager of the messageStore. + * + * @return BrokerStatsManager. + */ + BrokerStatsManager getBrokerStatsManager(); }