提交 e9cafe9f 编写于 作者: D dongeforever

Polish the remapping logic

上级 25a588b8
......@@ -194,6 +194,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
long maxEpoch = maxEpochAndNum.getKey();
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(new HashMap<>(), brokers.stream().collect(Collectors.toMap( x -> x, x -> 0)));
allocator.upToNum(maxNum);
Map<String, Integer> expectedBrokerNumMap = allocator.getBrokerNumMap();
......@@ -217,40 +218,50 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
});
expectedBrokerNumMap.forEach((broker, queueNum) -> {
for (int i = 0; i < queueNum; i++) {
expectedIdToBroker.put(waitAssignQueues.poll(), broker);
Integer queueId = waitAssignQueues.poll();
assert queueId != null;
expectedIdToBroker.put(queueId, broker);
}
});
Set<Broker>
//Now construct the remapping info
//construct the topic configAndMapping
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
for (Map.Entry<Integer, String> e : expectedIdToBroker.entrySet()) {
Integer queueId = e.getKey();
String broker = e.getValue();
if (globalIdMap.containsKey(queueId)) {
//ignore the exited
Set<String> brokersToMapOut = new HashSet<>();
Set<String> brokersToMapIn = new HashSet<>();
for (Map.Entry<Integer, String> mapEntry : expectedIdToBroker.entrySet()) {
Integer queueId = mapEntry.getKey();
String broker = mapEntry.getValue();
TopicQueueMappingOne topicQueueMappingOne = globalIdMap.get(queueId);
assert topicQueueMappingOne != null;
if (topicQueueMappingOne.getBname().equals(broker)) {
continue;
}
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, 0, broker, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
existedTopicConfigMap.put(broker, configMapping);
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(0);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
//remapping
String mapInBroker = broker;
String mapOutBroker = topicQueueMappingOne.getBname();
brokersToMapIn.add(mapInBroker);
brokersToMapOut.add(mapOutBroker);
TopicConfigAndQueueMapping mapInConfig = existedTopicConfigMap.get(mapInBroker);
TopicConfigAndQueueMapping mapOutConfig = existedTopicConfigMap.get(mapOutBroker);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
mapInConfig.setWriteQueueNums(mapInConfig.getWriteQueueNums() + 1);
List<LogicQueueMappingItem> items = new ArrayList<>(topicQueueMappingOne.getItems());
LogicQueueMappingItem last = items.get(items.size() - 1);
items.add(new LogicQueueMappingItem(last.getGen() + 1, mapInConfig.getWriteQueueNums() - 1, mapInBroker, -1, 0, -1, -1, -1));
ImmutableList<LogicQueueMappingItem> resultItems = ImmutableList.copyOf(items);
mapInConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
mapOutConfig.getMappingDetail().putMappingInfo(queueId, resultItems);
}
long epoch = Math.max(maxEpochAndNum.getKey() + 1000, System.currentTimeMillis());
existedTopicConfigMap.values().forEach( configMapping -> {
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(maxNum);
});
//decide the new offset
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
......
......@@ -226,7 +226,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setReadQueueNums(configMapping.getReadQueueNums() + 1);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册