diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index 5cee8c8f95226c17ca1637bb16f7d87fd76cbffd..29a7261b846b90c9250acb797abc41869a92d47d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.tools.command.topic; import com.google.common.collect.ImmutableList; +import com.sun.xml.internal.ws.api.BindingIDFactory; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; @@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements SubCommand { //construct the topic configAndMapping long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); - newIdToBroker.forEach( (queueId, broker) -> { + for (Map.Entry e : newIdToBroker.entrySet()) { + Integer queueId = e.getKey(); + String value = e.getValue(); + if (globalIdMap.containsKey(queueId)) { + //ignore the exited + continue; + } TopicConfigAndQueueMapping configMapping; - if (!existedTopicConfigMap.containsKey(broker)) { + if (!existedTopicConfigMap.containsKey(value)) { TopicConfig topicConfig = new TopicConfig(topic, 1, 1); - TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch); + TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch); configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); } else { - configMapping = existedTopicConfigMap.get(broker); + configMapping = existedTopicConfigMap.get(value); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.getMappingDetail().setEpoch(epoch); configMapping.getMappingDetail().setTotalQueues(queueNum); } - LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); + LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); - }); + } //If some succeed, and others fail, it will cause inconsistent data for (Map.Entry entry : existedTopicConfigMap.entrySet()) {