提交 f3990857 编写于 作者: C chenzlalvin

[RIP-21] submodule tools

上级 acbc0be0
......@@ -66,6 +66,11 @@
</encoder>
</appender>
<logger name="STDOUT" additivity="false">
<level value="INFO"/>
<appender-ref ref="STDOUT"/>
</logger>
<logger name="RocketmqTools" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqToolsAppender"/>
......
......@@ -64,5 +64,9 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
......@@ -43,12 +43,16 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
......@@ -209,7 +213,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
public TopicConfig examineTopicConfig(String addr,
String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
......@@ -578,4 +583,52 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
RemotingConnectException, MQClientException {
this.defaultMQAdminExtImpl.setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
}
@Override
public void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
this.defaultMQAdminExtImpl.updateTopicLogicalQueueMapping(brokerAddr, topic, queueId, logicalQueueIndex);
}
@Override
public LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
return this.defaultMQAdminExtImpl.queryTopicLogicalQueueMapping(brokerAddr, topic);
}
@Override
public void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.defaultMQAdminExtImpl.deleteTopicLogicalQueueMapping(brokerAddr, topic);
}
@Override
public LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
return this.defaultMQAdminExtImpl.sealTopicLogicalQueue(brokerAddr, queueRouteData);
}
@Override public LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId,
int logicalQueueIdx, MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.defaultMQAdminExtImpl.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, messageQueueRouteState);
}
@Override public LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic,
int logicalQueueIdx, MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.defaultMQAdminExtImpl.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, messageQueueStatus);
}
@Override public MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.defaultMQAdminExtImpl.migrateTopicLogicalQueuePrepare(fromQueueRouteData, toQueueRouteData);
}
@Override public MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.defaultMQAdminExtImpl.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
this.defaultMQAdminExtImpl.migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData);
}
}
......@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
......@@ -75,6 +76,9 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
......@@ -223,8 +227,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
return null;
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
@Override
......@@ -325,6 +329,54 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, msgId);
}
@Override
public void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().updateTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIndex, timeoutMillis);
}
@Override
public LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().queryTopicLogicalQueue(brokerAddr, topic, timeoutMillis);
}
@Override
public void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicLogicalQueueMapping(brokerAddr, topic, timeoutMillis);
}
@Override
public LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().sealTopicLogicalQueue(brokerAddr, queueRouteData, timeoutMillis);
}
@Override public LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId,
int logicalQueueIdx, MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, messageQueueRouteState, timeoutMillis);
}
@Override public LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic,
int logicalQueueIdx, MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, messageQueueStatus, timeoutMillis);
}
@Override public MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueuePrepare(fromQueueRouteData.getBrokerAddr(), fromQueueRouteData, toQueueRouteData, timeoutMillis);
}
@Override public MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueueCommit(toQueueRouteData.getBrokerAddr(), fromQueueRouteData, toQueueRouteData, timeoutMillis);
}
@Override public void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData, timeoutMillis);
}
@Override
public ConsumerConnection examineConsumerConnectionInfo(
String consumerGroup) throws InterruptedException, MQBrokerException,
......@@ -982,6 +1034,10 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
}
public long maxOffset(MessageQueue mq, boolean committed) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq, committed);
}
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().minOffset(mq);
......
......@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
......@@ -49,6 +50,9 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
......@@ -96,7 +100,8 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
TopicConfig examineTopicConfig(final String addr, final String topic);
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -295,4 +300,28 @@ public interface MQAdminExt extends MQAdmin {
MessageRequestMode mode, final int popWorkGroupSize, final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException;
void updateTopicLogicalQueueMapping(String brokerAddr, String topic, int queueId, int logicalQueueIndex) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException;
LogicalQueuesInfo queryTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void deleteTopicLogicalQueueMapping(String brokerAddr, String topic) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
LogicalQueueRouteData sealTopicLogicalQueue(String brokerAddr, LogicalQueueRouteData queueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
LogicalQueueRouteData reuseTopicLogicalQueue(String brokerAddr, String topic, int queueId, int logicalQueueIdx,
MessageQueueRouteState messageQueueRouteState) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
LogicalQueueRouteData createMessageQueueForLogicalQueue(String brokerAddr, String topic, int logicalQueueIdx,
MessageQueueRouteState messageQueueStatus) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
MigrateLogicalQueueBody migrateTopicLogicalQueuePrepare(LogicalQueueRouteData fromQueueRouteData, LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
MigrateLogicalQueueBody migrateTopicLogicalQueueCommit(
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void migrateTopicLogicalQueueNotify(String brokerAddr,
LogicalQueueRouteData fromQueueRouteData,
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException;
}
......@@ -52,6 +52,11 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.logicalqueue.DeleteTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
......@@ -215,6 +220,12 @@ public class MQAdminStartup {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());
initCommand(new UpdateTopicLogicalQueueMappingCommand());
initCommand(new DeleteTopicLogicalQueueMappingCommand());
initCommand(new QueryTopicLogicalQueueMappingCommand());
initCommand(new MigrateTopicLogicalQueueCommand());
initCommand(new UpdateTopicLogicalQueueNumCommand());
}
private static void initLogback() throws JoranException {
......
......@@ -26,6 +26,10 @@ public class SubCommandException extends Exception {
super(msg);
}
public SubCommandException(String format, Object... args) {
super(String.format(format, args));
}
/**
* @param msg Message.
* @param cause Cause.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.logicalqueue;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DeleteTopicLogicalQueueMappingCommand implements SubCommand {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
@Override public String commandName() {
return "deleteTopicLogicalQueueMapping";
}
@Override public String commandDesc() {
return "delete logical queue mapping info of a topic";
}
@Override public Options buildCommandlineOptions(Options options) {
Option opt;
opt = new Option("t", "topic", true, "topic name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "cluster name.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "brokerAddr", true, "broker addr.");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String topic = commandLine.getOptionValue("t").trim();
List<String> brokerAddrs;
if (commandLine.hasOption("b")) {
brokerAddrs = Collections.singletonList(commandLine.getOptionValue("c").trim());
} else if (commandLine.hasOption("c")) {
String clusterName = commandLine.getOptionValue("c").trim();
brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
} else {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
for (String brokerAddr : brokerAddrs) {
log.info("deleteTopicLogicalQueueMapping {} {}", brokerAddr, topic);
defaultMQAdminExt.deleteTopicLogicalQueueMapping(brokerAddr, topic);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.logicalqueue;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Optional.ofNullable;
public class MigrateTopicLogicalQueueCommand implements SubCommand {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
@Override public String commandName() {
return "migrateTopicLogicalQueue";
}
@Override public String commandDesc() {
return "migrate a logical queue of a topic from one broker to another.";
}
@Override public Options buildCommandlineOptions(Options options) {
Option opt;
opt = new Option("t", "topic", true, "topic name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("i", "index", true, "logical queue index.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("b", "brokerAddr", true, "new broker name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("fd", "forceDelta", true, "assume fromBroker down, force migrate");
opt.setRequired(false);
options.addOption(opt);
return options;
}
public void execute(DefaultMQAdminExt mqAdminExt, String topic, int logicalQueueIdx,
String toBrokerName,
Long forceDelta) throws RemotingException, MQBrokerException, InterruptedException, SubCommandException, MQClientException {
TopicRouteData topicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
LogicalQueuesInfo logicalQueuesInfo = topicRouteInfo.getLogicalQueuesInfo();
if (logicalQueuesInfo == null) {
throw new SubCommandException("topic not enabled logical queue");
}
List<LogicalQueueRouteData> queueRouteDataList = logicalQueuesInfo.get(logicalQueueIdx);
if (queueRouteDataList == null) {
throw new SubCommandException("logical queue %d not exist", logicalQueueIdx);
}
Map<String, BrokerData> brokerAddrTable = mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable();
String toBrokerAddr = lookupBrokerMasterAddr(brokerAddrTable, toBrokerName);
if (toBrokerAddr == null) {
throw new SubCommandException("destination broker %s not found", toBrokerName);
}
LogicalQueueRouteData fromQueueRouteData = queueRouteDataList.stream().filter(LogicalQueueRouteData::isWritable).reduce((first, second) -> second).orElse(null);
if (fromQueueRouteData == null) {
throw new SubCommandException("logical queue %d not writable, can not migrate", logicalQueueIdx);
}
String fromBrokerName = fromQueueRouteData.getBrokerName();
String fromBrokerAddr = ofNullable(lookupBrokerMasterAddr(brokerAddrTable, fromBrokerName)).orElse(fromQueueRouteData.getBrokerAddr());
if (fromBrokerAddr == null) {
throw new SubCommandException("unexpected source broker name %s not found", fromBrokerName);
}
LogicalQueueRouteData toQueueRouteData;
RETRY:
while (true) {
TopicConfig topicConfig = mqAdminExt.examineTopicConfig(toBrokerAddr, topic);
LogicalQueuesInfo logicalQueuesInfoInBroker = ofNullable(mqAdminExt.queryTopicLogicalQueueMapping(toBrokerAddr, topic)).orElse(new LogicalQueuesInfo());
toQueueRouteData = logicalQueuesInfoInBroker.getOrDefault(logicalQueueIdx, Collections.emptyList()).stream().filter(queueRouteData -> Objects.equals(toBrokerName, queueRouteData.getBrokerName()) && queueRouteData.isWriteOnly()).findFirst().orElse(null);
if (toQueueRouteData == null) {
Multimap<Integer, LogicalQueueRouteData> m = Multimaps.index(logicalQueuesInfoInBroker.values().stream().flatMap(Collection::stream).filter(queueRouteData -> Objects.equals(toBrokerName, queueRouteData.getBrokerName())).iterator(), LogicalQueueRouteData::getQueueId);
for (int queueId = 0, writeQueueNums = topicConfig.getWriteQueueNums(); queueId < writeQueueNums; queueId++) {
if (m.get(queueId).stream().anyMatch(LogicalQueueRouteData::isWritable)) {
continue;
}
try {
toQueueRouteData = mqAdminExt.reuseTopicLogicalQueue(toBrokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.WriteOnly);
log.info("reuseTopicLogicalQueue brokerName={} brokerAddr={} queueId={} logicalQueueIdx={} ok: {}", toBrokerName, toBrokerAddr, queueId, logicalQueueIdx, toQueueRouteData);
break;
} catch (MQBrokerException e) {
if ("queue writable".equals(e.getErrorMessage())) {
log.info("reuseTopicLogicalQueue brokerName={} brokerAddr={} queueId={} logicalQueueIdx={} writable, try again.", toBrokerName, toBrokerAddr, queueId, logicalQueueIdx);
continue RETRY;
} else {
throw e;
}
}
}
}
break;
}
if (toQueueRouteData == null) {
toQueueRouteData = mqAdminExt.createMessageQueueForLogicalQueue(toBrokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.WriteOnly);
log.info("createMessageQueueForLogicalQueue brokerName={} brokerAddr={} logicalQueueIdx={} ok: {}", toBrokerName, toBrokerAddr, logicalQueueIdx, toQueueRouteData);
}
MigrateLogicalQueueBody migrateLogicalQueueBody;
if (forceDelta == null) {
try {
migrateLogicalQueueBody = mqAdminExt.migrateTopicLogicalQueuePrepare(fromQueueRouteData, toQueueRouteData);
} catch (RemotingConnectException e) {
throw new SubCommandException("migrateTopicLogicalQueuePrepare", e);
}
fromQueueRouteData = migrateLogicalQueueBody.getFromQueueRouteData();
toQueueRouteData = migrateLogicalQueueBody.getToQueueRouteData();
log.info("migrateTopicLogicalQueuePrepare from {} to {}", fromQueueRouteData, toQueueRouteData);
} else {
toQueueRouteData.setLogicalQueueDelta(forceDelta);
log.warn("migrateTopicLogicalQueuePrepare skip with forceDelta={}", forceDelta);
}
migrateLogicalQueueBody = mqAdminExt.migrateTopicLogicalQueueCommit(fromQueueRouteData, toQueueRouteData);
toQueueRouteData = migrateLogicalQueueBody.getToQueueRouteData();
log.info("migrateTopicLogicalQueueCommit got: {}", toQueueRouteData);
if (forceDelta == null) {
try {
mqAdminExt.migrateTopicLogicalQueueNotify(fromBrokerAddr, fromQueueRouteData, toQueueRouteData);
} finally {
log.info("migrateTopicLogicalQueueNotify fromBroker {} {}", fromQueueRouteData.getBrokerName(), fromBrokerAddr);
}
}
Collection<String> ignoreBrokerNames = Arrays.asList(fromBrokerName, toBrokerName);
Set<String> brokerNames = queueRouteDataList.stream()
.map(LogicalQueueRouteData::getBrokerName)
.filter(v -> !ignoreBrokerNames.contains(v))
.map(v -> lookupBrokerMasterAddr(brokerAddrTable, v))
.collect(Collectors.toSet());
int i = 1;
for (String brokerName : brokerNames) {
String brokerAddr = null;
try {
brokerAddr = lookupBrokerMasterAddr(brokerAddrTable, brokerName);
mqAdminExt.migrateTopicLogicalQueueNotify(brokerAddr, fromQueueRouteData, toQueueRouteData);
} finally {
log.info("migrateTopicLogicalQueueNotify otherBroker {}({}}) ({}/{})", brokerName, brokerAddr, i, brokerNames.size());
}
}
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(rpcHook);
mqAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String topic = commandLine.getOptionValue("t").trim();
String newBrokerName = commandLine.getOptionValue("b").trim();
int logicalQueueIdx = Integer.parseInt(commandLine.getOptionValue("i").trim());
Long forceDelta = null;
if (commandLine.hasOption("fd")) {
forceDelta = Long.parseLong(commandLine.getOptionValue("fd").trim());
}
execute(mqAdminExt, topic, logicalQueueIdx, newBrokerName, forceDelta);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
mqAdminExt.shutdown();
}
}
private static String lookupBrokerMasterAddr(Map<String, BrokerData> brokerAddrTable, String brokerName) {
return ofNullable(brokerAddrTable.get(brokerName)).map(BrokerData::selectBrokerAddr).orElse(null);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.logicalqueue;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueryTopicLogicalQueueMappingCommand implements SubCommand {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
@Override public String commandName() {
return "queryTopicLogicalQueueMapping";
}
@Override public String commandDesc() {
return "query logical queue mapping info of a topic";
}
@Override public Options buildCommandlineOptions(Options options) {
Option opt;
opt = new Option("t", "topic", true, "topic name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("b", "brokerAddr", true, "broker address.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "cluster name.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("m", "merge", false, "merge all brokers' result into one.");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String topic = commandLine.getOptionValue("t").trim();
List<String> brokerAddrs;
if (commandLine.hasOption("b")) {
brokerAddrs = Collections.singletonList(commandLine.getOptionValue("c").trim());
} else if (commandLine.hasOption("c")) {
String clusterName = commandLine.getOptionValue("c").trim();
brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
} else {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
SortedMap<String, SortedMap<Integer, List<LogicalQueueRouteData>>> result = Maps.newTreeMap();
for (String brokerAddr : brokerAddrs) {
LogicalQueuesInfo one = defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic);
result.put(brokerAddr, ImmutableSortedMap.copyOf(one));
}
if (commandLine.hasOption("m")) {
SortedMap<Integer, List<LogicalQueueRouteData>> mergedResultMap = Maps.newTreeMap();
result.values().stream().flatMap(map -> map.values().stream()).flatMap(Collection::stream).forEach(queueRouteData -> {
List<LogicalQueueRouteData> sortedQueueRouteDataList = mergedResultMap.computeIfAbsent(queueRouteData.getLogicalQueueIndex(), ignore -> Lists.newArrayList());
int idx = Collections.binarySearch(sortedQueueRouteDataList, queueRouteData,
Comparator.comparingLong(LogicalQueueRouteData::getLogicalQueueDelta)
.thenComparing(LogicalQueueRouteData::getMessageQueue)
.thenComparingInt(LogicalQueueRouteData::getStateOrdinal));
if (idx < 0) {
idx = -idx - 1;
}
sortedQueueRouteDataList.add(idx, queueRouteData);
});
System.out.printf("%s%n", JSON.toJSONString(ImmutableList.copyOf(mergedResultMap.values())));
} else {
System.out.printf("%s%n", JSON.toJSONString(result));
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.logicalqueue;
import com.google.common.collect.Maps;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UpdateTopicLogicalQueueMappingCommand implements SubCommand {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
@Override public String commandName() {
return "updateTopicLogicalQueueMapping";
}
@Override public String commandDesc() {
return "update logical queue mapping info of a topic";
}
@Override public Options buildCommandlineOptions(Options options) {
Option opt;
opt = new Option("t", "topic", true, "topic name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("q", "queue", true, "message queue id.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "index", true, "logical queue index.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "broker", true, "broker addr.");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "cluster name.");
opt.setRequired(false);
options.addOption(opt);
return options;
}
public void execute(DefaultMQAdminExt defaultMQAdminExt, String topic, Collection<String> brokerAddrs) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
Map<String, TopicConfig> topicConfigMap = Maps.newHashMapWithExpectedSize(brokerAddrs.size());
Map<String, BitSet> allocatedMessageQueueMap = Maps.newHashMap();
BitSet allocatedLogicalQueueIndices = new BitSet();
brokerAddrs = brokerAddrs.stream().sorted().collect(Collectors.toList());
for (String brokerAddr : brokerAddrs) {
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig(brokerAddr, topic);
if (topicConfig == null) {
log.warn("examineTopicConfig brokerAddr={} topic={} not exist, skip!", brokerAddr, topic);
continue;
}
topicConfigMap.put(brokerAddr, topicConfig);
BitSet allocatedQueueIds = new BitSet();
Optional.ofNullable(defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic))
.ifPresent(queueRouteData -> queueRouteData.forEach((idx, value) -> {
allocatedLogicalQueueIndices.set(idx);
value.stream().mapToInt(LogicalQueueRouteData::getQueueId).forEach(allocatedQueueIds::set);
}));
allocatedMessageQueueMap.put(brokerAddr, allocatedQueueIds);
}
int unallocatedLogicalQueueIdx = -1;
for (Map.Entry<String, TopicConfig> entry : topicConfigMap.entrySet()) {
String brokerAddr = entry.getKey();
TopicConfig topicConfig = entry.getValue();
int queueNums = Integer.max(topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums());
BitSet allocatedQueueIds = allocatedMessageQueueMap.get(brokerAddr);
for (int unallocatedQueueId = allocatedQueueIds.nextClearBit(0); unallocatedQueueId < queueNums; unallocatedQueueId = allocatedQueueIds.nextClearBit(unallocatedQueueId + 1)) {
unallocatedLogicalQueueIdx = allocatedLogicalQueueIndices.nextClearBit(unallocatedLogicalQueueIdx + 1);
log.info("updateTopicLogicalQueueMapping brokerAddr={} topic={} queueId={} to {}", brokerAddr, topic, unallocatedQueueId, unallocatedLogicalQueueIdx);
defaultMQAdminExt.updateTopicLogicalQueueMapping(brokerAddr, topic, unallocatedQueueId, unallocatedLogicalQueueIdx);
allocatedQueueIds.set(unallocatedQueueId);
allocatedLogicalQueueIndices.set(unallocatedLogicalQueueIdx);
}
}
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue("t").trim();
List<String> brokerAddrs;
if (commandLine.hasOption("b")) {
String brokerAddr = commandLine.getOptionValue("b").trim();
boolean hasQueueId = commandLine.hasOption("q");
boolean hasLogicalQueueIndex = commandLine.hasOption("i");
if (hasQueueId && hasLogicalQueueIndex) {
int queueId = Integer.parseInt(commandLine.getOptionValue("q").trim());
int logicalQueueIndex = Integer.parseInt(commandLine.getOptionValue("i").trim());
defaultMQAdminExt.updateTopicLogicalQueueMapping(brokerAddr, topic, queueId, logicalQueueIndex);
log.info("updateTopicLogicalQueueMapping brokerAddr={} topic={} queueId={} to {}", brokerAddr, topic, queueId, logicalQueueIndex);
return;
} else if (hasQueueId || hasLogicalQueueIndex) {
log.error("logicalQueueIndex and queueId must be specified together.");
return;
} else {
log.error("brokerAddr specified but no logicalQueueIndex and queueId found");
return;
}
} else if (commandLine.hasOption("c")) {
String clusterName = commandLine.getOptionValue("c").trim();
brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
} else {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
this.execute(defaultMQAdminExt, topic, brokerAddrs);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.logicalqueue;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UpdateTopicLogicalQueueNumCommand implements SubCommand {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STDOUT_LOGGER_NAME);
@Override public String commandName() {
return "updateTopicLogicalQueueNum";
}
@Override public String commandDesc() {
return "change logical queue num (increase or decrease) of a topic.";
}
@Override public Options buildCommandlineOptions(Options options) {
Option opt;
opt = new Option("t", "topic", true, "topic name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "cluster name.");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("n", "num", true, "logical queue num.");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override public void execute(CommandLine commandLine, Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String clusterName = commandLine.getOptionValue("c").trim();
String topic = commandLine.getOptionValue("t").trim();
int newLogicalQueueNum = Integer.parseUnsignedInt(commandLine.getOptionValue("n"));
execute(defaultMQAdminExt, clusterName, topic, newLogicalQueueNum);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
public void execute(DefaultMQAdminExt defaultMQAdminExt, String clusterName, String topic,
int newLogicalQueueNum) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException, SubCommandException {
List<String> brokerAddrs = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName).stream().sorted().collect(Collectors.toList());
Map<String, TopicConfig> topicConfigsByBrokerAddr = Maps.newHashMapWithExpectedSize(brokerAddrs.size());
NavigableMap<Integer, List<LogicalQueueRouteData>> allLogicalQueueMapByIndex = Maps.newTreeMap();
Map<String, LogicalQueuesInfo> allLogicalQueueMapByBroker = Maps.newHashMap();
for (String brokerAddr : brokerAddrs) {
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig(brokerAddr, topic);
if (topicConfig == null) {
log.info("examineTopicConfig brokerAddr={} topic={} not exist, skip!", brokerAddr, topic);
continue;
}
topicConfigsByBrokerAddr.put(brokerAddr, topicConfig);
LogicalQueuesInfo logicalQueuesInfo = defaultMQAdminExt.queryTopicLogicalQueueMapping(brokerAddr, topic);
if (logicalQueuesInfo == null) {
throw new SubCommandException(String.format(Locale.ENGLISH, "broker=%s topic=%s logical queue not enabled", brokerAddr, topic));
}
allLogicalQueueMapByBroker.put(brokerAddr, logicalQueuesInfo);
logicalQueuesInfo.values().stream().flatMap(Collection::stream).forEach(queueRouteData -> {
List<LogicalQueueRouteData> sortedQueueRouteDataList = allLogicalQueueMapByIndex.computeIfAbsent(queueRouteData.getLogicalQueueIndex(), ignore -> Lists.newArrayListWithExpectedSize(1));
int idx = Collections.binarySearch(sortedQueueRouteDataList, queueRouteData,
Comparator.comparingLong(LogicalQueueRouteData::getLogicalQueueDelta)
.thenComparing(LogicalQueueRouteData::getMessageQueue)
.thenComparingInt(LogicalQueueRouteData::getStateOrdinal));
if (idx < 0) {
idx = -idx - 1;
}
sortedQueueRouteDataList.add(idx, queueRouteData);
});
}
int oldLogicalQueueNum = (int) allLogicalQueueMapByIndex.values().stream().filter(queueRouteDataList -> queueRouteDataList.stream().anyMatch(LogicalQueueRouteData::isWritable)).count();
if (oldLogicalQueueNum == newLogicalQueueNum) {
log.info("logical queue num not changed!");
} else if (oldLogicalQueueNum < newLogicalQueueNum) {
increaseLogicalQueueNum(defaultMQAdminExt, allLogicalQueueMapByBroker, allLogicalQueueMapByIndex, topicConfigsByBrokerAddr, oldLogicalQueueNum, newLogicalQueueNum);
} else {
decreaseLogicalQueueNum(defaultMQAdminExt, allLogicalQueueMapByIndex, oldLogicalQueueNum, newLogicalQueueNum);
}
}
private void increaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt,
Map<String, LogicalQueuesInfo> allLogicalQueuesInfoMapByBroker,
NavigableMap<Integer, List<LogicalQueueRouteData>> allLogicalQueueMapByIndex,
Map<String, TopicConfig> topicConfigsByBrokerAddr, int oldLogicalQueueNum,
int newLogicalQueueNum) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
int curLogicalQueueNum = oldLogicalQueueNum;
String topic = topicConfigsByBrokerAddr.values().stream().findAny().map(TopicConfig::getTopicName).get();
// try use queue not be assigned as logical queue
for (Map.Entry<String, TopicConfig> e : topicConfigsByBrokerAddr.entrySet()) {
String brokerAddr = e.getKey();
TopicConfig topicConfig = e.getValue();
LogicalQueuesInfo logicalQueuesInfo = allLogicalQueuesInfoMapByBroker.getOrDefault(brokerAddr, new LogicalQueuesInfo());
ListMultimap<Integer, LogicalQueueRouteData> m = Multimaps.index(logicalQueuesInfo.values().stream().flatMap(Collection::stream).iterator(), LogicalQueueRouteData::getQueueId);
for (int queueId = 0, writeQueueNums = topicConfig.getWriteQueueNums(); queueId < writeQueueNums; queueId++) {
if (m.get(queueId).stream().anyMatch(LogicalQueueRouteData::isWritable)) {
continue;
}
int logicalQueueIdx = curLogicalQueueNum;
LogicalQueueRouteData queueRouteData;
try {
queueRouteData = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.Normal);
} finally {
log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
}
curLogicalQueueNum++;
if (curLogicalQueueNum >= newLogicalQueueNum) {
return;
}
allLogicalQueueMapByIndex.computeIfAbsent(logicalQueueIdx, integer -> Lists.newArrayListWithExpectedSize(1)).add(queueRouteData);
logicalQueuesInfo.computeIfAbsent(logicalQueueIdx, integer -> Lists.newArrayListWithExpectedSize(1)).add(queueRouteData);
}
}
// try reuse still sealing logical queue
for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : allLogicalQueueMapByIndex.entrySet()) {
List<LogicalQueueRouteData> queueRouteDataList = entry.getValue();
if (queueRouteDataList.size() == 0 || queueRouteDataList.stream().anyMatch(LogicalQueueRouteData::isWritable)) {
continue;
}
int logicalQueueIdx = entry.getKey();
// this is a sealing logical queue
LogicalQueueRouteData queueRouteData = queueRouteDataList.get(queueRouteDataList.size() - 1);
String brokerAddr = queueRouteData.getBrokerAddr();
List<LogicalQueueRouteData> queueRouteDataListByBroker = allLogicalQueuesInfoMapByBroker.get(brokerAddr).get(logicalQueueIdx);
if (queueRouteData.isExpired()) {
int queueId = queueRouteData.getQueueId();
try {
queueRouteData = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueId, logicalQueueIdx, MessageQueueRouteState.Normal);
} finally {
log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
}
queueRouteDataList.add(queueRouteData);
queueRouteDataListByBroker.add(queueRouteData);
} else {
// create a message queue in last broker
// not expired message queue can not be reused, since delta value will not be described by one `long`
int queueId = -1;
try {
queueRouteData = defaultMQAdminExt.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.Normal);
queueId = queueRouteData.getQueueId();
} finally {
log.info("updateTopicLogicalQueueMapping create message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
}
queueRouteDataList.add(queueRouteData);
queueRouteDataListByBroker.add(queueRouteData);
topicConfigsByBrokerAddr.get(brokerAddr).setWriteQueueNums(queueId + 1);
}
curLogicalQueueNum++;
if (curLogicalQueueNum >= newLogicalQueueNum) {
return;
}
}
// try broker already with expired message queue
for (Map.Entry<String, LogicalQueuesInfo> entry : allLogicalQueuesInfoMapByBroker.entrySet()) {
String brokerAddr = entry.getKey();
for (Iterator<LogicalQueueRouteData> it = entry.getValue().values().stream().flatMap(Collection::stream)
.filter(LogicalQueueRouteData::isExpired)
.sorted(Comparator.comparingInt(LogicalQueueRouteData::getLogicalQueueIndex).thenComparingInt(LogicalQueueRouteData::getQueueId))
.limit(newLogicalQueueNum - curLogicalQueueNum)
.iterator(); it.hasNext(); ) {
LogicalQueueRouteData queueRouteData = it.next();
try {
LogicalQueueRouteData result = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, topic, queueRouteData.getQueueId(), queueRouteData.getLogicalQueueIndex(), MessageQueueRouteState.Normal);
// modify in-place
queueRouteData.copyFrom(result);
} finally {
log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueRouteData.getQueueId(), queueRouteData.getLogicalQueueIndex());
}
allLogicalQueueMapByIndex.get(queueRouteData.getLogicalQueueIndex()).stream()
.filter(LogicalQueueRouteData::isExpired)
.filter(v -> Objects.equals(brokerAddr, v.getBrokerAddr()) && queueRouteData.getQueueId() == v.getQueueId() && queueRouteData.getLogicalQueueIndex() == v.getLogicalQueueIndex())
.forEach(v -> v.copyFrom(queueRouteData));
curLogicalQueueNum++;
if (curLogicalQueueNum >= newLogicalQueueNum) {
return;
}
}
}
// try broker with least amount message queue, if amount equal, random select
for (; curLogicalQueueNum < newLogicalQueueNum; curLogicalQueueNum++) {
Map.Entry<String, LogicalQueuesInfo> entry = allLogicalQueuesInfoMapByBroker.entrySet().stream().min(Comparator.comparingInt(value -> value.getValue().values().stream().flatMapToInt(l -> IntStream.of(l.size())).sum())).get();
String brokerAddr = entry.getKey();
int logicalQueueIdx = curLogicalQueueNum;
int queueId = -1;
LogicalQueueRouteData queueRouteData;
try {
queueRouteData = defaultMQAdminExt.createMessageQueueForLogicalQueue(brokerAddr, topic, logicalQueueIdx, MessageQueueRouteState.Normal);
queueId = queueRouteData.getQueueId();
} finally {
log.info("updateTopicLogicalQueueMapping create message queue from fresh brokerAddr={} queueId={} logicalQueueIdx={}", brokerAddr, queueId, logicalQueueIdx);
}
entry.getValue().put(logicalQueueIdx, Lists.newArrayList(queueRouteData));
}
}
private void decreaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt,
NavigableMap<Integer, List<LogicalQueueRouteData>> allLogicalQueueMapByIndex,
int oldLogicalQueueNum,
int newLogicalQueueNum) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException, SubCommandException {
// seal logical queue from greatest index
Map.Entry<Integer, List<LogicalQueueRouteData>> maxActiveEntry = allLogicalQueueMapByIndex.lastEntry();
int curLogicalQueueNum = oldLogicalQueueNum;
while (curLogicalQueueNum > newLogicalQueueNum) {
boolean anyQueueSealed = false;
for (LogicalQueueRouteData queueRouteData : maxActiveEntry.getValue()) {
if (queueRouteData.isWritable()) {
anyQueueSealed = true;
LogicalQueueRouteData resultQueueRouteData = queueRouteData;
try {
resultQueueRouteData = defaultMQAdminExt.sealTopicLogicalQueue(queueRouteData.getBrokerAddr(), queueRouteData);
} finally {
log.info("seal message queue: {}", resultQueueRouteData);
}
}
}
if (anyQueueSealed) {
curLogicalQueueNum--;
}
maxActiveEntry = allLogicalQueueMapByIndex.lowerEntry(maxActiveEntry.getKey());
if (maxActiveEntry == null) {
throw new SubCommandException(String.format(Locale.ENGLISH, "oldLogicalQueueNum=%d newLogicalQueueNum=%d curLogicalQueueNum=%d but can not find lowerEntry, unexpected situation", oldLogicalQueueNum, newLogicalQueueNum, curLogicalQueueNum));
}
}
}
}
......@@ -29,6 +29,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
public class UpdateTopicSubCommand implements SubCommand {
......@@ -67,6 +68,10 @@ public class UpdateTopicSubCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("lq", "logicalQueue", true, "set logical queue nums");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
opt.setRequired(false);
options.addOption(opt);
......@@ -132,7 +137,17 @@ public class UpdateTopicSubCommand implements SubCommand {
}
topicConfig.setOrder(isOrder);
boolean useLogicalQueue = false;
if (commandLine.hasOption("lq")) {
useLogicalQueue = Boolean.parseBoolean(commandLine.getOptionValue("lq").trim());
}
if (commandLine.hasOption('b')) {
if (useLogicalQueue) {
System.out.printf("-lq and -b can not be used together.%n");
return;
}
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
......@@ -156,6 +171,7 @@ public class UpdateTopicSubCommand implements SubCommand {
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
......@@ -177,6 +193,10 @@ public class UpdateTopicSubCommand implements SubCommand {
}
System.out.printf("%s", topicConfig);
if (useLogicalQueue) {
new UpdateTopicLogicalQueueMappingCommand().execute(defaultMQAdminExt, topicConfig.getTopicName(), masterSet);
}
return;
}
......
......@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
......@@ -59,6 +60,9 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
......@@ -68,6 +72,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.assertj.core.util.Lists;
import org.assertj.core.util.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -75,14 +81,24 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQAdminExtTest {
private static final String broker1Addr = "127.0.0.1:10911";
private static final String broker1Name = "default-broker";
private static final String cluster = "default-cluster";
private static final String broker2Name = "broker-test";
private static final String broker2Addr = "127.0.0.2:10911";
private static final String topic1 = "topic_one";
private static final String topic2 = "topic_two";
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
......@@ -115,33 +131,40 @@ public class DefaultMQAdminExtTest {
when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
Set<String> topicSet = new HashSet<>();
topicSet.add("topic_one");
topicSet.add("topic_two");
topicSet.add(topic1);
topicSet.add(topic2);
topicList.setTopicList(topicSet);
when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
List<BrokerData> brokerDatas = new ArrayList<>();
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1234l, "127.0.0.1:10911");
brokerAddrs.put(MixAll.MASTER_ID, broker1Addr);
BrokerData brokerData = new BrokerData();
brokerData.setCluster("default-cluster");
brokerData.setBrokerName("default-broker");
brokerData.setCluster(cluster);
brokerData.setBrokerName(broker1Name);
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
brokerDatas.add(new BrokerData(cluster, broker2Name, (HashMap<Long, String>) Maps.newHashMap(MixAll.MASTER_ID, broker2Addr)));
topicRouteData.setBrokerDatas(brokerDatas);
topicRouteData.setQueueDatas(new ArrayList<QueueData>());
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
LogicalQueuesInfo logicalQueuesInfoinfo = new LogicalQueuesInfo();
logicalQueuesInfoinfo.put(0, Lists.newArrayList(
new LogicalQueueRouteData(0, 0, new MessageQueue(topic1, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 1000, 2000, 3000, broker1Addr),
new LogicalQueueRouteData(0, 1000, new MessageQueue(topic1, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr)
));
topicRouteData.setLogicalQueuesInfo(logicalQueuesInfoinfo);
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong(), anyBoolean(), any())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
result.put("id", "1234");
result.put("brokerName", "default-broker");
result.put("id", String.valueOf(MixAll.MASTER_ID));
result.put("brokerName", broker1Name);
kvTable.setTable(result);
when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
brokerAddrTable.put("default-broker", brokerData);
brokerAddrTable.put("broker-test", new BrokerData());
brokerAddrTable.put(broker1Name, brokerData);
brokerAddrTable.put(broker2Name, new BrokerData());
clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>());
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
......@@ -251,7 +274,7 @@ public class DefaultMQAdminExtTest {
@Test
public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
assertThat(brokerStats.getTable().get("id")).isEqualTo(String.valueOf(MixAll.MASTER_ID));
assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
}
......@@ -277,7 +300,7 @@ public class DefaultMQAdminExtTest {
@Test
public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
assertThat(consumeStats.getConsumeTps()).isGreaterThanOrEqualTo(1234);
}
@Test
......@@ -406,4 +429,32 @@ public class DefaultMQAdminExtTest {
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
}
@Test
public void testMaxOffset() throws Exception {
when(mQClientAPIImpl.getMaxOffset(anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(100L);
assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, broker1Name, 0))).isEqualTo(100L);
}
@Test
public void testSearchOffset() throws Exception {
when(mQClientAPIImpl.searchOffset(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(101L);
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, broker1Name, 0), System.currentTimeMillis())).isEqualTo(101L);
}
@Test
public void testMaxOffset_LogicalQueue() throws Exception {
when(mQClientAPIImpl.getMaxOffset(eq(broker2Addr), anyString(), anyInt(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(10L);
assertThat(defaultMQAdminExt.maxOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0))).isEqualTo(1010L);
}
@Test
public void testSearchOffset_LogicalQueue() throws Exception {
when(mQClientAPIImpl.searchOffset(eq(broker2Addr), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(11L);
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0), System.currentTimeMillis())).isEqualTo(1011L);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册