From a64ad01020e77b22bef18d5e240cb99df18170d4 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Mon, 8 Nov 2021 11:08:31 +0800 Subject: [PATCH] Add some notes, and revert the id map, set the globalId first --- .../org/apache/rocketmq/common/TopicQueueMappingDetail.java | 4 ++-- .../org/apache/rocketmq/common/TopicQueueMappingInfo.java | 4 ++-- .../apache/rocketmq/common/protocol/route/TopicRouteData.java | 3 ++- .../apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index a90ca7eb..d3e3d92a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -63,13 +63,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { && items.size() >= 1) { LogicQueueMappingItem curr = items.get(items.size() - 1); if (bname.equals(curr.getBname())) { - tmpIdMap.put(curr.getQueueId(), globalId); + tmpIdMap.put(globalId, curr.getQueueId()); } } else if (level == LEVEL_1 && items.size() >= 2) { LogicQueueMappingItem prev = items.get(items.size() - 1); if (bname.equals(prev.getBname())) { - tmpIdMap.put(prev.getQueueId(), globalId); + tmpIdMap.put(globalId, prev.getQueueId()); } } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index c5bbeef4..2d76365d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -29,9 +29,9 @@ public class TopicQueueMappingInfo extends RemotingSerializable { int totalQueues; String bname; //identify the hosted broker name //register to broker to construct the route - ConcurrentMap currIdMap = new ConcurrentHashMap(); + ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure - protected ConcurrentMap prevIdMap = new ConcurrentHashMap(); + ConcurrentMap prevIdMap = new ConcurrentHashMap(); public TopicQueueMappingInfo(String topic, int totalQueues, String bname) { this.topic = topic; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java index c2cad6c4..2ce6a53b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java @@ -33,7 +33,8 @@ public class TopicRouteData extends RemotingSerializable { private List queueDatas; private List brokerDatas; private HashMap/* Filter Server */> filterServerTable; - private Map topicQueueMappingByBroker; + //It could be null or empty + private Map topicQueueMappingByBroker; public TopicRouteData() { } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index bfdf53ca..90bc1976 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -163,7 +163,7 @@ public class RouteInfoManager { TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper); Map topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap(); - + //the topicQueueMappingInfoMap should never be null, but can be empty for (Map.Entry entry : topicQueueMappingInfoMap.entrySet()) { if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) { topicQueueMappingInfoTable.put(entry.getKey(), new HashMap()); -- GitLab