提交 45946d47 编写于 作者: D dongeforever

Ignore the existed queueId in static topi creation

上级 c673cb73
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.tools.command.topic; package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.sun.xml.internal.ws.api.BindingIDFactory;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.OptionGroup;
...@@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements SubCommand { ...@@ -201,22 +202,28 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
//construct the topic configAndMapping //construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis()); long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
newIdToBroker.forEach( (queueId, broker) -> { for (Map.Entry<Integer, String> e : newIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String value = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
continue;
}
TopicConfigAndQueueMapping configMapping; TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) { if (!existedTopicConfigMap.containsKey(value)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1); TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch); TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, value, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail); configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
} else { } else {
configMapping = existedTopicConfigMap.get(broker); configMapping = existedTopicConfigMap.get(value);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1); configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch); configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum); configMapping.getMappingDetail().setTotalQueues(queueNum);
} }
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1); LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, value, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem)); configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
}); }
//If some succeed, and others fail, it will cause inconsistent data //If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) { for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册