From daf47490c3e26720762534462c41a77b15e97ea2 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Sat, 20 Nov 2021 13:20:39 +0800 Subject: [PATCH] Polish the use of route data --- .../rocketmq/common/rpc/ClientMetadata.java | 48 ++++++++++++------- .../statictopic/LogicQueueMappingItem.java | 16 +++---- .../statictopic/TopicQueueMappingDetail.java | 13 +---- .../statictopic/TopicQueueMappingInfo.java | 7 --- .../namesrv/routeinfo/RouteInfoManager.java | 1 + 5 files changed, 43 insertions(+), 42 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index 38f1d513..53ffa51a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -10,8 +10,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -93,33 +96,46 @@ public class ClientMetadata { || route.getTopicQueueMappingByBroker().isEmpty()) { return new ConcurrentHashMap(); } - ConcurrentMap mqEndPoints = new ConcurrentHashMap(); - - int totalNums = 0; - for (Map.Entry entry : route.getTopicQueueMappingByBroker().entrySet()) { - String brokerName = entry.getKey(); - //TODO check the epoch of - if (entry.getValue().getTotalQueues() > totalNums) { - if (totalNums != 0) { - log.warn("The static logic queue totalNums dose not match before {} {} != {}", topic, totalNums, entry.getValue().getTotalQueues()); - } - totalNums = entry.getValue().getTotalQueues(); + ConcurrentMap mqEndPoints = new ConcurrentHashMap(); + + + List> mappingInfos = new ArrayList>(route.getTopicQueueMappingByBroker().entrySet()); + Collections.sort(mappingInfos, new Comparator>() { + @Override + public int compare(Map.Entry o1, Map.Entry o2) { + return (int) (o2.getValue().getEpoch() - o1.getValue().getEpoch()); + } + }); + + int maxTotalNums = 0; + long maxTotalNumOfEpoch = -1; + for (Map.Entry entry : mappingInfos) { + TopicQueueMappingInfo info = entry.getValue(); + if (info.getEpoch() >= maxTotalNumOfEpoch && info.getTotalQueues() > maxTotalNums) { + maxTotalNums = entry.getValue().getTotalQueues(); } for (Map.Entry idEntry : entry.getValue().getCurrIdMap().entrySet()) { int globalId = idEntry.getKey(); MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, globalId); - String oldBrokerName = mqEndPoints.put(mq, brokerName); - log.warn("The static logic queue is duplicated {} {} {} ", mq, oldBrokerName, brokerName); + TopicQueueMappingInfo oldInfo = mqEndPoints.get(mq); + if (oldInfo == null || oldInfo.getEpoch() <= info.getEpoch()) { + mqEndPoints.put(mq, info); + } } } + + ConcurrentMap mqEndPointsOfBroker = new ConcurrentHashMap(); + //accomplish the static logic queues - for (int i = 0; i < totalNums; i++) { + for (int i = 0; i < maxTotalNums; i++) { MessageQueue mq = new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, i); if (!mqEndPoints.containsKey(mq)) { - mqEndPoints.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST); + mqEndPointsOfBroker.put(mq, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST); + } else { + mqEndPointsOfBroker.put(mq, mqEndPoints.get(mq).getBname()); } } - return mqEndPoints; + return mqEndPointsOfBroker; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index c855dfdc..479f75d6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -2,14 +2,14 @@ package org.apache.rocketmq.common.statictopic; public class LogicQueueMappingItem { - private int gen; //generation, mutable - private int queueId; - private String bname; - private long logicOffset; // the start of the logic offset - private long startOffset; // the start of the physical offset, included - private long endOffset = -1; // the end of the physical offset, excluded - private long timeOfStart = -1; // mutable - private long timeOfEnd = -1; // mutable + private final int gen; // immutable + private final int queueId; //, immutable + private final String bname; //important, immutable + private long logicOffset; // the start of the logic offset, important, can be changed by command only once + private final long startOffset; // the start of the physical offset, should always be 0, immutable + private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable + private long timeOfStart = -1; // mutable, reserved + private long timeOfEnd = -1; // mutable, reserved public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) { this.gen = gen; diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index c941fff9..7117cad9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -45,14 +45,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public void buildIdMap() { this.currIdMap = buildIdMap(LEVEL_0); - this.prevIdMap = buildIdMap(LEVEL_1); } public ConcurrentMap buildIdMap(int level) { //level 0 means current leader in this broker - //level 1 means previous leader in this broker - assert level == LEVEL_0 || level == LEVEL_1; + //level 1 means previous leader in this broker, reserved for + assert level == LEVEL_0 ; if (hostedQueues == null || hostedQueues.isEmpty()) { return new ConcurrentHashMap(); @@ -67,12 +66,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { if (bname.equals(curr.getBname())) { tmpIdMap.put(globalId, curr.getQueueId()); } - } else if (level == LEVEL_1 - && items.size() >= 2) { - LogicQueueMappingItem prev = items.get(items.size() - 1); - if (bname.equals(prev.getBname())) { - tmpIdMap.put(globalId, prev.getQueueId()); - } } } return tmpIdMap; @@ -120,8 +113,6 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public TopicQueueMappingInfo cloneAsMappingInfo() { TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); - topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1); - return topicQueueMappingInfo; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index b6ce222b..f6122c05 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; public class TopicQueueMappingInfo extends RemotingSerializable { public static final int LEVEL_0 = 0; - public static final int LEVEL_1 = 1; String topic; // redundant field int totalQueues; @@ -32,8 +31,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable { boolean dirty; //indicate if the data is dirty //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); - //register to broker to help detect remapping failure - transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) { this.topic = topic; @@ -79,8 +76,4 @@ public class TopicQueueMappingInfo extends RemotingSerializable { public ConcurrentMap getCurrIdMap() { return currIdMap; } - - public ConcurrentMap getPrevIdMap() { - return prevIdMap; - } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index b2108572..60a0f811 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -168,6 +168,7 @@ public class RouteInfoManager { if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { topicQueueMappingInfoTable.put(entry.getKey(), new HashMap()); } + //Note asset brokerName equal entry.getValue().getBname() topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue()); } } -- GitLab