提交 e952dd67 编写于 作者: D dongeforever

Finish the produce and consume test for remapped static topic

上级 f7f32e70
...@@ -109,6 +109,7 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; ...@@ -109,6 +109,7 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.stats.StatsItem; import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot; import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
...@@ -621,14 +622,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -621,14 +622,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList(); List<LogicQueueMappingItem> mappingItems = mappingContext.getMappingItemList();
if (mappingItems == null if (!mappingContext.isLeader()) {
|| mappingItems.isEmpty()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
//TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
...@@ -702,14 +699,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -702,14 +699,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
if (mappingItem == null if (!mappingContext.isLeader()) {
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId()); long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(), mappingItem.getQueueId());
...@@ -750,16 +743,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -750,16 +743,15 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); if (!mappingContext.isLeader()) {
if (mappingItem == null) {
//this may not //this may not
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
...@@ -786,7 +778,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -786,7 +778,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark(null); response.setRemark(null);
return response; return response;
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace();
log.error("rewriteRequestForStaticTopic failed", t); log.error("rewriteRequestForStaticTopic failed", t);
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
...@@ -800,7 +791,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -800,7 +791,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
(GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class); (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
...@@ -818,19 +809,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -818,19 +809,18 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
if (requestHeader.getPhysical() != null && requestHeader.getPhysical()) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); if (!mappingContext.isLeader()) {
if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}; };
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try { try {
requestHeader.setBname(mappingItem.getBname()); requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true); requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null); RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, requestHeader, null);
//TODO check if it is leader //TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get(); RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) { if (rpcResponse.getException() != null) {
throw rpcResponse.getException(); throw rpcResponse.getException();
...@@ -855,7 +845,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements ...@@ -855,7 +845,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
final GetEarliestMsgStoretimeRequestHeader requestHeader = final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class); (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, 0L); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return rewriteResult; return rewriteResult;
......
...@@ -54,6 +54,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse; ...@@ -54,6 +54,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem; import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext; import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.common.sysflag.PullSysFlag;
...@@ -68,7 +69,6 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; ...@@ -68,7 +69,6 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
...@@ -79,7 +79,6 @@ import java.nio.ByteBuffer; ...@@ -79,7 +79,6 @@ import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode;
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -111,15 +110,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -111,15 +110,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
String topic = mappingContext.getTopic(); String topic = mappingContext.getTopic();
Integer globalId = mappingContext.getGlobalId(); Integer globalId = mappingContext.getGlobalId();
Long globalOffset = mappingContext.getGlobalOffset(); // if the leader? consider the order consumer, which will lock the mq
if (!mappingContext.isLeader()) {
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
} }
Long globalOffset = requestHeader.getQueueOffset();
//TODO Check if the leader? consider the order consumer, which will lock the mq LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
// mappingContext.setCurrentItem(mappingItem);
if (globalOffset < mappingItem.getLogicOffset()) { if (globalOffset < mappingItem.getLogicOffset()) {
//handleOffsetMoved //handleOffsetMoved
...@@ -174,68 +172,122 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -174,68 +172,122 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader, private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
TopicQueueMappingContext mappingContext, final int code) { TopicQueueMappingContext mappingContext, final int code) {
try { try {
if (mappingContext == null) { if (mappingContext.getMappingDetail() == null) {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem();
LogicQueueMappingItem currentItem = mappingContext.getCurrentItem();
LogicQueueMappingItem earlistItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert currentItem.getLogicOffset() >= 0;
long requestOffset = requestHeader.getQueueOffset(); long requestOffset = requestHeader.getQueueOffset();
long nextBeginOffset = responseHeader.getNextBeginOffset(); long nextBeginOffset = responseHeader.getNextBeginOffset();
long minOffset = responseHeader.getMinOffset(); long minOffset = responseHeader.getMinOffset();
long maxOffset = responseHeader.getMaxOffset(); long maxOffset = responseHeader.getMaxOffset();
int responseCode = code; int responseCode = code;
if (responseCode != ResponseCode.SUCCESS //consider the following situations
&& responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) { // 1. read from slave, currently not supported
if (mappingContext.isLeader()) { // 2. the middle queue is truncated because of deleting commitlog
if (requestOffset < minOffset) { if (code != ResponseCode.SUCCESS) {
//note the currentItem maybe both the leader and the earliest
if (leaderItem.getGen() == currentItem.getGen()) {
//read the leader
if (requestOffset > maxOffset) {
//actually, we need do nothing, but keep the code structure here
if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED;
} else {
//maybe current broker is the slave
responseCode = code;
}
} else if (requestOffset < minOffset) {
nextBeginOffset = minOffset; nextBeginOffset = minOffset;
responseCode = ResponseCode.PULL_NOT_FOUND; responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
} else if (requestOffset > maxOffset) { } else {
responseCode = code;
}
}
//note the currentItem maybe both the leader and the earliest
if (earlistItem.getGen() == currentItem.getGen()) {
//read the earliest one
if (requestOffset < minOffset) {
if (code == ResponseCode.PULL_OFFSET_MOVED) {
responseCode = ResponseCode.PULL_OFFSET_MOVED;
} else {
//maybe read from slave, but we still set it to moved
responseCode = ResponseCode.PULL_OFFSET_MOVED; responseCode = ResponseCode.PULL_OFFSET_MOVED;
} else if (requestOffset == maxOffset) { }
nextBeginOffset = minOffset;
} else if (requestOffset >= maxOffset) {
//just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
if (nextItem != null) {
currentItem = nextItem;
nextBeginOffset = currentItem.getStartOffset();
minOffset = currentItem.getStartOffset();
maxOffset = minOffset;
responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
} else {
//maybe the next one's logic offset is -1
responseCode = ResponseCode.PULL_NOT_FOUND; responseCode = ResponseCode.PULL_NOT_FOUND;
}
} else { } else {
//let it go //let it go
responseCode = code;
} }
} else { }
//read from the middle item, ignore the PULL_OFFSET_MOVED
if (leaderItem.getGen() != currentItem.getGen()
&& earlistItem.getGen() != currentItem.getGen()) {
if (requestOffset < minOffset) { if (requestOffset < minOffset) {
nextBeginOffset = minOffset; nextBeginOffset = minOffset;
responseCode = ResponseCode.PULL_NOT_FOUND; responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
} else if (requestOffset >= maxOffset) { } else if (requestOffset >= maxOffset) {
responseCode = ResponseCode.PULL_NOT_FOUND;
//just move to another item //just move to another item
mappingItem = mappingContext.findNext(); LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
assert mappingItem != null; if (nextItem != null) {
nextBeginOffset = mappingItem.getStartOffset(); currentItem = nextItem;
minOffset = mappingItem.getStartOffset(); nextBeginOffset = currentItem.getStartOffset();
minOffset = currentItem.getStartOffset();
maxOffset = minOffset; maxOffset = minOffset;
responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
} else {
//maybe the next one's logic offset is -1
responseCode = ResponseCode.PULL_NOT_FOUND;
}
} else {
responseCode = code;
} }
} }
} }
//handle nextBeginOffset //handle nextBeginOffset
//the next begin offset should no more than the end offset //the next begin offset should no more than the end offset
if (mappingItem.checkIfEndOffsetDecided() if (currentItem.checkIfEndOffsetDecided()
&& nextBeginOffset >= mappingItem.getEndOffset()) { && nextBeginOffset >= currentItem.getEndOffset()) {
nextBeginOffset = mappingItem.getEndOffset(); nextBeginOffset = currentItem.getEndOffset();
} }
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset)); responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
//handle min offset //handle min offset
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(), minOffset))); responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(), minOffset)));
//handle max offset //handle max offset
responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset), responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
//set the offsetDelta //set the offsetDelta
responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta()); responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
if (code != ResponseCode.SUCCESS if (code != ResponseCode.SUCCESS) {
&& code != ResponseCode.PULL_RETRY_IMMEDIATELY) {
return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader); return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
} else { } else {
return null; return null;
} }
} catch (Throwable t) { } catch (Throwable t) {
t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
} }
...@@ -292,7 +344,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements ...@@ -292,7 +344,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response; return response;
} }
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset());
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
{ {
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
......
...@@ -101,7 +101,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -101,7 +101,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) { if (requestHeader == null) {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE); TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext); RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) { if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult); return CompletableFuture.completedFuture(rewriteResult);
...@@ -130,7 +130,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -130,7 +130,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
if (mappingItem == null) { if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.topic; package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.ConfigManager;
...@@ -164,24 +163,24 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -164,24 +163,24 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE); return buildTopicQueueMappingContext(requestHeader, false);
} }
//Do not return a null context //Do not return a null context
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss, Long globalOffset) { public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) {
if (requestHeader.getPhysical() != null if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) { && Boolean.TRUE.equals(requestHeader.getPhysical())) {
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
} }
TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic()); TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) { if (mappingDetail == null) {
//it is not static topic //it is not static topic
return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null); return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
} }
//If not find mappingItem, it encounters some errors //If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId(); Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) { if (globalId < 0 && !selectOneWhenMiss) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
} }
if (globalId < 0) { if (globalId < 0) {
...@@ -194,24 +193,17 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -194,24 +193,17 @@ public class TopicQueueMappingManager extends ConfigManager {
} }
} }
if (globalId < 0) { if (globalId < 0) {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
} }
List<LogicQueueMappingItem> mappingItemList = null; List<LogicQueueMappingItem> mappingItemList = null;
LogicQueueMappingItem mappingItem = null; LogicQueueMappingItem leaderItem = null;
if (globalOffset == null
|| Long.MAX_VALUE == globalOffset) {
mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
if (mappingItemList != null if (mappingItemList != null
&& mappingItemList.size() > 0) { && mappingItemList.size() > 0) {
mappingItem = mappingItemList.get(mappingItemList.size() - 1); leaderItem = mappingItemList.get(mappingItemList.size() - 1);
}
} else {
mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
} }
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem); return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem);
} }
...@@ -221,11 +213,10 @@ public class TopicQueueMappingManager extends ConfigManager { ...@@ -221,11 +213,10 @@ public class TopicQueueMappingManager extends ConfigManager {
return null; return null;
} }
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail(); TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem(); if (!mappingContext.isLeader()) {
if (mappingItem == null
|| !mappingDetail.getBname().equals(mappingItem.getBname())) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
} }
LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
requestHeader.setQueueId(mappingItem.getQueueId()); requestHeader.setQueueId(mappingItem.getQueueId());
return null; return null;
} catch (Throwable t) { } catch (Throwable t) {
......
...@@ -23,6 +23,8 @@ import java.util.Random; ...@@ -23,6 +23,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullCallback;
...@@ -170,6 +172,7 @@ public class PullAPIWrapper { ...@@ -170,6 +172,7 @@ public class PullAPIWrapper {
this.recalculatePullFromWhichNode(mq), false); this.recalculatePullFromWhichNode(mq), false);
} }
if (findBrokerResult != null) { if (findBrokerResult != null) {
{ {
// check version // check version
...@@ -203,6 +206,7 @@ public class PullAPIWrapper { ...@@ -203,6 +206,7 @@ public class PullAPIWrapper {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr); brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
} }
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr, brokerAddr,
requestHeader, requestHeader,
......
...@@ -32,6 +32,10 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -32,6 +32,10 @@ public class LogicQueueMappingItem extends RemotingSerializable {
} }
public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) { public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
//consider the newly mapped item
if (logicOffset < 0) {
return -1;
}
if (physicalQueueOffset < startOffset) { if (physicalQueueOffset < startOffset) {
return logicOffset; return logicOffset;
} }
...@@ -43,6 +47,9 @@ public class LogicQueueMappingItem extends RemotingSerializable { ...@@ -43,6 +47,9 @@ public class LogicQueueMappingItem extends RemotingSerializable {
} }
public long computeStaticQueueOffset(long physicalQueueOffset) { public long computeStaticQueueOffset(long physicalQueueOffset) {
if (logicOffset < 0) {
return logicOffset;
}
if (physicalQueueOffset < startOffset) { if (physicalQueueOffset < startOffset) {
return logicOffset; return logicOffset;
} }
......
...@@ -23,18 +23,18 @@ import java.util.List; ...@@ -23,18 +23,18 @@ import java.util.List;
public class TopicQueueMappingContext { public class TopicQueueMappingContext {
private String topic; private String topic;
private Integer globalId; private Integer globalId;
private Long globalOffset;
private TopicQueueMappingDetail mappingDetail; private TopicQueueMappingDetail mappingDetail;
private List<LogicQueueMappingItem> mappingItemList; private List<LogicQueueMappingItem> mappingItemList;
private LogicQueueMappingItem mappingItem; private LogicQueueMappingItem leaderItem;
public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) { private LogicQueueMappingItem currentItem;
public TopicQueueMappingContext(String topic, Integer globalId, TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem leaderItem) {
this.topic = topic; this.topic = topic;
this.globalId = globalId; this.globalId = globalId;
this.globalOffset = globalOffset;
this.mappingDetail = mappingDetail; this.mappingDetail = mappingDetail;
this.mappingItemList = mappingItemList; this.mappingItemList = mappingItemList;
this.mappingItem = mappingItem; this.leaderItem = leaderItem;
} }
...@@ -45,33 +45,7 @@ public class TopicQueueMappingContext { ...@@ -45,33 +45,7 @@ public class TopicQueueMappingContext {
} }
public boolean isLeader() { public boolean isLeader() {
if (mappingDetail == null return leaderItem != null && leaderItem.getBname().equals(mappingDetail.getBname());
|| mappingItemList == null
|| mappingItemList.isEmpty()) {
return false;
}
LogicQueueMappingItem mappingItem = mappingItemList.get(mappingItemList.size() - 1);
return mappingItem.getBname().equals(mappingDetail.getBname());
}
public LogicQueueMappingItem findNext() {
if (mappingDetail == null
|| mappingItem == null
|| mappingItemList == null
|| mappingItemList.isEmpty()) {
return null;
}
for (int i = 0; i < mappingItemList.size(); i++) {
LogicQueueMappingItem item = mappingItemList.get(i);
if (item.getGen() == mappingItem.getGen()) {
if (i < mappingItemList.size() - 1) {
return mappingItemList.get(i + 1);
} else {
return null;
}
}
}
return null;
} }
...@@ -91,13 +65,6 @@ public class TopicQueueMappingContext { ...@@ -91,13 +65,6 @@ public class TopicQueueMappingContext {
this.globalId = globalId; this.globalId = globalId;
} }
public Long getGlobalOffset() {
return globalOffset;
}
public void setGlobalOffset(Long globalOffset) {
this.globalOffset = globalOffset;
}
public TopicQueueMappingDetail getMappingDetail() { public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail; return mappingDetail;
...@@ -115,13 +82,23 @@ public class TopicQueueMappingContext { ...@@ -115,13 +82,23 @@ public class TopicQueueMappingContext {
this.mappingItemList = mappingItemList; this.mappingItemList = mappingItemList;
} }
public LogicQueueMappingItem getMappingItem() { public LogicQueueMappingItem getLeaderItem() {
return mappingItem; return leaderItem;
} }
public void setMappingItem(LogicQueueMappingItem mappingItem) { public void setLeaderItem(LogicQueueMappingItem leaderItem) {
this.mappingItem = mappingItem; this.leaderItem = leaderItem;
} }
public LogicQueueMappingItem getCurrentItem() {
return currentItem;
}
public void setCurrentItem(LogicQueueMappingItem currentItem) {
this.currentItem = currentItem;
}
public void setMappingItemList(List<LogicQueueMappingItem> mappingItemList) {
this.mappingItemList = mappingItemList;
}
} }
...@@ -77,28 +77,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -77,28 +77,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
} }
public static LogicQueueMappingItem findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long logicOffset) {
if (mappingItems == null
|| mappingItems.isEmpty()) {
return null;
}
//Could use bi-search to polish performance
for (int i = mappingItems.size() - 1; i >= 0; i--) {
LogicQueueMappingItem item = mappingItems.get(i);
if (item.getLogicOffset() >= 0
&& logicOffset >= item.getLogicOffset()) {
return item;
}
}
//if not found, maybe out of range, return the first one
for (int i = 0; i < mappingItems.size(); i++) {
if (!mappingItems.get(i).checkIfShouldDeleted()) {
return mappingItems.get(i);
}
}
return null;
}
public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) { public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId); List<LogicQueueMappingItem> mappingItems = getMappingInfo(mappingDetail, globalId);
if (mappingItems == null if (mappingItems == null
......
...@@ -618,4 +618,58 @@ public class TopicQueueMappingUtils { ...@@ -618,4 +618,58 @@ public class TopicQueueMappingUtils {
return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut); return new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, brokersToMapIn, brokersToMapOut);
} }
public static LogicQueueMappingItem findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long logicOffset, boolean ignoreNegative) {
if (mappingItems == null
|| mappingItems.isEmpty()) {
return null;
}
//Could use bi-search to polish performance
for (int i = mappingItems.size() - 1; i >= 0; i--) {
LogicQueueMappingItem item = mappingItems.get(i);
if (ignoreNegative && item.getLogicOffset() < 0) {
continue;
}
if (logicOffset >= item.getLogicOffset()) {
return item;
}
}
//if not found, maybe out of range, return the first one
for (int i = 0; i < mappingItems.size(); i++) {
LogicQueueMappingItem item = mappingItems.get(i);
if (ignoreNegative && item.getLogicOffset() < 0) {
continue;
}
if (!item.checkIfShouldDeleted()) {
return mappingItems.get(i);
}
}
return null;
}
public static LogicQueueMappingItem findNext(List<LogicQueueMappingItem> items, LogicQueueMappingItem currentItem, boolean ignoreNegative) {
if (items == null
|| currentItem == null) {
return null;
}
for (int i = 0; i < items.size(); i++) {
LogicQueueMappingItem item = items.get(i);
if (ignoreNegative && item.getLogicOffset() < 0) {
continue;
}
if (item.getGen() == currentItem.getGen()) {
if (i < items.size() - 1) {
item = items.get(i + 1);
if (ignoreNegative && item.getLogicOffset() < 0) {
return null;
} else {
return item;
}
} else {
return null;
}
}
}
return null;
}
} }
...@@ -48,6 +48,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -48,6 +48,7 @@ public class StaticTopicIT extends BaseConf {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
defaultMQAdminExt = getAdmin(nsAddr); defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName); waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata(); clientMetadata = new ClientMetadata();
...@@ -173,7 +174,6 @@ public class StaticTopicIT extends BaseConf { ...@@ -173,7 +174,6 @@ public class StaticTopicIT extends BaseConf {
String topic = "static" + MQRandomUtils.getRandomTopic(); String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic); RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener()); RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
producer.getProducer().setPollNameServerInterval(100);
int queueNum = 10; int queueNum = 10;
int msgEachQueue = 100; int msgEachQueue = 100;
...@@ -183,6 +183,9 @@ public class StaticTopicIT extends BaseConf { ...@@ -183,6 +183,9 @@ public class StaticTopicIT extends BaseConf {
targetBrokers.add(broker1Name); targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers); createStaticTopic(topic, queueNum, targetBrokers);
} }
//System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
//System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
//produce the messages //produce the messages
{ {
List<MessageQueue> messageQueueList = producer.getMessageQueue(); List<MessageQueue> messageQueueList = producer.getMessageQueue();
...@@ -203,6 +206,11 @@ public class StaticTopicIT extends BaseConf { ...@@ -203,6 +206,11 @@ public class StaticTopicIT extends BaseConf {
} }
} }
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
//remapping the static topic //remapping the static topic
{ {
Set<String> targetBrokers = new HashSet<>(); Set<String> targetBrokers = new HashSet<>();
...@@ -242,8 +250,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -242,8 +250,7 @@ public class StaticTopicIT extends BaseConf {
} }
} }
{ {
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000); consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
System.out.println("Consume: " + consumer.getListener().getAllMsgBody().size());
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody())) consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody()); .containsExactlyElementsIn(producer.getAllMsgBody());
...@@ -258,7 +265,7 @@ public class StaticTopicIT extends BaseConf { ...@@ -258,7 +265,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(queueNum, messagesByQueue.size()); Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) { for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i); List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue, messageExts.size()); Assert.assertEquals(msgEachQueue * 2, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() { Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override @Override
public int compare(MessageExt o1, MessageExt o2) { public int compare(MessageExt o1, MessageExt o2) {
...@@ -268,12 +275,16 @@ public class StaticTopicIT extends BaseConf { ...@@ -268,12 +275,16 @@ public class StaticTopicIT extends BaseConf {
for (int j = 0; j < msgEachQueue; j++) { for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset()); Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
} }
for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset());
}
} }
} }
} }
@After @After
public void tearDown() { public void tearDown() {
System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");
super.shutdown(); super.shutdown();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册