From e9cafe9fef748b92ea4d020e16bb95192ffaadff Mon Sep 17 00:00:00 2001 From: dongeforever Date: Thu, 18 Nov 2021 14:36:47 +0800 Subject: [PATCH] Polish the remapping logic --- .../topic/RemappingStaticTopicSubCommand.java | 63 +++++++++++-------- .../topic/UpdateStaticTopicSubCommand.java | 2 +- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java index 4cc5acfc..34aec179 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java @@ -194,6 +194,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand { //the check is ok, now do the mapping allocation int maxNum = maxEpochAndNum.getValue(); + long maxEpoch = maxEpochAndNum.getKey(); TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0))); allocator.upToNum(maxNum); Map expectedBrokerNumMap = allocator.getBrokerNumMap(); @@ -217,40 +218,50 @@ public class RemappingStaticTopicSubCommand implements SubCommand { }); expectedBrokerNumMap.forEach((broker, queueNum) -> { for (int i = 0; i < queueNum; i++) { - expectedIdToBroker.put(waitAssignQueues.poll(), broker); + Integer queueId = waitAssignQueues.poll(); + assert queueId != null; + expectedIdToBroker.put(queueId, broker); } }); - Set - //Now construct the remapping info - - //construct the topic configAndMapping - long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); - for (Map.Entry e : expectedIdToBroker.entrySet()) { - Integer queueId = e.getKey(); - String broker = e.getValue(); - if (globalIdMap.containsKey(queueId)) { - //ignore the exited + Set brokersToMapOut = new HashSet<>(); + Set brokersToMapIn = new HashSet<>(); + for (Map.Entry mapEntry : expectedIdToBroker.entrySet()) { + Integer queueId = mapEntry.getKey(); + String broker = mapEntry.getValue(); + TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId); + assert topicQueueMappingOne != null; + if (topicQueueMappingOne.getBname().equals(broker)) { continue; } - TopicConfigAndQueueMapping configMapping; - if (!existedTopicConfigMap.containsKey(broker)) { - TopicConfig topicConfig = new TopicConfig(topic, 1, 1); - TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch); - configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); - existedTopicConfigMap.put(broker, configMapping); - } else { - configMapping = existedTopicConfigMap.get(broker); - configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); - configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); - configMapping.getMappingDetail().setEpoch(epoch); - configMapping.getMappingDetail().setTotalQueues(0); - } - LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); - configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); + //remapping + String mapInBroker = broker; + String mapOutBroker = topicQueueMappingOne.getBname(); + brokersToMapIn.add(mapInBroker); + brokersToMapOut.add(mapOutBroker); + TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker); + TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker); + + mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); + mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1); + + List items = new ArrayList<>(topicQueueMappingOne.getItems()); + LogicQueueMappingItem last = items.get(items.size() - 1); + items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1)); + + ImmutableList resultItems = ImmutableList.copyOf(items); + mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems); + mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems); } + long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); + existedTopicConfigMap.values().forEach( configMapping -> { + configMapping.getMappingDetail().setEpoch(epoch); + configMapping.getMappingDetail().setTotalQueues(maxNum); + }); + //decide the new offset + //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : existedTopicConfigMap.entrySet()) { String broker = entry.getKey(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index a1ff0b00..278ea9f0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -226,7 +226,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand { } else { configMapping = existedTopicConfigMap.get(broker); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); - configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); + configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1); } LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); -- GitLab