提交 a5af2cf5 编写于 作者: D dongeforever

Finish the UpdateStaticTopicSubCommand

上级 295a6bde
......@@ -47,6 +47,12 @@ public class TopicConfig {
this.order = other.order;
}
public TopicConfig(String topicName, int readQueueNums, int writeQueueNums) {
this.topicName = topicName;
this.readQueueNums = readQueueNums;
this.writeQueueNums = writeQueueNums;
}
public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) {
this.topicName = topicName;
this.readQueueNums = readQueueNums;
......
......@@ -17,18 +17,17 @@
package org.apache.rocketmq.common;
public class TopicConfigAndQueueMapping extends TopicConfig {
private TopicQueueMappingDetail topicQueueMappingDetail;
private TopicQueueMappingDetail mappingDetail;
public TopicConfigAndQueueMapping() {
}
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) {
public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) {
super(topicConfig);
this.topicQueueMappingDetail = topicQueueMappingDetail;
this.mappingDetail = mappingDetail;
}
public TopicQueueMappingDetail getTopicQueueMappingInfo() {
return topicQueueMappingDetail;
public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail;
}
}
......@@ -55,9 +55,6 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return totalQueues;
}
public void setTotalQueues(int totalQueues) {
this.totalQueues = totalQueues;
}
public String getBname() {
return bname;
......@@ -71,6 +68,14 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
return epoch;
}
public void setEpoch(int epoch) {
this.epoch = epoch;
}
public void setTotalQueues(int totalQueues) {
this.totalQueues = totalQueues;
}
public ConcurrentMap<Integer, Integer> getCurrIdMap() {
return currIdMap;
}
......
......@@ -29,16 +29,18 @@ import java.util.Random;
public class TopicQueueMappingUtils {
public static class MappingState {
public static class MappingAllocator {
Map<String, Integer> brokerNumMap = new HashMap<String, Integer>();
Map<Integer, String> idToBroker = new HashMap<Integer, String>();
int currentIndex = 0;
Random random = new Random();
List<String> leastBrokers = new ArrayList<String>();
private MappingState(Map<String, Integer> brokerNumMap) {
private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
this.idToBroker.putAll(idToBroker);
this.brokerNumMap.putAll(brokerNumMap);
}
public void freshState() {
private void freshState() {
int minNum = -1;
for (Map.Entry<String, Integer> entry : brokerNumMap.entrySet()) {
if (entry.getValue() > minNum) {
......@@ -50,20 +52,41 @@ public class TopicQueueMappingUtils {
}
currentIndex = random.nextInt(leastBrokers.size());
}
public String nextBroker() {
private String nextBroker() {
if (leastBrokers.isEmpty()) {
freshState();
}
int tmpIndex = (++currentIndex) % leastBrokers.size();
String broker = leastBrokers.remove(tmpIndex);
currentIndex--;
return broker;
int tmpIndex = currentIndex % leastBrokers.size();
return leastBrokers.remove(tmpIndex);
}
public Map<String, Integer> getBrokerNumMap() {
return brokerNumMap;
}
public void upToNum(int maxQueueNum) {
int currSize = idToBroker.size();
if (maxQueueNum <= currSize) {
return;
}
for (int i = currSize; i < maxQueueNum; i++) {
String nextBroker = nextBroker();
if (brokerNumMap.containsKey(nextBroker)) {
brokerNumMap.put(nextBroker, brokerNumMap.get(nextBroker) + 1);
} else {
brokerNumMap.put(nextBroker, 1);
}
idToBroker.put(i, nextBroker);
}
}
public Map<Integer, String> getIdToBroker() {
return idToBroker;
}
}
public static MappingState buildMappingState(Map<String, Integer> brokerNumMap) {
return new MappingState(brokerNumMap);
public static MappingAllocator buildMappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap) {
return new MappingAllocator(idToBroker, brokerNumMap);
}
public static Map.Entry<Integer, Integer> findMaxEpochAndQueueNum(List<TopicQueueMappingDetail> mappingDetailList) {
......@@ -92,14 +115,14 @@ public class TopicQueueMappingUtils {
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaerBrokerName = entry.getValue().iterator().next().getBname();
if (!leaerBrokerName.equals(mappingDetail.getBname())) {
String leaderBrokerName = getLeaderBroker(entry.getValue());
if (!leaderBrokerName.equals(mappingDetail.getBname())) {
//not the leader
continue;
}
if (globalIdMap.containsKey(globalid)) {
if (!replace) {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaderBrokerName, mappingDetail.getBname()));
}
} else {
globalIdMap.put(globalid, entry.getValue());
......@@ -108,4 +131,12 @@ public class TopicQueueMappingUtils {
}
return globalIdMap;
}
public static String getLeaderBroker(ImmutableList<LogicQueueMappingItem> items) {
return getLeaderItem(items).getBname();
}
public static LogicQueueMappingItem getLeaderItem(ImmutableList<LogicQueueMappingItem> items) {
assert items.size() > 0;
return items.get(items.size() - 1);
}
}
......@@ -22,6 +22,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.TopicQueueMappingUtils;
......@@ -35,11 +36,13 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class UpdateStaticTopicSubCommand implements SubCommand {
......@@ -77,18 +80,6 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
return options;
}
private void validateIfNull(Map.Entry<String, TopicConfigAndQueueMapping> entry, boolean shouldNull) {
if (shouldNull) {
if (entry.getValue().getTopicQueueMappingInfo() != null) {
throw new RuntimeException("Mapping info should be null in broker " + entry.getKey());
}
} else {
if (entry.getValue().getTopicQueueMappingInfo() == null) {
throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey());
}
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
......@@ -100,73 +91,88 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
try {
if (!commandLine.hasOption('t')
|| !commandLine.hasOption('c')
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String cluster = commandLine.getOptionValue('c').trim();
String clusters = commandLine.getOptionValue('c').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()
|| clusterInfo.getClusterAddrTable().get(cluster) == null
|| clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
throw new RuntimeException("The Cluster info is null for " + cluster);
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
} else {
clientMetadata.refreshClusterInfo(clusterInfo);
}
clientMetadata.refreshClusterInfo(clusterInfo);
//first get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
Set<String> brokers = new HashSet<>();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
brokers.addAll(clusterInfo.getClusterAddrTable().get(cluster));
}
}
if (brokers.isEmpty()) {
throw new RuntimeException("Find none brokers for " + clusters);
}
//get the existed topic config and mapping
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the mapping info is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
}
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the config is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
}
}
}
// the
{
if (!existedTopicConfigMap.isEmpty()) {
//make sure it it not null
existedTopicConfigMap.entrySet().forEach(entry -> {
validateIfNull(entry, false);
});
//make sure the detail is not dirty
existedTopicConfigMap.entrySet().forEach(entry -> {
if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname()));
}
if (entry.getValue().getTopicQueueMappingInfo().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + entry.getValue().getTopicQueueMappingInfo().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getTopicQueueMappingInfo).collect(Collectors.toList());
//check the epoch and qnum
Map.Entry<Integer, Integer> maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
detailList.forEach( mappingDetail -> {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
});
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
Map.Entry<Integer, Integer> maxEpochAndNum = new AbstractMap.SimpleImmutableEntry<>(-1, queueNum);
if (!existedTopicConfigMap.isEmpty()) {
//make sure it it not null
existedTopicConfigMap.forEach((key, value) -> {
if (value.getMappingDetail() != null) {
throw new RuntimeException("Mapping info should be null in broker " + key);
}
});
//make sure the detail is not dirty
existedTopicConfigMap.forEach((key, value) -> {
if (!key.equals(value.getMappingDetail().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", key, value.getMappingDetail().getBname()));
}
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
}
if (value.getMappingDetail().isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + value.getMappingDetail().getBname());
}
});
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Integer, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
}
if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
}
});
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
}
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
}
}
}
......@@ -177,11 +183,47 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
//the check is ok, now do the real
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
//the check is ok, now do the mapping allocation
Map<String, Integer> brokerNumMap = brokers.stream().collect(Collectors.toMap( x -> x, x -> 0));
Map<Integer, String> idToBroker = new HashMap<>();
globalIdMap.forEach((key, value) -> {
String leaderbroker = TopicQueueMappingUtils.getLeaderBroker(value);
idToBroker.put(key, leaderbroker);
if (!brokerNumMap.containsKey(leaderbroker)) {
brokerNumMap.put(leaderbroker, 1);
} else {
brokerNumMap.put(leaderbroker, brokerNumMap.get(leaderbroker) + 1);
}
});
TopicQueueMappingUtils.MappingAllocator allocator = TopicQueueMappingUtils.buildMappingAllocator(idToBroker, brokerNumMap);
allocator.upToNum(queueNum);
Map<Integer, String> newIdToBroker = allocator.getIdToBroker();
//construct the topic configAndMapping
int epoch = maxEpochAndNum.getKey() + 1;
newIdToBroker.forEach( (queueId, broker) -> {
TopicConfigAndQueueMapping configMapping;
if (!existedTopicConfigMap.containsKey(broker)) {
TopicConfig topicConfig = new TopicConfig(topic, 1, 1);
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail(topic, queueNum, broker, epoch);
configMapping = new TopicConfigAndQueueMapping(topicConfig, mappingDetail);
} else {
configMapping = existedTopicConfigMap.get(broker);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.setWriteQueueNums(configMapping.getWriteQueueNums() + 1);
configMapping.getMappingDetail().setEpoch(epoch);
configMapping.getMappingDetail().setTotalQueues(queueNum);
}
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
});
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail());
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册