From a8ef92e94dccbdbee52c86ae8a57620e1783d623 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 24 Nov 2021 15:33:37 +0800 Subject: [PATCH] Fix the serialize probelm --- .../rocketmq/broker/BrokerController.java | 5 +- .../processor/AdminBrokerProcessor.java | 2 +- .../processor/PullMessageProcessor.java | 2 +- .../topic/TopicQueueMappingManager.java | 11 ++-- .../statictopic/LogicQueueMappingItem.java | 33 ++++++++-- .../statictopic/TopicQueueMappingContext.java | 8 ++- .../statictopic/TopicQueueMappingDetail.java | 65 ++++++++----------- .../statictopic/TopicQueueMappingInfo.java | 14 +++- .../statictopic/TopicQueueMappingOne.java | 8 ++- .../statictopic/TopicQueueMappingUtils.java | 17 +++-- .../statictopic/TopicQueueMappingTest.java | 31 ++++----- .../tools/admin/DefaultMQAdminExtImpl.java | 6 +- 12 files changed, 110 insertions(+), 92 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9493c86c..b1e98f72 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -95,6 +95,7 @@ import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -1084,7 +1085,7 @@ public class BrokerController { Map topicQueueMappingInfoMap = topicConfigList.stream() .map(TopicConfig::getTopicName) .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName)) - .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, info.cloneAsMappingInfo())) + .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info))) .orElse(null)) .filter(Objects::nonNull) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -1103,7 +1104,7 @@ public class BrokerController { topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable()); topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map( - entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().cloneAsMappingInfo()) + entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())) ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 443ae463..6fb95515 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -645,7 +645,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements || !mappingDetail.getBname().equals(mappingItem.getBname())) { return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname())); } - ImmutableList mappingItems = mappingContext.getMappingItemList(); + List mappingItems = mappingContext.getMappingItemList(); //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp Long timestamp = requestHeader.getTimestamp(); long offset = -1; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 24d3f079..7f704cc3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -187,7 +187,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements //handle max offset { if (mappingItem.checkIfEndOffsetDecided()) { - responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), mappingDetail.computeMaxOffsetFromMapping(mappingContext.getGlobalId()))); + responseHeader.setMaxOffset(Math.max(mappingItem.computeMaxStaticQueueOffset(), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId()))); } else { responseHeader.setMaxOffset(mappingItem.computeStaticQueueOffsetUpToEnd(responseHeader.getMaxOffset())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index d5b76b87..4b9d328b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -93,8 +94,8 @@ public class TopicQueueMappingManager extends ConfigManager { throw new RuntimeException(String.format("Can't accept data with small epoch %d < %d", newDetail.getEpoch(), oldDetail.getEpoch())); } for (Integer globalId : oldDetail.getHostedQueues().keySet()) { - ImmutableList oldItems = oldDetail.getHostedQueues().get(globalId); - ImmutableList newItems = newDetail.getHostedQueues().get(globalId); + List oldItems = oldDetail.getHostedQueues().get(globalId); + List newItems = newDetail.getHostedQueues().get(globalId); if (newItems == null) { //keep the old newDetail.getHostedQueues().put(globalId, oldItems); @@ -191,18 +192,18 @@ public class TopicQueueMappingManager extends ConfigManager { return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null); } - ImmutableList mappingItemList = null; + List mappingItemList = null; LogicQueueMappingItem mappingItem = null; if (globalOffset == null || Long.MAX_VALUE == globalOffset) { - mappingItemList = mappingDetail.getMappingInfo(globalId); + mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); if (mappingItemList != null && mappingItemList.size() > 0) { mappingItem = mappingItemList.get(mappingItemList.size() - 1); } } else { - mappingItemList = mappingDetail.getMappingInfo(globalId); + mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId); mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset); } return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem); diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java index b87d2f1a..16f41e5e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java @@ -1,16 +1,23 @@ package org.apache.rocketmq.common.statictopic; -public class LogicQueueMappingItem { +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - private final int gen; // immutable - private final int queueId; //, immutable - private final String bname; //important, immutable +public class LogicQueueMappingItem extends RemotingSerializable { + + private int gen; // immutable + private int queueId; //, immutable + private String bname; //important, immutable private long logicOffset; // the start of the logic offset, important, can be changed by command only once - private final long startOffset; // the start of the physical offset, should always be 0, immutable + private long startOffset; // the start of the physical offset, should always be 0, immutable private long endOffset = -1; // the end of the physical offset, excluded, revered -1, mutable private long timeOfStart = -1; // mutable, reserved private long timeOfEnd = -1; // mutable, reserved + //make sure it has a default constructor + public LogicQueueMappingItem() { + + } + public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long endOffset, long timeOfStart, long timeOfEnd) { this.gen = gen; this.queueId = queueId; @@ -112,6 +119,22 @@ public class LogicQueueMappingItem { this.timeOfEnd = timeOfEnd; } + public void setGen(int gen) { + this.gen = gen; + } + + public void setQueueId(int queueId) { + this.queueId = queueId; + } + + public void setBname(String bname) { + this.bname = bname; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + @Override public String toString() { return "LogicQueueMappingItem{" + diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java index b639c6ad..d6d359d9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java @@ -18,15 +18,17 @@ package org.apache.rocketmq.common.statictopic; import com.google.common.collect.ImmutableList; +import java.util.List; + public class TopicQueueMappingContext { private String topic; private Integer globalId; private Long globalOffset; private TopicQueueMappingDetail mappingDetail; - private ImmutableList mappingItemList; + private List mappingItemList; private LogicQueueMappingItem mappingItem; - public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList mappingItemList, LogicQueueMappingItem mappingItem) { + public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, List mappingItemList, LogicQueueMappingItem mappingItem) { this.topic = topic; this.globalId = globalId; this.globalOffset = globalOffset; @@ -73,7 +75,7 @@ public class TopicQueueMappingContext { this.mappingDetail = mappingDetail; } - public ImmutableList getMappingItemList() { + public List getMappingItemList() { return mappingItemList; } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java index b80aa9d5..4a8bae3e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.common.statictopic; -import com.google.common.collect.ImmutableList; - import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -27,50 +25,45 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // the mapping info in current broker, do not register to nameserver // make sure this value is not null - private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - - + private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); + //make sure there is a default constructor public TopicQueueMappingDetail() { } - public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) { super(topic, totalQueues, bname, epoch); - buildIdMap(); } - public boolean putMappingInfo(Integer globalId, ImmutableList mappingInfo) { + public static boolean putMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId, List mappingInfo) { if (mappingInfo.isEmpty()) { return true; } - hostedQueues.put(globalId, mappingInfo); - buildIdMap(); + mappingDetail.hostedQueues.put(globalId, mappingInfo); return true; } - public void buildIdMap() { - this.currIdMap = buildIdMap(LEVEL_0); + public static List getMappingInfo(TopicQueueMappingDetail mappingDetail, Integer globalId) { + return mappingDetail.hostedQueues.get(globalId); } - - public ConcurrentMap buildIdMap(int level) { + public static ConcurrentMap buildIdMap(TopicQueueMappingDetail mappingDetail, int level) { //level 0 means current leader in this broker //level 1 means previous leader in this broker, reserved for assert level == LEVEL_0 ; - if (hostedQueues == null || hostedQueues.isEmpty()) { + if (mappingDetail.hostedQueues == null || mappingDetail.hostedQueues.isEmpty()) { return new ConcurrentHashMap(); } ConcurrentMap tmpIdMap = new ConcurrentHashMap(); - for (Map.Entry> entry: hostedQueues.entrySet()) { + for (Map.Entry> entry: mappingDetail.hostedQueues.entrySet()) { Integer globalId = entry.getKey(); - ImmutableList items = entry.getValue(); + List items = entry.getValue(); if (level == LEVEL_0 && items.size() >= 1) { LogicQueueMappingItem curr = items.get(items.size() - 1); - if (bname.equals(curr.getBname())) { + if (mappingDetail.bname.equals(curr.getBname())) { tmpIdMap.put(globalId, curr.getQueueId()); } } @@ -78,14 +71,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return tmpIdMap; } - public ImmutableList getMappingInfo(Integer globalId) { - return hostedQueues.get(globalId); - } - - - - public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList mappingItems, long logicOffset) { + public static LogicQueueMappingItem findLogicQueueMappingItem(List mappingItems, long logicOffset) { if (mappingItems == null || mappingItems.isEmpty()) { return null; @@ -106,8 +93,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { return null; } - public long computeMaxOffsetFromMapping(Integer globalId) { - List mappingItems = getMappingInfo(globalId); + public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail mappingDetail, Integer globalId) { + List mappingItems = getMappingInfo(mappingDetail, globalId); if (mappingItems == null || mappingItems.isEmpty()) { return -1; @@ -117,24 +104,24 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { } - public TopicQueueMappingInfo cloneAsMappingInfo() { - TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.epoch); - topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); + public static TopicQueueMappingInfo cloneAsMappingInfo(TopicQueueMappingDetail mappingDetail) { + TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(mappingDetail.topic, mappingDetail.totalQueues, mappingDetail.bname, mappingDetail.epoch); + topicQueueMappingInfo.currIdMap = TopicQueueMappingDetail.buildIdMap(mappingDetail, LEVEL_0); return topicQueueMappingInfo; } - public ConcurrentMap> getHostedQueues() { - return hostedQueues; + public static boolean checkIfAsPhysical(TopicQueueMappingDetail mappingDetail, Integer globalId) { + List mappingItems = getMappingInfo(mappingDetail, globalId); + return mappingItems == null + || (mappingItems.size() == 1 + && mappingItems.get(0).getLogicOffset() == 0); } - public void setHostedQueues(ConcurrentMap> hostedQueues) { - this.hostedQueues = hostedQueues; + public ConcurrentMap> getHostedQueues() { + return hostedQueues; } - public boolean checkIfAsPhysical(Integer globalId) { - List mappingItems = getMappingInfo(globalId); - return mappingItems == null - || (mappingItems.size() == 1 - && mappingItems.get(0).getLogicOffset() == 0); + public void setHostedQueues(ConcurrentMap> hostedQueues) { + this.hostedQueues = hostedQueues; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java index 39747b3a..ba5af9b8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java @@ -30,7 +30,7 @@ public class TopicQueueMappingInfo extends RemotingSerializable { long epoch; //important to fence the old dirty data boolean dirty; //indicate if the data is dirty //register to broker to construct the route - transient ConcurrentMap currIdMap = new ConcurrentHashMap(); + protected ConcurrentMap currIdMap = new ConcurrentHashMap(); public TopicQueueMappingInfo() { @@ -80,4 +80,16 @@ public class TopicQueueMappingInfo extends RemotingSerializable { public ConcurrentMap getCurrIdMap() { return currIdMap; } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setBname(String bname) { + this.bname = bname; + } + + public void setCurrIdMap(ConcurrentMap currIdMap) { + this.currIdMap = currIdMap; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java index 644b3355..d802575a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java @@ -19,14 +19,16 @@ package org.apache.rocketmq.common.statictopic; import com.google.common.collect.ImmutableList; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import java.util.List; + public class TopicQueueMappingOne extends RemotingSerializable { String topic; // redundant field String bname; //identify the hosted broker name Integer globalId; - ImmutableList items; + List items; - public TopicQueueMappingOne(String topic, String bname, Integer globalId, ImmutableList items) { + public TopicQueueMappingOne(String topic, String bname, Integer globalId, List items) { this.topic = topic; this.bname = bname; this.globalId = globalId; @@ -45,7 +47,7 @@ public class TopicQueueMappingOne extends RemotingSerializable { return globalId; } - public ImmutableList getItems() { + public List getItems() { return items; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java index e83ed2a5..6e6b15ba 100644 --- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.Set; -import java.util.stream.Collectors; public class TopicQueueMappingUtils { @@ -168,7 +167,7 @@ public class TopicQueueMappingUtils { return new AbstractMap.SimpleEntry(maxEpoch, maxNum); } - public static void makeSureLogicQueueMappingItemImmutable(ImmutableList oldItems, ImmutableList newItems) { + public static void makeSureLogicQueueMappingItemImmutable(List oldItems, List newItems) { if (oldItems == null || oldItems.isEmpty()) { return; } @@ -198,7 +197,7 @@ public class TopicQueueMappingUtils { } - public static void checkLogicQueueMappingItemOffset(ImmutableList items) { + public static void checkLogicQueueMappingItemOffset(List items) { if (items == null || items.isEmpty()) { return; @@ -248,7 +247,7 @@ public class TopicQueueMappingUtils { if (mappingDetail.totalQueues > maxNum) { maxNum = mappingDetail.totalQueues; } - for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { + for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { Integer globalid = entry.getKey(); checkLogicQueueMappingItemOffset(entry.getValue()); String leaderBrokerName = getLeaderBroker(entry.getValue()); @@ -278,10 +277,10 @@ public class TopicQueueMappingUtils { return globalIdMap; } - public static String getLeaderBroker(ImmutableList items) { + public static String getLeaderBroker(List items) { return getLeaderItem(items).getBname(); } - public static LogicQueueMappingItem getLeaderItem(ImmutableList items) { + public static LogicQueueMappingItem getLeaderItem(List items) { assert items.size() > 0; return items.get(items.size() - 1); } @@ -367,7 +366,7 @@ public class TopicQueueMappingUtils { configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1); } LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); - configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); + TopicQueueMappingDetail.putMappingInfo(configMapping.getMappingDetail(), queueId, ImmutableList.of(mappingItem)); } // set the topic config @@ -458,8 +457,8 @@ public class TopicQueueMappingUtils { ImmutableList resultItems = ImmutableList.copyOf(items); //Use the same object - mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems); - mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems); + TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), queueId, resultItems); + TopicQueueMappingDetail.putMappingInfo(mapOutConfig.getMappingDetail(), queueId, resultItems); } for (Map.Entry entry : brokerConfigMap.entrySet()) { diff --git a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java index b0cc5dd7..d571f659 100644 --- a/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingTest.java @@ -8,17 +8,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Assert; import org.junit.Test; -import java.io.File; import java.util.Map; public class TopicQueueMappingTest { - @Test - public void testWriteToFile() { - System.out.println(System.getProperty("java.io.tmpdir")); - System.out.println(File.separator); - } - @Test public void testJsonSerialize() { LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L); @@ -29,35 +22,33 @@ public class TopicQueueMappingTest { Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname()); Assert.assertEquals(mappingItemMap.get("gen"), mappingItem.getGen()); Assert.assertEquals(mappingItemMap.get("logicOffset"), mappingItem.getLogicOffset()); - Assert.assertEquals(mappingItemMap.get("queueId"), mappingItem.getQueueId()); Assert.assertEquals(mappingItemMap.get("startOffset"), mappingItem.getStartOffset()); Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset()); Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart()); Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd()); + } + //test the decode encode { - String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false); - Assert.assertEquals(mappingItemJson, mappingItemJson2); + LogicQueueMappingItem mappingItemFromJson = RemotingSerializable.fromJson(mappingItemJson, LogicQueueMappingItem.class); + Assert.assertEquals(mappingItemJson, RemotingSerializable.toJson(mappingItemFromJson, false)); } TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis()); - mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem)); + TopicQueueMappingDetail.putMappingInfo(mappingDetail, 0, ImmutableList.of(mappingItem)); String mappingDetailJson = JSON.toJSONString(mappingDetail); { Map mappingDetailMap = JSON.parseObject(mappingDetailJson); - Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap")); - Assert.assertFalse(mappingDetailMap.containsKey("currIdMap")); - Assert.assertEquals(6, mappingDetailMap.size()); + Assert.assertTrue(mappingDetailMap.containsKey("currIdMap")); + Assert.assertEquals(7, mappingDetailMap.size()); Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size()); Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size()); } { - System.out.println(mappingDetailJson); - TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); - System.out.println(JSON.toJSONString(detailFromJson)); - - //Assert.assertEquals(1, detailFromJson.getHostedQueues().size()); - //Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size()); + TopicQueueMappingDetail mappingDetailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class); + Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().size()); + Assert.assertEquals(1, mappingDetailFromJson.getHostedQueues().get(0).size()); + Assert.assertEquals(mappingDetailJson, RemotingSerializable.toJson(mappingDetailFromJson, false)); } } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 3c491cd0..d375b13e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -1137,8 +1137,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { String addr = clientMetadata.findMasterBrokerAddr(broker); TopicStatsTable statsTable = examineTopicStats(addr, topic); TopicConfigAndQueueMapping mapOutConfig = brokerConfigMap.get(broker); - for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { - ImmutableList items = entry.getValue(); + for (Map.Entry> entry : mapOutConfig.getMappingDetail().getHostedQueues().entrySet()) { + List items = entry.getValue(); Integer globalId = entry.getKey(); if (items.size() < 2) { continue; @@ -1159,7 +1159,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { newLeader.setLogicOffset(TopicQueueMappingUtils.blockSeqRoundUp(oldLeader.computeStaticQueueOffset(topicOffset.getMaxOffset()), blockSeqSize)); TopicConfigAndQueueMapping mapInConfig = brokerConfigMap.get(newLeader.getBname()); //fresh the new leader - mapInConfig.getMappingDetail().putMappingInfo(globalId, items); + TopicQueueMappingDetail.putMappingInfo(mapInConfig.getMappingDetail(), globalId, items); } } //Step4: write to the new leader with logic offset -- GitLab