From 215c0e4d4b3ae091e80e39570dffabdbc7ff4f7c Mon Sep 17 00:00:00 2001 From: dongeforever Date: Wed, 17 Nov 2021 15:23:30 +0800 Subject: [PATCH] Add the adminExt --- .../processor/AdminBrokerProcessor.java | 2 +- .../rocketmq/client/impl/MQAdminImpl.java | 19 ++ .../rocketmq/client/impl/MQClientAPIImpl.java | 37 +++- .../common/TopicConfigAndQueueMapping.java | 13 +- .../common/TopicQueueMappingDetail.java | 6 +- .../common/TopicQueueMappingInfo.java | 8 +- .../protocol/body/TopicQueueMappingBody.java | 31 ++- .../tools/admin/DefaultMQAdminExt.java | 6 + .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 8 +- .../topic/UpdateStaticTopicSubCommand.java | 186 ++++++++++++++++++ 11 files changed, 303 insertions(+), 19 deletions(-) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 35843160..c6c74ca4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -329,7 +329,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); - this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody); + this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail()); this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion()); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index ce4c94ad..2fec60ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; @@ -79,6 +80,24 @@ public class MQAdminImpl { this.timeoutMillis = timeoutMillis; } + public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException { + MQClientException exception = null; + for (int i = 0; i < 3; i++) { + try { + this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, timeoutMillis); + break; + } catch (Exception e) { + if (2 == i) { + exception = new MQClientException("create topic to broker exception", e); + } + } + } + if (exception != null) { + throw exception; + } + } + + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 2d9f17db..8dedfc72 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -57,6 +57,8 @@ import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicConfigAndQueueMapping; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -2704,22 +2706,53 @@ public class MQClientAPIImpl { migrateTopicLogicalQueue(RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY, brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis); } - public TopicConfig getTopicConfig(final String brokerAddr, String topic, + public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader(); header.setTopic(topic); + header.setWithMapping(true); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header); RemotingCommand response = this.remotingClient .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - return RemotingSerializable.decode(response.getBody(), TopicConfig.class); + return RemotingSerializable.decode(response.getBody(), TopicConfigAndQueueMapping.class); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, + final long timeoutMillis) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); + requestHeader.setTopic(topicConfig.getTopicName()); + requestHeader.setDefaultTopic(defaultTopic); + requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); + requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); + requestHeader.setPerm(topicConfig.getPerm()); + requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); + requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); + requestHeader.setOrder(topicConfig.isOrder()); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_STATIC_TOPIC, requestHeader); + request.setBody(topicQueueMappingDetail.encode()); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java index e7fef8dd..3bc7f24d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfigAndQueueMapping.java @@ -16,20 +16,19 @@ */ package org.apache.rocketmq.common; -public class TopicConfigAndQueueMapping { - private TopicConfig topicConfig; +public class TopicConfigAndQueueMapping extends TopicConfig { private TopicQueueMappingDetail topicQueueMappingDetail; + public TopicConfigAndQueueMapping() { + } + public TopicConfigAndQueueMapping(TopicConfig topicConfig, TopicQueueMappingDetail topicQueueMappingDetail) { - this.topicConfig = topicConfig; + super(topicConfig); this.topicQueueMappingDetail = topicQueueMappingDetail; } + public TopicQueueMappingDetail getTopicQueueMappingInfo() { return topicQueueMappingDetail; } - - public TopicConfig getTopicConfig() { - return topicConfig; - } } diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java index 75f2c521..c5d6ebb0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java @@ -29,8 +29,8 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { // make sure this value is not null private ConcurrentMap> hostedQueues = new ConcurrentHashMap>(); - public TopicQueueMappingDetail(String topic, int totalQueues, String bname) { - super(topic, totalQueues, bname); + public TopicQueueMappingDetail(String topic, int totalQueues, String bname, int gen) { + super(topic, totalQueues, bname, gen); buildIdMap(); } @@ -118,7 +118,7 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo { public TopicQueueMappingInfo cloneAsMappingInfo() { - TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname); + TopicQueueMappingInfo topicQueueMappingInfo = new TopicQueueMappingInfo(this.topic, this.totalQueues, this.bname, this.gen); topicQueueMappingInfo.currIdMap = this.buildIdMap(LEVEL_0); topicQueueMappingInfo.prevIdMap = this.buildIdMap(LEVEL_1); diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java index b4a92f38..7f4a201d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingInfo.java @@ -28,15 +28,17 @@ public class TopicQueueMappingInfo extends RemotingSerializable { String topic; // redundant field int totalQueues; String bname; //identify the hosted broker name + int gen; //important to fence the old dirty data //register to broker to construct the route transient ConcurrentMap currIdMap = new ConcurrentHashMap(); //register to broker to help detect remapping failure transient ConcurrentMap prevIdMap = new ConcurrentHashMap(); - public TopicQueueMappingInfo(String topic, int totalQueues, String bname) { + public TopicQueueMappingInfo(String topic, int totalQueues, String bname, int gen) { this.topic = topic; this.totalQueues = totalQueues; this.bname = bname; + this.gen = gen; } @@ -56,6 +58,10 @@ public class TopicQueueMappingInfo extends RemotingSerializable { return topic; } + public int getGen() { + return gen; + } + public ConcurrentMap getCurrIdMap() { return currIdMap; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java index edb1d34c..f55ad40f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingBody.java @@ -17,10 +17,35 @@ package org.apache.rocketmq.common.protocol.body; import org.apache.rocketmq.common.TopicQueueMappingDetail; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; -public class TopicQueueMappingBody extends TopicQueueMappingDetail { +public class TopicQueueMappingBody extends RemotingSerializable { - public TopicQueueMappingBody(String topic, int totalQueues, String bname) { - super(topic, totalQueues, bname); + private boolean force; + private int prevGen; + private TopicQueueMappingDetail mappingDetail; + + public int getPrevGen() { + return prevGen; + } + + public void setPrevGen(int prevGen) { + this.prevGen = prevGen; + } + + public TopicQueueMappingDetail getMappingDetail() { + return mappingDetail; + } + + public void setMappingDetail(TopicQueueMappingDetail mappingDetail) { + this.mappingDetail = mappingDetail; + } + + public boolean isForce() { + return force; + } + + public void setForce(boolean force) { + this.force = force; } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index d17c717e..52eecffb 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -653,6 +654,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return this.defaultMQAdminExtImpl.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData); } + @Override + public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail); + } + @Override public void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 2f2e4913..4ddf1619 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -42,6 +42,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; @@ -1087,6 +1088,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); } + @Override + public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws MQClientException { + this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail); + } + @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, timestamp); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 5c53ead6..15b97dba 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.TopicQueueMappingDetail; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicStatsTable; @@ -332,7 +333,10 @@ public interface MQAdminExt extends MQAdmin { LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; - void migrateTopicLogicalQueueNotify(String brokerAddr, - LogicalQueueRouteData fromQueueRouteData, + + void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + + void migrateTopicLogicalQueueNotify(String brokerAddr, LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java new file mode 100644 index 00000000..1ca9fd5e --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.topic; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand; + +import java.util.Set; + +public class UpdateStaticTopicSubCommand implements SubCommand { + + @Override + public String commandName() { + return "updateStaticTopic"; + } + + @Override + public String commandDesc() { + return "Update or create static topic, which has fixed number of queues"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + OptionGroup optionGroup = new OptionGroup(); + + Option opt = null; + + opt = new Option("c", "clusterName", true, "create topic to which cluster"); + optionGroup.addOption(opt); + + optionGroup.setRequired(true); + options.addOptionGroup(optionGroup); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("qn", "totalQueueNum", true, "total queue num"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(8); + topicConfig.setTopicName(commandLine.getOptionValue('t').trim()); + + // readQueueNums + if (commandLine.hasOption('r')) { + topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim())); + } + + // writeQueueNums + if (commandLine.hasOption('w')) { + topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim())); + } + + // perm + if (commandLine.hasOption('p')) { + topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim())); + } + + boolean isUnit = false; + if (commandLine.hasOption('u')) { + isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim()); + } + + boolean isCenterSync = false; + if (commandLine.hasOption('s')) { + isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim()); + } + + int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync); + topicConfig.setTopicSysFlag(topicCenterSync); + + boolean isOrder = false; + if (commandLine.hasOption('o')) { + isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim()); + } + topicConfig.setOrder(isOrder); + + boolean useLogicalQueue = false; + if (commandLine.hasOption("lq")) { + useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim()); + } + + if (commandLine.hasOption('b')) { + if (useLogicalQueue) { + System.out.printf("-lq and -b can not be used together.%n"); + return; + } + + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.start(); + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + + if (isOrder) { + String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr); + String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums(); + defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false); + System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]", + isOrder, orderConf.toString())); + } + System.out.printf("create topic to %s success.%n", addr); + System.out.printf("%s", topicConfig); + return; + + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + defaultMQAdminExt.start(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + + for (String addr : masterSet) { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + System.out.printf("create topic to %s success.%n", addr); + } + + if (isOrder) { + Set brokerNameSet = + CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); + StringBuilder orderConf = new StringBuilder(); + String splitor = ""; + for (String s : brokerNameSet) { + orderConf.append(splitor).append(s).append(":") + .append(topicConfig.getWriteQueueNums()); + splitor = ";"; + } + defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), + orderConf.toString(), true); + System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf); + } + + System.out.printf("%s", topicConfig); + + if (useLogicalQueue) { + new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet); + } + return; + } + + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} -- GitLab