提交 4a22c0c0 编写于 作者: J jungle 提交者: von gosling

[ISSUE #411] Fixed ClassCastException when get the instance of the store (#423)

* Fixed issue #411

* fix cast in getAllDelayOffset

* Update AdminBrokerProcessor.java
上级 945cedaa
...@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore; ...@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public abstract class AbstractPluginMessageStore implements MessageStore { public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null; protected MessageStore next = null;
...@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -246,4 +247,9 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
public ConsumeQueue getConsumeQueue(String topic, int queueId) { public ConsumeQueue getConsumeQueue(String topic, int queueId) {
return next.getConsumeQueue(topic, queueId); return next.getConsumeQueue(topic, queueId);
} }
@Override
public BrokerStatsManager getBrokerStatsManager() {
return next.getBrokerStatsManager();
};
} }
...@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue; ...@@ -114,6 +114,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.SelectMappedBufferResult;
public class AdminBrokerProcessor implements NettyRequestProcessor { public class AdminBrokerProcessor implements NettyRequestProcessor {
...@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -760,12 +761,19 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) {
log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("Delay offset not supported in this messagetore");
return response;
}
String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) { if (content != null && content.length() > 0) {
try { try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
log.error("get all delay offset from master error.", e); log.error("Get all delay offset from master error.", e);
response.setCode(ResponseCode.SYSTEM_ERROR); response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UnsupportedEncodingException " + e); response.setRemark("UnsupportedEncodingException " + e);
...@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1051,7 +1059,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ViewBrokerStatsDataRequestHeader requestHeader = final ViewBrokerStatsDataRequestHeader requestHeader =
(ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore(); MessageStore messageStore = this.brokerController.getMessageStore();
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey()); StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) { if (null == statsItem) {
......
...@@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -1371,6 +1371,7 @@ public class DefaultMessageStore implements MessageStore {
cq.putMessagePositionInfoWrapper(dispatchRequest); cq.putMessagePositionInfoWrapper(dispatchRequest);
} }
@Override
public BrokerStatsManager getBrokerStatsManager() { public BrokerStatsManager getBrokerStatsManager() {
return brokerStatsManager; return brokerStatsManager;
} }
......
...@@ -21,6 +21,7 @@ import java.util.LinkedList; ...@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/** /**
* This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store. * This class defines contracting interfaces to implement, allowing third-party vendor to use customized message store.
...@@ -358,4 +359,11 @@ public interface MessageStore { ...@@ -358,4 +359,11 @@ public interface MessageStore {
* @return Consume queue. * @return Consume queue.
*/ */
ConsumeQueue getConsumeQueue(String topic, int queueId); ConsumeQueue getConsumeQueue(String topic, int queueId);
/**
* Get BrokerStatsManager of the messageStore.
*
* @return BrokerStatsManager.
*/
BrokerStatsManager getBrokerStatsManager();
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册