diff --git a/distribution/conf/logback_tools.xml b/distribution/conf/logback_tools.xml
index 28283ad1d1dc4af193bdd0ea193690e96ab36c78..d4bd5c277141d6a4e6e3404e7beaaea48cd41d1b 100644
--- a/distribution/conf/logback_tools.xml
+++ b/distribution/conf/logback_tools.xml
@@ -66,6 +66,11 @@
+
+
+
+
+
diff --git a/tools/pom.xml b/tools/pom.xml
index abe8197de3974b6c3b1f52f9e5efbbf5078224dd..096d531fa75f09d4840f09ff699efdff1ac986e3 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -64,5 +64,9 @@
org.yaml
snakeyaml
+
+ com.google.guava
+ guava
+
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index c27c85c66de2ae226038de951308e0251ed06f18..1c20324d1fa073cbe7320f773f5fd58e8c124272 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -43,12 +43,16 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -209,7 +213,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public TopicConfig examineTopicConfig(String addr, String topic) {
+ public TopicConfig examineTopicConfig(String addr,
+ String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@@ -578,4 +583,52 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
RemotingConnectException, MQClientException {
this.defaultMQAdminExtImpl.setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
}
+
+ @Override
+ public void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ this.defaultMQAdminExtImpl.updateTopicLogicalQueueMapping(brokerAddr, topic, queueId, logicalQueueIndex);
+ }
+
+ @Override
+ public LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.queryTopicLogicalQueueMapping(brokerAddr, topic);
+ }
+
+ @Override
+ public void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ this.defaultMQAdminExtImpl.deleteTopicLogicalQueueMapping(brokerAddr, topic);
+ }
+
+ @Override
+ public LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.sealTopicLogicalQueue(brokerAddr, queueRouteData);
+ }
+
+ @Override public LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId,
+ int logicalQueueIdx, MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.defaultMQAdminExtImpl.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, messageQueueRouteState);
+ }
+
+ @Override public LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic,
+ int logicalQueueIdx, MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.defaultMQAdminExtImpl.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, messageQueueStatus);
+ }
+
+ @Override public MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.defaultMQAdminExtImpl.migrateTopicLogicalQueuePrepare(fromQueueRouteData, toQueueRouteData);
+ }
+
+ @Override public MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.defaultMQAdminExtImpl.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData);
+ }
+
+ @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ this.defaultMQAdminExtImpl.migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData);
+ }
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 8ae68cdecbedf40865f2c49f2c07b37689cd18b7..ac936561e969f08ef58969cb6c0d1a0d9ff21a37 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -75,6 +76,9 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -223,8 +227,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicConfig examineTopicConfig(String addr, String topic) {
- return null;
+ public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
@Override
@@ -325,6 +329,54 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
}
+ @Override
+ public void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().updateTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIndex, timeoutMillis);
+ }
+
+ @Override
+ public LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().queryTopicLogicalQueue(brokerAddr, topic, timeoutMillis);
+ }
+
+ @Override
+ public void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().deleteTopicLogicalQueueMapping(brokerAddr, topic, timeoutMillis);
+ }
+
+ @Override
+ public LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().sealTopicLogicalQueue(brokerAddr, queueRouteData, timeoutMillis);
+ }
+
+ @Override public LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId,
+ int logicalQueueIdx, MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, messageQueueRouteState, timeoutMillis);
+ }
+
+ @Override public LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic,
+ int logicalQueueIdx, MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, messageQueueStatus, timeoutMillis);
+ }
+
+ @Override public MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueuePrepare(fromQueueRouteData.getBrokerAddr(), fromQueueRouteData, toQueueRouteData, timeoutMillis);
+ }
+
+ @Override public MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ return this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueueCommit(toQueueRouteData.getBrokerAddr(), fromQueueRouteData, toQueueRouteData, timeoutMillis);
+ }
+
+ @Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
+ this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
+ }
+
@Override
public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup) throws InterruptedException, MQBrokerException,
@@ -982,6 +1034,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
}
+ public long maxOffset(MessageQueue mq, boolean committed) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().maxOffset(mq, committed);
+ }
+
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 5ce6db18997dae5f9aff531c52511ee7952b680e..c1b42f5aeb4e40a195616cbca80a628fe5d6eb12 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -49,6 +50,9 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -96,7 +100,8 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
- TopicConfig examineTopicConfig(final String addr, final String topic);
+ TopicConfig examineTopicConfig(final String addr,
+ final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
@@ -295,4 +300,28 @@ public interface MQAdminExt extends MQAdmin {
MessageRequestMode mode, final int popWorkGroupSize, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;
+
+ void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException;
+
+ LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
+
+ LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId, int logicalQueueIdx,
+ MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic, int logicalQueueIdx,
+ MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ void migrateTopicLogicalQueueNotify(String brokerAddr,
+ LogicalQueueRouteData fromQueueRouteData,
+ LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6e075f79595fe042b4863be15baa23cafd74edf4..3e989391f8001844ec9a07a1ed68d8ec30757f94 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -52,6 +52,11 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.DeleteTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
@@ -215,6 +220,12 @@ public class MQAdminStartup {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());
+
+ initCommand(new UpdateTopicLogicalQueueMappingCommand());
+ initCommand(new DeleteTopicLogicalQueueMappingCommand());
+ initCommand(new QueryTopicLogicalQueueMappingCommand());
+ initCommand(new MigrateTopicLogicalQueueCommand());
+ initCommand(new UpdateTopicLogicalQueueNumCommand());
}
private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommandException.java b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommandException.java
index fadd8532ef18178746df3c54819f91eacb9770e1..d907059c03a2a1a3b33379b5137f4e57fea95cc5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommandException.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommandException.java
@@ -26,6 +26,10 @@ public class SubCommandException extends Exception {
super(msg);
}
+ public SubCommandException(String format, Object... args) {
+ super(String.format(format, args));
+ }
+
/**
* @param msg Message.
* @param cause Cause.
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/DeleteTopicLogicalQueueMappingCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/DeleteTopicLogicalQueueMappingCommand.java
new file mode 100644
index 0000000000000000000000000000000000000000..77eb918b0ee849aa6d773944f0e201f64f339ded
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/DeleteTopicLogicalQueueMappingCommand.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.logicalqueue;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeleteTopicLogicalQueueMappingCommand implements SubCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
+
+ @Override public String commandName() {
+ return "deleteTopicLogicalQueueMapping";
+ }
+
+ @Override public String commandDesc() {
+ return "delete logical queue mapping info of a topic";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt;
+
+ opt = new Option("t", "topic", true, "topic name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "cluster name.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerAddr", true, "broker addr.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String topic = commandLine.getOptionValue("t").trim();
+ List brokerAddrs;
+ if (commandLine.hasOption("b")) {
+ brokerAddrs = Collections.singletonList(commandLine.getOptionValue("c").trim());
+ } else if (commandLine.hasOption("c")) {
+ String clusterName = commandLine.getOptionValue("c").trim();
+ brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
+ } else {
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ return;
+ }
+ for (String brokerAddr : brokerAddrs) {
+ log.info("deleteTopicLogicalQueueMapping {} {}", brokerAddr, topic);
+ defaultMQAdminExt.deleteTopicLogicalQueueMapping(brokerAddr, topic);
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
new file mode 100644
index 0000000000000000000000000000000000000000..5da8b0d4808cc1c92091aa5054ba97950e063f3f
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/MigrateTopicLogicalQueueCommand.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.logicalqueue;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Optional.ofNullable;
+
+public class MigrateTopicLogicalQueueCommand implements SubCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
+
+ @Override public String commandName() {
+ return "migrateTopicLogicalQueue";
+ }
+
+ @Override public String commandDesc() {
+ return "migrate a logical queue of a topic from one broker to another.";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt;
+
+ opt = new Option("t", "topic", true, "topic name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("i", "index", true, "logical queue index.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerAddr", true, "new broker name");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("fd", "forceDelta", true, "assume fromBroker down, force migrate");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ public void execute(DefaultMQAdminExt mqAdminExt, String topic, int logicalQueueIdx,
+ String toBrokerName,
+ Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException {
+ TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
+ LogicalQueuesInfo logicalQueuesInfo = topicRouteInfo.getLogicalQueuesInfo();
+ if (logicalQueuesInfo == null) {
+ throw new SubCommandException("topic not enabled logical queue");
+ }
+ List queueRouteDataList = logicalQueuesInfo.get(logicalQueueIdx);
+ if (queueRouteDataList == null) {
+ throw new SubCommandException("logical queue %d not exist", logicalQueueIdx);
+ }
+ Map brokerAddrTable = mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable();
+ String toBrokerAddr = lookupBrokerMasterAddr(brokerAddrTable, toBrokerName);
+ if (toBrokerAddr == null) {
+ throw new SubCommandException("destination broker %s not found", toBrokerName);
+ }
+ LogicalQueueRouteData fromQueueRouteData = queueRouteDataList.stream().filter(LogicalQueueRouteData::isWritable).reduce((first, second) -> second).orElse(null);
+ if (fromQueueRouteData == null) {
+ throw new SubCommandException("logical queue %d not writable, can not migrate", logicalQueueIdx);
+ }
+ String fromBrokerName = fromQueueRouteData.getBrokerName();
+ String fromBrokerAddr = ofNullable(lookupBrokerMasterAddr(brokerAddrTable, fromBrokerName)).orElse(fromQueueRouteData.getBrokerAddr());
+ if (fromBrokerAddr == null) {
+ throw new SubCommandException("unexpected source broker name %s not found", fromBrokerName);
+ }
+ LogicalQueueRouteData toQueueRouteData;
+ RETRY:
+ while (true) {
+ TopicConfig topicConfig = mqAdminExt.examineTopicConfig(toBrokerAddr, topic);
+ LogicalQueuesInfo logicalQueuesInfoInBroker = ofNullable(mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic)).orElse(new LogicalQueuesInfo());
+ toQueueRouteData = logicalQueuesInfoInBroker.getOrDefault(logicalQueueIdx, Collections.emptyList()).stream().filter(queueRouteData -> Objects.equals(toBrokerName, queueRouteData.getBrokerName()) && queueRouteData.isWriteOnly()).findFirst().orElse(null);
+ if (toQueueRouteData == null) {
+ Multimap m = Multimaps.index(logicalQueuesInfoInBroker.values().stream().flatMap(Collection::stream).filter(queueRouteData -> Objects.equals(toBrokerName, queueRouteData.getBrokerName())).iterator(), LogicalQueueRouteData::getQueueId);
+ for (int queueId = 0, writeQueueNums = topicConfig.getWriteQueueNums(); queueId < writeQueueNums; queueId++) {
+ if (m.get(queueId).stream().anyMatch(LogicalQueueRouteData::isWritable)) {
+ continue;
+ }
+ try {
+ toQueueRouteData = mqAdminExt.reuseTopicLogicalQueue(toBrokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.WriteOnly);
+ log.info("reuseTopicLogicalQueue brokerName={} brokerAddr={} queueId={} logicalQueueIdx={} ok: {}", toBrokerName, toBrokerAddr, queueId, logicalQueueIdx, toQueueRouteData);
+ break;
+ } catch (MQBrokerException e) {
+ if ("queue writable".equals(e.getErrorMessage())) {
+ log.info("reuseTopicLogicalQueue brokerName={} brokerAddr={} queueId={} logicalQueueIdx={} writable, try again.", toBrokerName, toBrokerAddr, queueId, logicalQueueIdx);
+ continue RETRY;
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ break;
+ }
+ if (toQueueRouteData == null) {
+ toQueueRouteData = mqAdminExt.createMessageQueueForLogicalQueue(toBrokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.WriteOnly);
+ log.info("createMessageQueueForLogicalQueue brokerName={} brokerAddr={} logicalQueueIdx={} ok: {}", toBrokerName, toBrokerAddr, logicalQueueIdx, toQueueRouteData);
+ }
+ MigrateLogicalQueueBody migrateLogicalQueueBody;
+ if (forceDelta == null) {
+ try {
+ migrateLogicalQueueBody = mqAdminExt.migrateTopicLogicalQueuePrepare(fromQueueRouteData, toQueueRouteData);
+ } catch (RemotingConnectException e) {
+ throw new SubCommandException("migrateTopicLogicalQueuePrepare", e);
+ }
+ fromQueueRouteData = migrateLogicalQueueBody.getFromQueueRouteData();
+ toQueueRouteData = migrateLogicalQueueBody.getToQueueRouteData();
+ log.info("migrateTopicLogicalQueuePrepare from {} to {}", fromQueueRouteData, toQueueRouteData);
+ } else {
+ toQueueRouteData.setLogicalQueueDelta(forceDelta);
+ log.warn("migrateTopicLogicalQueuePrepare skip with forceDelta={}", forceDelta);
+ }
+ migrateLogicalQueueBody = mqAdminExt.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData);
+ toQueueRouteData = migrateLogicalQueueBody.getToQueueRouteData();
+ log.info("migrateTopicLogicalQueueCommit got: {}", toQueueRouteData);
+ if (forceDelta == null) {
+ try {
+ mqAdminExt.migrateTopicLogicalQueueNotify(fromBrokerAddr, fromQueueRouteData, toQueueRouteData);
+ } finally {
+ log.info("migrateTopicLogicalQueueNotify fromBroker {} {}", fromQueueRouteData.getBrokerName(), fromBrokerAddr);
+ }
+ }
+ Collection ignoreBrokerNames = Arrays.asList(fromBrokerName, toBrokerName);
+ Set brokerNames = queueRouteDataList.stream()
+ .map(LogicalQueueRouteData::getBrokerName)
+ .filter(v -> !ignoreBrokerNames.contains(v))
+ .map(v -> lookupBrokerMasterAddr(brokerAddrTable, v))
+ .collect(Collectors.toSet());
+ int i = 1;
+ for (String brokerName : brokerNames) {
+ String brokerAddr = null;
+ try {
+ brokerAddr = lookupBrokerMasterAddr(brokerAddrTable, brokerName);
+ mqAdminExt.migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData);
+ } finally {
+ log.info("migrateTopicLogicalQueueNotify otherBroker {}({}}) ({}/{})", brokerName, brokerAddr, i, brokerNames.size());
+ }
+ }
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(rpcHook);
+ mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String topic = commandLine.getOptionValue("t").trim();
+ String newBrokerName = commandLine.getOptionValue("b").trim();
+ int logicalQueueIdx = Integer.parseInt(commandLine.getOptionValue("i").trim());
+ Long forceDelta = null;
+ if (commandLine.hasOption("fd")) {
+ forceDelta = Long.parseLong(commandLine.getOptionValue("fd").trim());
+ }
+ execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, forceDelta);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ mqAdminExt.shutdown();
+ }
+ }
+
+ private static String lookupBrokerMasterAddr(Map brokerAddrTable, String brokerName) {
+ return ofNullable(brokerAddrTable.get(brokerName)).map(BrokerData::selectBrokerAddr).orElse(null);
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/QueryTopicLogicalQueueMappingCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/QueryTopicLogicalQueueMappingCommand.java
new file mode 100644
index 0000000000000000000000000000000000000000..6ce4bac35bc11fb34f1bda10702e8827a81cb215
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/QueryTopicLogicalQueueMappingCommand.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.logicalqueue;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryTopicLogicalQueueMappingCommand implements SubCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
+
+ @Override public String commandName() {
+ return "queryTopicLogicalQueueMapping";
+ }
+
+ @Override public String commandDesc() {
+ return "query logical queue mapping info of a topic";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt;
+
+ opt = new Option("t", "topic", true, "topic name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("b", "brokerAddr", true, "broker address.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "cluster name.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "merge", false, "merge all brokers' result into one.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String topic = commandLine.getOptionValue("t").trim();
+ List brokerAddrs;
+ if (commandLine.hasOption("b")) {
+ brokerAddrs = Collections.singletonList(commandLine.getOptionValue("c").trim());
+ } else if (commandLine.hasOption("c")) {
+ String clusterName = commandLine.getOptionValue("c").trim();
+ brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
+ } else {
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ return;
+ }
+ SortedMap>> result = Maps.newTreeMap();
+ for (String brokerAddr : brokerAddrs) {
+ LogicalQueuesInfo one = defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic);
+ result.put(brokerAddr, ImmutableSortedMap.copyOf(one));
+ }
+ if (commandLine.hasOption("m")) {
+ SortedMap> mergedResultMap = Maps.newTreeMap();
+ result.values().stream().flatMap(map -> map.values().stream()).flatMap(Collection::stream).forEach(queueRouteData -> {
+ List sortedQueueRouteDataList = mergedResultMap.computeIfAbsent(queueRouteData.getLogicalQueueIndex(), ignore -> Lists.newArrayList());
+ int idx = Collections.binarySearch(sortedQueueRouteDataList, queueRouteData,
+ Comparator.comparingLong(LogicalQueueRouteData::getLogicalQueueDelta)
+ .thenComparing(LogicalQueueRouteData::getMessageQueue)
+ .thenComparingInt(LogicalQueueRouteData::getStateOrdinal));
+ if (idx < 0) {
+ idx = -idx - 1;
+ }
+ sortedQueueRouteDataList.add(idx, queueRouteData);
+ });
+ System.out.printf("%s%n", JSON.toJSONString(ImmutableList.copyOf(mergedResultMap.values())));
+ } else {
+ System.out.printf("%s%n", JSON.toJSONString(result));
+ }
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueMappingCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueMappingCommand.java
new file mode 100644
index 0000000000000000000000000000000000000000..46276a31cde0a11ce64ca540eaf80a11bfd9eb5f
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueMappingCommand.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.logicalqueue;
+
+import com.google.common.collect.Maps;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateTopicLogicalQueueMappingCommand implements SubCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
+
+ @Override public String commandName() {
+ return "updateTopicLogicalQueueMapping";
+ }
+
+ @Override public String commandDesc() {
+ return "update logical queue mapping info of a topic";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt;
+
+ opt = new Option("t", "topic", true, "topic name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("q", "queue", true, "message queue id.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "index", true, "logical queue index.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("b", "broker", true, "broker addr.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "cluster name.");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ public void execute(DefaultMQAdminExt defaultMQAdminExt, String topic, Collection brokerAddrs) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Map topicConfigMap = Maps.newHashMapWithExpectedSize(brokerAddrs.size());
+ Map allocatedMessageQueueMap = Maps.newHashMap();
+ BitSet allocatedLogicalQueueIndices = new BitSet();
+ brokerAddrs = brokerAddrs.stream().sorted().collect(Collectors.toList());
+ for (String brokerAddr : brokerAddrs) {
+ TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig(brokerAddr, topic);
+ if (topicConfig == null) {
+ log.warn("examineTopicConfig brokerAddr={} topic={} not exist, skip!", brokerAddr, topic);
+ continue;
+ }
+ topicConfigMap.put(brokerAddr, topicConfig);
+
+ BitSet allocatedQueueIds = new BitSet();
+ Optional.ofNullable(defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic))
+ .ifPresent(queueRouteData -> queueRouteData.forEach((idx, value) -> {
+ allocatedLogicalQueueIndices.set(idx);
+ value.stream().mapToInt(LogicalQueueRouteData::getQueueId).forEach(allocatedQueueIds::set);
+ }));
+ allocatedMessageQueueMap.put(brokerAddr, allocatedQueueIds);
+ }
+ int unallocatedLogicalQueueIdx = -1;
+ for (Map.Entry entry : topicConfigMap.entrySet()) {
+ String brokerAddr = entry.getKey();
+ TopicConfig topicConfig = entry.getValue();
+ int queueNums = Integer.max(topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums());
+ BitSet allocatedQueueIds = allocatedMessageQueueMap.get(brokerAddr);
+ for (int unallocatedQueueId = allocatedQueueIds.nextClearBit(0); unallocatedQueueId < queueNums; unallocatedQueueId = allocatedQueueIds.nextClearBit(unallocatedQueueId + 1)) {
+ unallocatedLogicalQueueIdx = allocatedLogicalQueueIndices.nextClearBit(unallocatedLogicalQueueIdx + 1);
+ log.info("updateTopicLogicalQueueMapping brokerAddr={} topic={} queueId={} to {}", brokerAddr, topic, unallocatedQueueId, unallocatedLogicalQueueIdx);
+ defaultMQAdminExt.updateTopicLogicalQueueMapping(brokerAddr, topic, unallocatedQueueId, unallocatedLogicalQueueIdx);
+ allocatedQueueIds.set(unallocatedQueueId);
+ allocatedLogicalQueueIndices.set(unallocatedLogicalQueueIdx);
+ }
+ }
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ defaultMQAdminExt.start();
+ String topic = commandLine.getOptionValue("t").trim();
+ List brokerAddrs;
+ if (commandLine.hasOption("b")) {
+ String brokerAddr = commandLine.getOptionValue("b").trim();
+ boolean hasQueueId = commandLine.hasOption("q");
+ boolean hasLogicalQueueIndex = commandLine.hasOption("i");
+ if (hasQueueId && hasLogicalQueueIndex) {
+ int queueId = Integer.parseInt(commandLine.getOptionValue("q").trim());
+ int logicalQueueIndex = Integer.parseInt(commandLine.getOptionValue("i").trim());
+ defaultMQAdminExt.updateTopicLogicalQueueMapping(brokerAddr, topic, queueId, logicalQueueIndex);
+ log.info("updateTopicLogicalQueueMapping brokerAddr={} topic={} queueId={} to {}", brokerAddr, topic, queueId, logicalQueueIndex);
+ return;
+ } else if (hasQueueId || hasLogicalQueueIndex) {
+ log.error("logicalQueueIndex and queueId must be specified together.");
+ return;
+ } else {
+ log.error("brokerAddr specified but no logicalQueueIndex and queueId found");
+ return;
+ }
+ } else if (commandLine.hasOption("c")) {
+ String clusterName = commandLine.getOptionValue("c").trim();
+ brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
+ } else {
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ return;
+ }
+ this.execute(defaultMQAdminExt, topic, brokerAddrs);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueNumCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueNumCommand.java
new file mode 100644
index 0000000000000000000000000000000000000000..6b9dd2fdf829520df218f353200dc77d106279ef
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueNumCommand.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.logicalqueue;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateTopicLogicalQueueNumCommand implements SubCommand {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
+
+ @Override public String commandName() {
+ return "updateTopicLogicalQueueNum";
+ }
+
+ @Override public String commandDesc() {
+ return "change logical queue num (increase or decrease) of a topic.";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ Option opt;
+
+ opt = new Option("t", "topic", true, "topic name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "cluster name.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("n", "num", true, "logical queue num.");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+ defaultMQAdminExt.start();
+ String clusterName = commandLine.getOptionValue("c").trim();
+ String topic = commandLine.getOptionValue("t").trim();
+ int newLogicalQueueNum = Integer.parseUnsignedInt(commandLine.getOptionValue("n"));
+ execute(defaultMQAdminExt, clusterName, topic, newLogicalQueueNum);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+ public void execute(DefaultMQAdminExt defaultMQAdminExt, String clusterName, String topic,
+ int newLogicalQueueNum) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException, SubCommandException {
+ List brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
+ Map topicConfigsByBrokerAddr = Maps.newHashMapWithExpectedSize(brokerAddrs.size());
+ NavigableMap> allLogicalQueueMapByIndex = Maps.newTreeMap();
+ Map allLogicalQueueMapByBroker = Maps.newHashMap();
+ for (String brokerAddr : brokerAddrs) {
+ TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig(brokerAddr, topic);
+ if (topicConfig == null) {
+ log.info("examineTopicConfig brokerAddr={} topic={} not exist, skip!", brokerAddr, topic);
+ continue;
+ }
+ topicConfigsByBrokerAddr.put(brokerAddr, topicConfig);
+
+ LogicalQueuesInfo logicalQueuesInfo = defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic);
+ if (logicalQueuesInfo == null) {
+ throw new SubCommandException(String.format(Locale.ENGLISH, "broker=%s topic=%s logical queue not enabled", brokerAddr, topic));
+ }
+ allLogicalQueueMapByBroker.put(brokerAddr, logicalQueuesInfo);
+ logicalQueuesInfo.values().stream().flatMap(Collection::stream).forEach(queueRouteData -> {
+ List sortedQueueRouteDataList = allLogicalQueueMapByIndex.computeIfAbsent(queueRouteData.getLogicalQueueIndex(), ignore -> Lists.newArrayListWithExpectedSize(1));
+ int idx = Collections.binarySearch(sortedQueueRouteDataList, queueRouteData,
+ Comparator.comparingLong(LogicalQueueRouteData::getLogicalQueueDelta)
+ .thenComparing(LogicalQueueRouteData::getMessageQueue)
+ .thenComparingInt(LogicalQueueRouteData::getStateOrdinal));
+ if (idx < 0) {
+ idx = -idx - 1;
+ }
+ sortedQueueRouteDataList.add(idx, queueRouteData);
+ });
+ }
+ int oldLogicalQueueNum = (int) allLogicalQueueMapByIndex.values().stream().filter(queueRouteDataList -> queueRouteDataList.stream().anyMatch(LogicalQueueRouteData::isWritable)).count();
+ if (oldLogicalQueueNum == newLogicalQueueNum) {
+ log.info("logical queue num not changed!");
+ } else if (oldLogicalQueueNum < newLogicalQueueNum) {
+ increaseLogicalQueueNum(defaultMQAdminExt, allLogicalQueueMapByBroker, allLogicalQueueMapByIndex, topicConfigsByBrokerAddr, oldLogicalQueueNum, newLogicalQueueNum);
+ } else {
+ decreaseLogicalQueueNum(defaultMQAdminExt, allLogicalQueueMapByIndex, oldLogicalQueueNum, newLogicalQueueNum);
+ }
+ }
+
+ private void increaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt,
+ Map allLogicalQueuesInfoMapByBroker,
+ NavigableMap> allLogicalQueueMapByIndex,
+ Map topicConfigsByBrokerAddr, int oldLogicalQueueNum,
+ int newLogicalQueueNum) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ int curLogicalQueueNum = oldLogicalQueueNum;
+ String topic = topicConfigsByBrokerAddr.values().stream().findAny().map(TopicConfig::getTopicName).get();
+ // try use queue not be assigned as logical queue
+ for (Map.Entry e : topicConfigsByBrokerAddr.entrySet()) {
+ String brokerAddr = e.getKey();
+ TopicConfig topicConfig = e.getValue();
+ LogicalQueuesInfo logicalQueuesInfo = allLogicalQueuesInfoMapByBroker.getOrDefault(brokerAddr, new LogicalQueuesInfo());
+ ListMultimap m = Multimaps.index(logicalQueuesInfo.values().stream().flatMap(Collection::stream).iterator(), LogicalQueueRouteData::getQueueId);
+ for (int queueId = 0, writeQueueNums = topicConfig.getWriteQueueNums(); queueId < writeQueueNums; queueId++) {
+ if (m.get(queueId).stream().anyMatch(LogicalQueueRouteData::isWritable)) {
+ continue;
+ }
+ int logicalQueueIdx = curLogicalQueueNum;
+ LogicalQueueRouteData queueRouteData;
+ try {
+ queueRouteData = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.Normal);
+ } finally {
+ log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
+ }
+ curLogicalQueueNum++;
+ if (curLogicalQueueNum >= newLogicalQueueNum) {
+ return;
+ }
+ allLogicalQueueMapByIndex.computeIfAbsent(logicalQueueIdx, integer -> Lists.newArrayListWithExpectedSize(1)).add(queueRouteData);
+ logicalQueuesInfo.computeIfAbsent(logicalQueueIdx, integer -> Lists.newArrayListWithExpectedSize(1)).add(queueRouteData);
+ }
+ }
+ // try reuse still sealing logical queue
+ for (Map.Entry> entry : allLogicalQueueMapByIndex.entrySet()) {
+ List queueRouteDataList = entry.getValue();
+ if (queueRouteDataList.size() == 0 || queueRouteDataList.stream().anyMatch(LogicalQueueRouteData::isWritable)) {
+ continue;
+ }
+ int logicalQueueIdx = entry.getKey();
+ // this is a sealing logical queue
+ LogicalQueueRouteData queueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
+ String brokerAddr = queueRouteData.getBrokerAddr();
+ List queueRouteDataListByBroker = allLogicalQueuesInfoMapByBroker.get(brokerAddr).get(logicalQueueIdx);
+ if (queueRouteData.isExpired()) {
+ int queueId = queueRouteData.getQueueId();
+ try {
+ queueRouteData = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.Normal);
+ } finally {
+ log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
+ }
+ queueRouteDataList.add(queueRouteData);
+ queueRouteDataListByBroker.add(queueRouteData);
+ } else {
+ // create a message queue in last broker
+ // not expired message queue can not be reused, since delta value will not be described by one `long`
+ int queueId = -1;
+ try {
+ queueRouteData = defaultMQAdminExt.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.Normal);
+ queueId = queueRouteData.getQueueId();
+ } finally {
+ log.info("updateTopicLogicalQueueMapping create message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
+ }
+ queueRouteDataList.add(queueRouteData);
+ queueRouteDataListByBroker.add(queueRouteData);
+ topicConfigsByBrokerAddr.get(brokerAddr).setWriteQueueNums(queueId + 1);
+ }
+ curLogicalQueueNum++;
+ if (curLogicalQueueNum >= newLogicalQueueNum) {
+ return;
+ }
+ }
+ // try broker already with expired message queue
+ for (Map.Entry entry : allLogicalQueuesInfoMapByBroker.entrySet()) {
+ String brokerAddr = entry.getKey();
+ for (Iterator it = entry.getValue().values().stream().flatMap(Collection::stream)
+ .filter(LogicalQueueRouteData::isExpired)
+ .sorted(Comparator.comparingInt(LogicalQueueRouteData::getLogicalQueueIndex).thenComparingInt(LogicalQueueRouteData::getQueueId))
+ .limit(newLogicalQueueNum - curLogicalQueueNum)
+ .iterator(); it.hasNext(); ) {
+ LogicalQueueRouteData queueRouteData = it.next();
+ try {
+ LogicalQueueRouteData result = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueRouteData.getQueueId(), queueRouteData.getLogicalQueueIndex(), MessageQueueRouteState.Normal);
+ // modify in-place
+ queueRouteData.copyFrom(result);
+ } finally {
+ log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueRouteData.getQueueId(), queueRouteData.getLogicalQueueIndex());
+ }
+ allLogicalQueueMapByIndex.get(queueRouteData.getLogicalQueueIndex()).stream()
+ .filter(LogicalQueueRouteData::isExpired)
+ .filter(v -> Objects.equals(brokerAddr, v.getBrokerAddr()) && queueRouteData.getQueueId() == v.getQueueId() && queueRouteData.getLogicalQueueIndex() == v.getLogicalQueueIndex())
+ .forEach(v -> v.copyFrom(queueRouteData));
+ curLogicalQueueNum++;
+ if (curLogicalQueueNum >= newLogicalQueueNum) {
+ return;
+ }
+ }
+ }
+
+ // try broker with least amount message queue, if amount equal, random select
+ for (; curLogicalQueueNum < newLogicalQueueNum; curLogicalQueueNum++) {
+ Map.Entry entry = allLogicalQueuesInfoMapByBroker.entrySet().stream().min(Comparator.comparingInt(value -> value.getValue().values().stream().flatMapToInt(l -> IntStream.of(l.size())).sum())).get();
+ String brokerAddr = entry.getKey();
+ int logicalQueueIdx = curLogicalQueueNum;
+ int queueId = -1;
+ LogicalQueueRouteData queueRouteData;
+ try {
+ queueRouteData = defaultMQAdminExt.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.Normal);
+ queueId = queueRouteData.getQueueId();
+ } finally {
+ log.info("updateTopicLogicalQueueMapping create message queue from fresh brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
+ }
+ entry.getValue().put(logicalQueueIdx, Lists.newArrayList(queueRouteData));
+ }
+ }
+
+ private void decreaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt,
+ NavigableMap> allLogicalQueueMapByIndex,
+ int oldLogicalQueueNum,
+ int newLogicalQueueNum) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException, SubCommandException {
+ // seal logical queue from greatest index
+ Map.Entry> maxActiveEntry = allLogicalQueueMapByIndex.lastEntry();
+ int curLogicalQueueNum = oldLogicalQueueNum;
+ while (curLogicalQueueNum > newLogicalQueueNum) {
+ boolean anyQueueSealed = false;
+ for (LogicalQueueRouteData queueRouteData : maxActiveEntry.getValue()) {
+ if (queueRouteData.isWritable()) {
+ anyQueueSealed = true;
+ LogicalQueueRouteData resultQueueRouteData = queueRouteData;
+ try {
+ resultQueueRouteData = defaultMQAdminExt.sealTopicLogicalQueue(queueRouteData.getBrokerAddr(), queueRouteData);
+ } finally {
+ log.info("seal message queue: {}", resultQueueRouteData);
+ }
+ }
+ }
+ if (anyQueueSealed) {
+ curLogicalQueueNum--;
+ }
+ maxActiveEntry = allLogicalQueueMapByIndex.lowerEntry(maxActiveEntry.getKey());
+ if (maxActiveEntry == null) {
+ throw new SubCommandException(String.format(Locale.ENGLISH, "oldLogicalQueueNum=%d newLogicalQueueNum=%d curLogicalQueueNum=%d but can not find lowerEntry, unexpected situation", oldLogicalQueueNum, newLogicalQueueNum, curLogicalQueueNum));
+ }
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index c33ae52dbd3151acf83d41844fc719a0e4118ea0..b7f5379d36e996886d44ed2f181c324c8717fab4 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
public class UpdateTopicSubCommand implements SubCommand {
@@ -67,6 +68,10 @@ public class UpdateTopicSubCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("lq", "logicalQueue", true, "set logical queue nums");
+ opt.setRequired(false);
+ options.addOption(opt);
+
opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
opt.setRequired(false);
options.addOption(opt);
@@ -132,7 +137,17 @@ public class UpdateTopicSubCommand implements SubCommand {
}
topicConfig.setOrder(isOrder);
+ boolean useLogicalQueue = false;
+ if (commandLine.hasOption("lq")) {
+ useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
+ }
+
if (commandLine.hasOption('b')) {
+ if (useLogicalQueue) {
+ System.out.printf("-lq and -b can not be used together.%n");
+ return;
+ }
+
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
@@ -156,6 +171,7 @@ public class UpdateTopicSubCommand implements SubCommand {
Set masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
@@ -177,6 +193,10 @@ public class UpdateTopicSubCommand implements SubCommand {
}
System.out.printf("%s", topicConfig);
+
+ if (useLogicalQueue) {
+ new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet);
+ }
return;
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 3146b1781154792a2d7928e0306e9cf172b5ddac..e76095480fbe6e2dd629f9697486944923662da2 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
@@ -59,6 +60,9 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
+import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
+import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -68,6 +72,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.assertj.core.util.Lists;
+import org.assertj.core.util.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -75,14 +81,24 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQAdminExtTest {
+ private static final String broker1Addr = "127.0.0.1:10911";
+ private static final String broker1Name = "default-broker";
+ private static final String cluster = "default-cluster";
+ private static final String broker2Name = "broker-test";
+ private static final String broker2Addr = "127.0.0.2:10911";
+ private static final String topic1 = "topic_one";
+ private static final String topic2 = "topic_two";
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@@ -115,33 +131,40 @@ public class DefaultMQAdminExtTest {
when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
Set topicSet = new HashSet<>();
- topicSet.add("topic_one");
- topicSet.add("topic_two");
+ topicSet.add(topic1);
+ topicSet.add(topic2);
topicList.setTopicList(topicSet);
when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
List brokerDatas = new ArrayList<>();
HashMap brokerAddrs = new HashMap<>();
- brokerAddrs.put(1234l, "127.0.0.1:10911");
+ brokerAddrs.put(MixAll.MASTER_ID, broker1Addr);
BrokerData brokerData = new BrokerData();
- brokerData.setCluster("default-cluster");
- brokerData.setBrokerName("default-broker");
+ brokerData.setCluster(cluster);
+ brokerData.setBrokerName(broker1Name);
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
+ brokerDatas.add(new BrokerData(cluster, broker2Name, (HashMap) Maps.newHashMap(MixAll.MASTER_ID, broker2Addr)));
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList());
topicRouteData.setFilterServerTable(new HashMap>());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo();
+ logicalQueuesInfoinfo.put(0, Lists.newArrayList(
+ new LogicalQueueRouteData(0, 0, new MessageQueue(topic1, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000, broker1Addr),
+ new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
+ ));
+ topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), any())).thenReturn(topicRouteData);
HashMap result = new HashMap<>();
- result.put("id", "1234");
- result.put("brokerName", "default-broker");
+ result.put("id", String.valueOf(MixAll.MASTER_ID));
+ result.put("brokerName", broker1Name);
kvTable.setTable(result);
when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
HashMap brokerAddrTable = new HashMap<>();
- brokerAddrTable.put("default-broker", brokerData);
- brokerAddrTable.put("broker-test", new BrokerData());
+ brokerAddrTable.put(broker1Name, brokerData);
+ brokerAddrTable.put(broker2Name, new BrokerData());
clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(new HashMap>());
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
@@ -251,7 +274,7 @@ public class DefaultMQAdminExtTest {
@Test
public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
- assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+ assertThat(brokerStats.getTable().get("id")).isEqualTo(String.valueOf(MixAll.MASTER_ID));
assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
}
@@ -277,7 +300,7 @@ public class DefaultMQAdminExtTest {
@Test
public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
- assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
+ assertThat(consumeStats.getConsumeTps()).isGreaterThanOrEqualTo(1234);
}
@Test
@@ -406,4 +429,32 @@ public class DefaultMQAdminExtTest {
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
}
+
+ @Test
+ public void testMaxOffset() throws Exception {
+ when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(100L);
+
+ assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L);
+ }
+
+ @Test
+ public void testSearchOffset() throws Exception {
+ when(mQClientAPIImpl.searchOffset(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(101L);
+
+ assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L);
+ }
+
+ @Test
+ public void testMaxOffset_LogicalQueue() throws Exception {
+ when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L);
+
+ assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0))).isEqualTo(1010L);
+ }
+
+ @Test
+ public void testSearchOffset_LogicalQueue() throws Exception {
+ when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L);
+
+ assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0), System.currentTimeMillis())).isEqualTo(1011L);
+ }
}
\ No newline at end of file