提交 daf47490 编写于 作者: D dongeforever

Polish the use of route data

上级 40184357
......@@ -10,8 +10,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -93,33 +96,46 @@ public class ClientMetadata {
|| route.getTopicQueueMappingByBroker().isEmpty()) {
return new ConcurrentHashMap<MessageQueue, String>();
}
ConcurrentMap<MessageQueue, String> mqEndPoints = new ConcurrentHashMap<MessageQueue, String>();
int totalNums = 0;
for (Map.Entry<String, TopicQueueMappingInfo> entry : route.getTopicQueueMappingByBroker().entrySet()) {
String brokerName = entry.getKey();
//TODO check the epoch of
if (entry.getValue().getTotalQueues() > totalNums) {
if (totalNums != 0) {
log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues());
}
totalNums = entry.getValue().getTotalQueues();
ConcurrentMap<MessageQueue, TopicQueueMappingInfo> mqEndPoints = new ConcurrentHashMap<MessageQueue, TopicQueueMappingInfo>();
List<Map.Entry<String, TopicQueueMappingInfo>> mappingInfos = new ArrayList<Map.Entry<String, TopicQueueMappingInfo>>(route.getTopicQueueMappingByBroker().entrySet());
Collections.sort(mappingInfos, new Comparator<Map.Entry<String, TopicQueueMappingInfo>>() {
@Override
public int compare(Map.Entry<String, TopicQueueMappingInfo> o1, Map.Entry<String, TopicQueueMappingInfo> o2) {
return (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch());
}
});
int maxTotalNums = 0;
long maxTotalNumOfEpoch = -1;
for (Map.Entry<String, TopicQueueMappingInfo> entry : mappingInfos) {
TopicQueueMappingInfo info = entry.getValue();
if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) {
maxTotalNums = entry.getValue().getTotalQueues();
}
for (Map.Entry<Integer, Integer> idEntry : entry.getValue().getCurrIdMap().entrySet()) {
int globalId = idEntry.getKey();
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId);
String oldBrokerName = mqEndPoints.put(mq, brokerName);
log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName);
TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq);
if (oldInfo == null || oldInfo.getEpoch() <= info.getEpoch()) {
mqEndPoints.put(mq, info);
}
}
}
ConcurrentMap<MessageQueue, String> mqEndPointsOfBroker = new ConcurrentHashMap<MessageQueue, String>();
//accomplish the static logic queues
for (int i = 0; i < totalNums; i++) {
for (int i = 0; i < maxTotalNums; i++) {
MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i);
if (!mqEndPoints.containsKey(mq)) {
mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST);
} else {
mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname());
}
}
return mqEndPoints;
return mqEndPointsOfBroker;
}
}
......@@ -2,14 +2,14 @@ package org.apache.rocketmq.common.statictopic;
public class LogicQueueMappingItem {
private int gen; //generation, mutable
private int queueId;
private String bname;
private long logicOffset; // the start of the logic offset
private long startOffset; // the start of the physical offset, included
private long endOffset = -1; // the end of the physical offset, excluded
private long timeOfStart = -1; // mutable
private long timeOfEnd = -1; // mutable
private final int gen; // immutable
private final int queueId; //, immutable
private final String bname; //important, immutable
private long logicOffset; // the start of the logic offset, important, can be changed by command only once
private final long startOffset; // the start of the physical offset, should always be 0, immutable
private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable
private long timeOfStart = -1; // mutable, reserved
private long timeOfEnd = -1; // mutable, reserved
public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) {
this.gen = gen;
......
......@@ -45,14 +45,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
//level 0 means current leader in this broker
//level 1 means previous leader in this broker
assert level == LEVEL_0 || level == LEVEL_1;
//level 1 means previous leader in this broker, reserved for
assert level == LEVEL_0 ;
if (hostedQueues == null || hostedQueues.isEmpty()) {
return new ConcurrentHashMap<Integer, Integer>();
......@@ -67,12 +66,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
if (bname.equals(curr.getBname())) {
tmpIdMap.put(globalId, curr.getQueueId());
}
} else if (level == LEVEL_1
&& items.size() >= 2) {
LogicQueueMappingItem prev = items.get(items.size() - 1);
if (bname.equals(prev.getBname())) {
tmpIdMap.put(globalId, prev.getQueueId());
}
}
}
return tmpIdMap;
......@@ -120,8 +113,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public TopicQueueMappingInfo cloneAsMappingInfo() {
TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch);
topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0);
topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1);
return topicQueueMappingInfo;
}
......
......@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingInfo extends RemotingSerializable {
public static final int LEVEL_0 = 0;
public static final int LEVEL_1 = 1;
String topic; // redundant field
int totalQueues;
......@@ -32,8 +31,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
boolean dirty; //indicate if the data is dirty
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
//register to broker to help detect remapping failure
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> prevIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic;
......@@ -79,8 +76,4 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
public ConcurrentMap<Integer, Integer> getPrevIdMap() {
return prevIdMap;
}
}
......@@ -168,6 +168,7 @@ public class RouteInfoManager {
if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<String, TopicQueueMappingInfo>());
}
//Note asset brokerName equal entry.getValue().getBname()
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册