提交 52742942 编写于 作者: D dongeforever

Finish the rewrite logic for AdminBrokerProcessor

上级 507553bd
......@@ -16,16 +16,17 @@
*/
package org.apache.rocketmq.broker.client.rebalance;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageQueue;
public class RebalanceLockManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
......
......@@ -696,6 +696,28 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private RemotingCommand rewriteRequestForStaticTopic(GetMaxOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
offset = mappingItem.computeStaticQueueOffset(offset);
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
......@@ -703,33 +725,13 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
String topic = requestHeader.getTopic();
int queueId = requestHeader.getQueueId();
if (requestHeader.getLogicalQueue()) {
LogicalQueuesInfoInBroker logicalQueuesInfo = this.brokerController.getTopicConfigManager().selectLogicalQueuesInfo(topic);
if (logicalQueuesInfo != null) {
// max offset must be in the queue route with largest offset
LogicalQueueRouteData requestLogicalQueueRouteData = logicalQueuesInfo.queryQueueRouteDataByQueueId(queueId, Long.MAX_VALUE);
if (requestLogicalQueueRouteData != null) {
logicalQueuesInfo.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(requestLogicalQueueRouteData.getLogicalQueueIndex());
if (queueRouteDataList != null && !queueRouteDataList.isEmpty()) {
LogicalQueueRouteData selectedLogicalQueueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
if (!Objects.equals(selectedLogicalQueueRouteData.getMessageQueue(), new MessageQueue(topic, this.brokerController.getBrokerConfig().getBrokerName(), queueId))) {
log.info("getMaxOffset topic={} queueId={} not latest, redirect: {}", topic, queueId, selectedLogicalQueueRouteData);
response.addExtField(MessageConst.PROPERTY_REDIRECT, "1");
}
}
} finally {
logicalQueuesInfo.readLock().unlock();
}
}
}
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, requestHeader.isCommitted());
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
......@@ -738,6 +740,36 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private RemotingCommand rewriteRequestForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
};
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
GetMinOffsetResponseHeader offsetResponseHeader = (GetMinOffsetResponseHeader) rpcResponse.getHeader();
long offset = mappingItem.computeStaticQueueOffset(offsetResponseHeader.getOffset());
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
......@@ -745,6 +777,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setOffset(offset);
......@@ -753,6 +791,35 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
private RemotingCommand rewriteRequestForStaticTopic(GetEarliestMsgStoretimeRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
};
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(), rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
GetEarliestMsgStoretimeResponseHeader offsetResponseHeader = (GetEarliestMsgStoretimeResponseHeader) rpcResponse.getHeader();
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
responseHeader.setTimestamp(offsetResponseHeader.getTimestamp());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
......@@ -760,6 +827,12 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
long timestamp =
this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
......
......@@ -1126,7 +1126,6 @@ public class MQClientAPIImpl {
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setCommitted(committed);
requestHeader.setLogicalQueue(fromLogicalQueue);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
......
......@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader {
public class GetEarliestMsgStoretimeRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
......@@ -34,18 +34,22 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader
public void checkFields() throws RemotingCommandException {
}
@Override
public String getTopic() {
return topic;
}
@Override
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public Integer getQueueId() {
return queueId;
}
@Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
......
......@@ -20,34 +20,37 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
public class GetMaxOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
private Integer queueId;
private boolean committed;
private boolean logicalQueue;
@Override
public void checkFields() throws RemotingCommandException {
}
@Override
public String getTopic() {
return topic;
}
@Override
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public Integer getQueueId() {
return queueId;
}
@Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
......@@ -59,12 +62,4 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
public boolean isCommitted() {
return committed;
}
public void setLogicalQueue(boolean logicalQueue) {
this.logicalQueue = logicalQueue;
}
public boolean getLogicalQueue() {
return logicalQueue;
}
}
......@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMinOffsetRequestHeader implements CommandCustomHeader {
public class GetMinOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String topic;
@CFNotNull
......@@ -34,18 +34,22 @@ public class GetMinOffsetRequestHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
@Override
public String getTopic() {
return topic;
}
@Override
public void setTopic(String topic) {
this.topic = topic;
}
@Override
public Integer getQueueId() {
return queueId;
}
@Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册