提交 82364fc2 编写于 作者: D dongeforever

Polish the tools

上级 ffcc5489
......@@ -103,7 +103,7 @@ public class TopicQueueMappingUtils {
return new AbstractMap.SimpleImmutableEntry<Long, Integer>(epoch, queueNum);
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace) {
public static Map<Integer, TopicQueueMappingOne> buildMappingItems(List<TopicQueueMappingDetail> mappingDetailList, boolean replace, boolean checkConsistence) {
Collections.sort(mappingDetailList, new Comparator<TopicQueueMappingDetail>() {
public int compare(TopicQueueMappingDetail o1, TopicQueueMappingDetail o2) {
......@@ -111,8 +111,12 @@ public class TopicQueueMappingUtils {
int maxNum = 0;
Map<Integer, TopicQueueMappingOne> globalIdMap = new HashMap<Integer, TopicQueueMappingOne>();
for (TopicQueueMappingDetail mappingDetail : mappingDetailList) {
if (mappingDetail.totalQueues > maxNum) {
maxNum = mappingDetail.totalQueues;
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaderBrokerName = getLeaderBroker(entry.getValue());
......@@ -129,6 +133,16 @@ public class TopicQueueMappingUtils {
if (checkConsistence) {
if (maxNum != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxNum, globalIdMap.size()));
for (int i = 0; i < maxNum; i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
return globalIdMap;
......@@ -87,7 +87,7 @@ public class MigrateTopicLogicalQueueCommand implements SubCommand {
String toBrokerName,
Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException {
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueuesInfo logicalQueuesInfo = topicRouteInfo.getLogicalQueuesInfo();
LogicalQueuesInfo logicalQueuesInfo = null; /*topicRouteInfo.getLogicalQueuesInfo();*/
if (logicalQueuesInfo == null) {
throw new SubCommandException("topic not enabled logical queue");
......@@ -172,26 +172,15 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
for (TopicQueueMappingDetail mappingDetail : detailList) {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
//the check is ok, now do the mapping allocation
int maxNum = maxEpochAndNum.getValue();
......@@ -262,6 +251,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
// now do the remapping
//Step1: let the new leader can be write without the logicOffset
for (String broker: brokersToMapIn) {
......@@ -64,7 +64,10 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Option opt = null;
opt = new Option("c", "clusters", true, "create topic to clusters, comma separated");
opt = new Option("c", "clusters", true, "remapping static topic to clusters, comma separated");
opt = new Option("b", "brokers", true, "remapping static topic to brokers, comma separated");
......@@ -94,22 +97,23 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Set<String> brokers = new HashSet<>();
try {
if (!commandLine.hasOption('t')
|| !commandLine.hasOption('c')
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption('t')
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String clusters = commandLine.getOptionValue('c').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
} else {
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String clusters = commandLine.getOptionValue('c').trim();
for (String cluster : clusters.split(",")) {
cluster = cluster.trim();
if (clusterInfo.getClusterAddrTable().get(cluster) != null) {
......@@ -163,26 +167,15 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
List<TopicQueueMappingDetail> detailList = existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList());
//check the epoch and qnum
maxEpochAndNum = TopicQueueMappingUtils.findMaxEpochAndQueueNum(detailList);
final Map.Entry<Long, Integer> tmpMaxEpochAndNum = maxEpochAndNum;
detailList.forEach( mappingDetail -> {
if (tmpMaxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", tmpMaxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
if (tmpMaxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", tmpMaxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
for (TopicQueueMappingDetail mappingDetail : detailList) {
if (maxEpochAndNum.getKey() != mappingDetail.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s", maxEpochAndNum.getKey(), mappingDetail.getEpoch(), mappingDetail.getBname()));
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false);
if (maxEpochAndNum.getValue() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", maxEpochAndNum.getValue(), globalIdMap.size()));
for (int i = 0; i < maxEpochAndNum.getValue(); i++) {
if (!globalIdMap.containsKey(i)) {
throw new RuntimeException(String.format("The queue number %s is not in globalIdMap", i));
if (maxEpochAndNum.getValue() != mappingDetail.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s", maxEpochAndNum.getValue(), mappingDetail.getTotalQueues(), mappingDetail.getBname()));
globalIdMap = TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(detailList), false, true);
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
......@@ -230,10 +223,14 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(0, configMapping.getWriteQueueNums() - 1, broker, 0, 0, -1, -1, -1);
configMapping.getMappingDetail().putMappingInfo(queueId, ImmutableList.of(mappingItem));
//double check the topic config map
existedTopicConfigMap.values().forEach( configMapping -> {
TopicQueueMappingUtils.buildMappingItems(new ArrayList<>(existedTopicConfigMap.values().stream().map(TopicConfigAndQueueMapping::getMappingDetail).collect(Collectors.toList())), false, true);
//If some succeed, and others fail, it will cause inconsistent data
for (Map.Entry<String, TopicConfigAndQueueMapping> entry : existedTopicConfigMap.entrySet()) {
String broker = entry.getKey();
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册