diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java index e2dd0762185f6ce3df0cc7e91aad3dfc7bce8dcc..c37a065d075dfbc5703239eee38ca155aeb7d1fa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java +++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java @@ -10,6 +10,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +26,42 @@ public class ClientMetadata { private final ConcurrentMap> brokerVersionTable = new ConcurrentHashMap>(); + public void freshTopicRoute(String topic, TopicRouteData topicRouteData) { + if (topic == null + || topicRouteData == null) { + return; + } + TopicRouteData old = this.topicRouteTable.get(topic); + if (!topicRouteDataIsChange(old, topicRouteData)) { + return ; + } + { + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); + } + } + { + ConcurrentMap mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData); + if (mqEndPoints != null + && !mqEndPoints.isEmpty()) { + topicEndPointsTable.put(topic, mqEndPoints); + } + } + } + + + public static boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) { + if (olddata == null || nowdata == null) + return true; + TopicRouteData old = new TopicRouteData(olddata); + TopicRouteData now = new TopicRouteData(nowdata); + Collections.sort(old.getQueueDatas()); + Collections.sort(old.getBrokerDatas()); + Collections.sort(now.getQueueDatas()); + Collections.sort(now.getBrokerDatas()); + return !old.equals(now); + + } public String getBrokerNameFromMessageQueue(final MessageQueue mq) { if (topicEndPointsTable.get(mq.getTopic()) != null diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java index dc237807da32e37e45754f00e63dbe5fa20b5ebc..05f2807356a25f485ae33869c4e5b511da17c508 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -16,22 +16,27 @@ */ package org.apache.rocketmq.tools.command.topic; +import com.google.common.collect.ImmutableList; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; -import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.LogicQueueMappingItem; +import org.apache.rocketmq.common.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.rpc.ClientMetadata; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.apache.rocketmq.tools.command.CommandUtil; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand; -import java.util.Set; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; public class UpdateStaticTopicSubCommand implements SubCommand { @@ -68,124 +73,115 @@ public class UpdateStaticTopicSubCommand implements SubCommand { return options; } + private void validate(Map.Entry entry, boolean shouldNull) { + if (shouldNull) { + if (entry.getValue().getTopicQueueMappingInfo() != null) { + throw new RuntimeException("Mapping info should be null in broker " + entry.getKey()); + } + } else { + if (entry.getValue().getTopicQueueMappingInfo() == null) { + throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey()); + } + if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) { + throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname())); + } + } + } + + public void validateQueueMappingInfo(Map> globalIdMap, TopicQueueMappingDetail mappingDetail) { + if (mappingDetail.isDirty()) { + throw new RuntimeException("The mapping info is dirty in broker " + mappingDetail.getBname()); + } + for (Map.Entry> entry : mappingDetail.getHostedQueues().entrySet()) { + Integer globalid = entry.getKey(); + String leaerBrokerName = entry.getValue().iterator().next().getBname(); + if (!leaerBrokerName.equals(mappingDetail.getBname())) { + //not the leader + continue; + } + if (globalIdMap.containsKey(globalid)) { + throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname())); + } else { + globalIdMap.put(globalid, entry.getValue()); + } + } + } + @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + ClientMetadata clientMetadata = new ClientMetadata(); + Map existedTopicConfigMap = new HashMap<>(); + Map> globalIdMap = new HashMap<>(); try { String topic = commandLine.getOptionValue('t').trim(); int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim()); String cluster = commandLine.getOptionValue('c').trim(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + if (clusterInfo == null + || clusterInfo.getClusterAddrTable().isEmpty() + || clusterInfo.getClusterAddrTable().get(cluster) == null + || clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) { + throw new RuntimeException("The Cluster info is null for " + cluster); + } + clientMetadata.refreshClusterInfo(clusterInfo); - //first check the topic route + //first get the existed topic config and mapping { TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic); + clientMetadata.freshTopicRoute(topic, routeData); + if (routeData != null + && !routeData.getQueueDatas().isEmpty()) { + for (QueueData queueData: routeData.getQueueDatas()) { + String bname = queueData.getBrokerName(); + String addr = clientMetadata.findMasterBrokerAddr(bname); + TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic); + //allow the mapping info is null + if (mapping != null) { + existedTopicConfigMap.put(bname, mapping); + } + } + } } - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setReadQueueNums(8); - topicConfig.setWriteQueueNums(8); - topicConfig.setTopicName(commandLine.getOptionValue('t').trim()); - - // readQueueNums - if (commandLine.hasOption('r')) { - topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim())); - } - - // writeQueueNums - if (commandLine.hasOption('w')) { - topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim())); - } - - // perm - if (commandLine.hasOption('p')) { - topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim())); - } - - boolean isUnit = false; - if (commandLine.hasOption('u')) { - isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim()); - } - - boolean isCenterSync = false; - if (commandLine.hasOption('s')) { - isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim()); + { + if (!existedTopicConfigMap.isEmpty()) { + Iterator> it = existedTopicConfigMap.entrySet().iterator(); + Map.Entry first = it.next(); + validate(first, false); + validateQueueMappingInfo(globalIdMap, first.getValue().getTopicQueueMappingInfo()); + TopicQueueMappingDetail firstMapping = first.getValue().getTopicQueueMappingInfo(); + while (it.hasNext()) { + Map.Entry next = it.next(); + validate(next, false); + validateQueueMappingInfo(globalIdMap, next.getValue().getTopicQueueMappingInfo()); + TopicQueueMappingDetail nextMapping = next.getValue().getTopicQueueMappingInfo(); + if (firstMapping.getEpoch() != nextMapping.getEpoch()) { + throw new RuntimeException(String.format("epoch dose not match %d != %d in %s %s", firstMapping.getEpoch(), nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname())); + } + if (firstMapping.getTotalQueues() != nextMapping.getTotalQueues()) { + throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname())); + } + } + if (firstMapping.getTotalQueues() != globalIdMap.size()) { + throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", firstMapping.getTotalQueues(), globalIdMap.size())); + } + } } - - int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync); - topicConfig.setTopicSysFlag(topicCenterSync); - - boolean isOrder = false; - if (commandLine.hasOption('o')) { - isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim()); + if (queueNum < globalIdMap.size()) { + throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size())); } - topicConfig.setOrder(isOrder); - - boolean useLogicalQueue = false; - if (commandLine.hasOption("lq")) { - useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim()); + //check the queue number + if (queueNum == globalIdMap.size()) { + throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing"); } + //the check is ok, now do the real - if (commandLine.hasOption('b')) { - if (useLogicalQueue) { - System.out.printf("-lq and -b can not be used together.%n"); - return; - } - - String addr = commandLine.getOptionValue('b').trim(); - - defaultMQAdminExt.start(); - defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); - - if (isOrder) { - String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr); - String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums(); - defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false); - System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]", - isOrder, orderConf.toString())); - } - System.out.printf("create topic to %s success.%n", addr); - System.out.printf("%s", topicConfig); - return; - } else if (commandLine.hasOption('c')) { - String clusterName = commandLine.getOptionValue('c').trim(); - - defaultMQAdminExt.start(); - - Set masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); - - for (String addr : masterSet) { - defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); - System.out.printf("create topic to %s success.%n", addr); - } - - if (isOrder) { - Set brokerNameSet = - CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); - StringBuilder orderConf = new StringBuilder(); - String splitor = ""; - for (String s : brokerNameSet) { - orderConf.append(splitor).append(s).append(":") - .append(topicConfig.getWriteQueueNums()); - splitor = ";"; - } - defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), - orderConf.toString(), true); - System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf); - } - - System.out.printf("%s", topicConfig); - - if (useLogicalQueue) { - new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet); - } - return; - } ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); } catch (Exception e) {