提交 36a82fa6 编写于 作者: D dongeforever

Polish the rewrite logic for SendProcessor

上级 71af9c75
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.processor; package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -126,16 +127,28 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -126,16 +127,28 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
try { try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic()); TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) { if (mappingDetail == null) {
//it is not static topic
return null; return null;
} }
if (!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) { Integer phyQueueId = null;
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); //compatible with the old logic, but it fact, this should not happen
if (requestHeader.getQueueId() < 0) {
Iterator<Map.Entry<Integer, Integer>> it = mappingDetail.getCurrIdMap().entrySet().iterator();
if (it.hasNext()) {
phyQueueId = it.next().getValue();
}
} else {
phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
}
if (phyQueueId == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
} else {
requestHeader.setQueueId(phyQueueId);
return null;
} }
requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
} catch (Throwable t) { } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
} }
return null;
} }
private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) { private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) {
...@@ -144,14 +157,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -144,14 +157,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (mappingDetail == null) { if (mappingDetail == null) {
return null; return null;
} }
if (!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) { Integer globalId = mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); if (globalId == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exist in response process of current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
} }
long staticLogicOffset = mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), responseHeader.getQueueOffset()); long staticLogicOffset = mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
if (staticLogicOffset < 0) { if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName())); return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
} }
responseHeader.setQueueId(globalId);
responseHeader.setQueueOffset(staticLogicOffset); responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) { } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage()); return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
......
...@@ -27,6 +27,9 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -27,6 +27,9 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver // the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>(); ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/> currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname); super(topic, totalQueues, bname);
...@@ -45,6 +48,18 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -45,6 +48,18 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void buildIdMap() { public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0); this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1); this.prevIdMap = buildIdMap(LEVEL_1);
this.currIdMapRevert = revert(this.currIdMap);
}
public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
if (original == null || original.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
}
ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
for (Map.Entry<Integer, Integer> entry: tmpIdMap.entrySet()) {
tmpIdMap.put(entry.getValue(), entry.getKey());
}
return tmpIdMap;
} }
public ConcurrentMap<Integer, Integer> buildIdMap(int level) { public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
...@@ -107,6 +122,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { ...@@ -107,6 +122,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return topicQueueMappingInfo; return topicQueueMappingInfo;
} }
public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
return currIdMapRevert;
}
public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer> currIdMapRevert) {
this.currIdMapRevert = currIdMapRevert;
}
public int getTotalQueues() { public int getTotalQueues() {
return totalQueues; return totalQueues;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册