提交 f12eceb4 编写于 作者: D dongeforever

Add the UpdateStaticTopicSubCommand

上级 0c6ee5c5
......@@ -10,6 +10,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -25,6 +26,42 @@ public class ClientMetadata {
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
public void freshTopicRoute(String topic, TopicRouteData topicRouteData) {
if (topic == null
|| topicRouteData == null) {
return;
}
TopicRouteData old = this.topicRouteTable.get(topic);
if (!topicRouteDataIsChange(old, topicRouteData)) {
return ;
}
{
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
}
{
ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
if (mqEndPoints != null
&& !mqEndPoints.isEmpty()) {
topicEndPointsTable.put(topic, mqEndPoints);
}
}
}
public static boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
if (olddata == null || nowdata == null)
return true;
TopicRouteData old = new TopicRouteData(olddata);
TopicRouteData now = new TopicRouteData(nowdata);
Collections.sort(old.getQueueDatas());
Collections.sort(old.getBrokerDatas());
Collections.sort(now.getQueueDatas());
Collections.sort(now.getBrokerDatas());
return !old.equals(now);
}
public String getBrokerNameFromMessageQueue(final MessageQueue mq) {
if (topicEndPointsTable.get(mq.getTopic()) != null
......
......@@ -16,22 +16,27 @@
*/
package org.apache.rocketmq.tools.command.topic;
import com.google.common.collect.ImmutableList;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import java.util.Set;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class UpdateStaticTopicSubCommand implements SubCommand {
......@@ -68,124 +73,115 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
return options;
}
private void validate(Map.Entry<String, TopicConfigAndQueueMapping> entry, boolean shouldNull) {
if (shouldNull) {
if (entry.getValue().getTopicQueueMappingInfo() != null) {
throw new RuntimeException("Mapping info should be null in broker " + entry.getKey());
}
} else {
if (entry.getValue().getTopicQueueMappingInfo() == null) {
throw new RuntimeException("Mapping info should not be null in broker " + entry.getKey());
}
if (!entry.getKey().equals(entry.getValue().getTopicQueueMappingInfo().getBname())) {
throw new RuntimeException(String.format("The broker name is not equal %s != %s ", entry.getKey(), entry.getValue().getTopicQueueMappingInfo().getBname()));
}
}
}
public void validateQueueMappingInfo(Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap, TopicQueueMappingDetail mappingDetail) {
if (mappingDetail.isDirty()) {
throw new RuntimeException("The mapping info is dirty in broker " + mappingDetail.getBname());
}
for (Map.Entry<Integer, ImmutableList<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer globalid = entry.getKey();
String leaerBrokerName = entry.getValue().iterator().next().getBname();
if (!leaerBrokerName.equals(mappingDetail.getBname())) {
//not the leader
continue;
}
if (globalIdMap.containsKey(globalid)) {
throw new RuntimeException(String.format("The queue id is duplicated in broker %s %s", leaerBrokerName, mappingDetail.getBname()));
} else {
globalIdMap.put(globalid, entry.getValue());
}
}
}
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
Map<String, TopicConfigAndQueueMapping> existedTopicConfigMap = new HashMap<>();
Map<Integer, ImmutableList<LogicQueueMappingItem>> globalIdMap = new HashMap<>();
try {
String topic = commandLine.getOptionValue('t').trim();
int queueNum = Integer.parseInt(commandLine.getOptionValue("qn").trim());
String cluster = commandLine.getOptionValue('c').trim();
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
if (clusterInfo == null
|| clusterInfo.getClusterAddrTable().isEmpty()
|| clusterInfo.getClusterAddrTable().get(cluster) == null
|| clusterInfo.getClusterAddrTable().get(cluster).isEmpty()) {
throw new RuntimeException("The Cluster info is null for " + cluster);
}
clientMetadata.refreshClusterInfo(clusterInfo);
//first check the topic route
//first get the existed topic config and mapping
{
TopicRouteData routeData = defaultMQAdminExt.examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
TopicConfigAndQueueMapping mapping = (TopicConfigAndQueueMapping) defaultMQAdminExt.examineTopicConfig(addr, topic);
//allow the mapping info is null
if (mapping != null) {
existedTopicConfigMap.put(bname, mapping);
}
}
}
}
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
// readQueueNums
if (commandLine.hasOption('r')) {
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
}
// writeQueueNums
if (commandLine.hasOption('w')) {
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
}
// perm
if (commandLine.hasOption('p')) {
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
}
boolean isUnit = false;
if (commandLine.hasOption('u')) {
isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
}
boolean isCenterSync = false;
if (commandLine.hasOption('s')) {
isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
{
if (!existedTopicConfigMap.isEmpty()) {
Iterator<Map.Entry<String, TopicConfigAndQueueMapping>> it = existedTopicConfigMap.entrySet().iterator();
Map.Entry<String, TopicConfigAndQueueMapping> first = it.next();
validate(first, false);
validateQueueMappingInfo(globalIdMap, first.getValue().getTopicQueueMappingInfo());
TopicQueueMappingDetail firstMapping = first.getValue().getTopicQueueMappingInfo();
while (it.hasNext()) {
Map.Entry<String, TopicConfigAndQueueMapping> next = it.next();
validate(next, false);
validateQueueMappingInfo(globalIdMap, next.getValue().getTopicQueueMappingInfo());
TopicQueueMappingDetail nextMapping = next.getValue().getTopicQueueMappingInfo();
if (firstMapping.getEpoch() != nextMapping.getEpoch()) {
throw new RuntimeException(String.format("epoch dose not match %d != %d in %s %s", firstMapping.getEpoch(), nextMapping.getEpoch(), firstMapping.getBname(), nextMapping.getBname()));
}
if (firstMapping.getTotalQueues() != nextMapping.getTotalQueues()) {
throw new RuntimeException(String.format("total queue number dose not match %d != %d in %s %s", firstMapping.getTotalQueues(), nextMapping.getTotalQueues(), firstMapping.getBname(), nextMapping.getBname()));
}
}
if (firstMapping.getTotalQueues() != globalIdMap.size()) {
throw new RuntimeException(String.format("The total queue number in config dose not match the real hosted queues %d != %d", firstMapping.getTotalQueues(), globalIdMap.size()));
}
}
}
int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
topicConfig.setTopicSysFlag(topicCenterSync);
boolean isOrder = false;
if (commandLine.hasOption('o')) {
isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
if (queueNum < globalIdMap.size()) {
throw new RuntimeException(String.format("Cannot decrease the queue num for static topic %d < %d", queueNum, globalIdMap.size()));
}
topicConfig.setOrder(isOrder);
boolean useLogicalQueue = false;
if (commandLine.hasOption("lq")) {
useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
//check the queue number
if (queueNum == globalIdMap.size()) {
throw new RuntimeException("The topic queue num is equal the existed queue num, do nothing");
}
//the check is ok, now do the real
if (commandLine.hasOption('b')) {
if (useLogicalQueue) {
System.out.printf("-lq and -b can not be used together.%n");
return;
}
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
if (isOrder) {
String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
isOrder, orderConf.toString()));
}
System.out.printf("create topic to %s success.%n", addr);
System.out.printf("%s", topicConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
}
if (isOrder) {
Set<String> brokerNameSet =
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
StringBuilder orderConf = new StringBuilder();
String splitor = "";
for (String s : brokerNameSet) {
orderConf.append(splitor).append(s).append(":")
.append(topicConfig.getWriteQueueNums());
splitor = ";";
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf.toString(), true);
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
}
System.out.printf("%s", topicConfig);
if (useLogicalQueue) {
new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet);
}
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册