提交 a9addc3c 编写于 作者: D dongeforever

Add tests for command

上级 48db31b4
......@@ -1996,7 +1996,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
TopicQueueMappingDetail topicQueueMappingDetail = null;
if (Boolean.TRUE.equals(requestHeader.getWithMapping())) {
if (Boolean.TRUE.equals(requestHeader.getLo())) {
topicQueueMappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
}
String content = JSONObject.toJSONString(new TopicConfigAndQueueMapping(topicConfig, topicQueueMappingDetail));
......
......@@ -134,6 +134,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setLo(false);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
......@@ -261,7 +262,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setBname(broker);
header.setWithMapping(true);
header.setLo(true);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
......
......@@ -2581,7 +2581,7 @@ public class MQClientAPIImpl {
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
header.setLo(true);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, header);
RemotingCommand response = this.remotingClient
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
......
......@@ -18,11 +18,12 @@
package org.apache.rocketmq.common.protocol.header;
import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetTopicConfigRequestHeader extends RpcRequestHeader {
public class GetTopicConfigRequestHeader extends TopicRequestHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
......@@ -30,7 +31,6 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader {
@CFNotNull
private String topic;
private Boolean withMapping;
/**
* @return the topic
......@@ -45,12 +45,4 @@ public class GetTopicConfigRequestHeader extends RpcRequestHeader {
public void setTopic(String topic) {
this.topic = topic;
}
public Boolean getWithMapping() {
return withMapping;
}
public void setWithMapping(Boolean withMapping) {
this.withMapping = withMapping;
}
}
\ No newline at end of file
......@@ -22,9 +22,16 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
......@@ -34,9 +41,16 @@ import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
......@@ -231,5 +245,65 @@ public class MQAdminTestUtils {
}
public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
args = new String[] {
"-c", cluster,
"-t", topic,
"-qn", String.valueOf(queueNum),
"-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
args = new String[] {
"-b", brokerStr,
"-t", topic,
"-qn", String.valueOf(queueNum),
"-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
return;
}
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, null);
}
public static void remappingStaticTopicWithCommand(String topic, Set<String> brokers, String cluster, String nameservers) throws Exception {
RemappingStaticTopicSubCommand cmd = new RemappingStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
args = new String[] {
"-c", cluster,
"-t", topic,
"-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
args = new String[] {
"-b", brokerStr,
"-t", topic,
"-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
return;
}
if (commandLine.hasOption('n')) {
String namesrvAddr = commandLine.getOptionValue('n');
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
}
cmd.execute(commandLine, options, null);
}
}
......@@ -2,7 +2,11 @@ package org.apache.rocketmq.test.statictopic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
......@@ -14,6 +18,7 @@ import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
......@@ -23,10 +28,12 @@ import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
......@@ -56,6 +63,51 @@ public class StaticTopicIT extends BaseConf {
defaultMQAdminExt.start();
}
@Test
public void testCommandsWithCluster() throws Exception {
//This case is used to mock the env to test the command manually
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10;
int msgEachQueue = 100;
{
MQAdminTestUtils.createStaticTopicWithCommand(topic, queueNum, null, clusterName, nsAddr);
sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0);
//consume and check
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
{
MQAdminTestUtils.remappingStaticTopicWithCommand(topic, null, clusterName, nsAddr);
Thread.sleep(500);
sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 100);
}
}
@Test
public void testCommandsWithBrokers() throws Exception {
//This case is used to mock the env to test the command manually
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
int queueNum = 10;
int msgEachQueue = 100;
{
Set<String> brokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopicWithCommand(topic, queueNum, brokers, null, nsAddr);
sendMessagesAndCheck(producer, brokers, topic, queueNum, msgEachQueue, 0);
//consume and check
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
{
Set<String> brokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopicWithCommand(topic, brokers, null, nsAddr);
Thread.sleep(500);
sendMessagesAndCheck(producer, brokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
}
}
@Test
public void testNoTargetBrokers() throws Exception {
......@@ -89,7 +141,7 @@ public class StaticTopicIT extends BaseConf {
}
private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception {
private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size());
......@@ -109,12 +161,12 @@ public class StaticTopicIT extends BaseConf {
Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
}
TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset());
Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
Assert.assertEquals(msgEachQueue + baseOffset, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
}
}
......@@ -214,7 +266,6 @@ public class StaticTopicIT extends BaseConf {
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
System.out.println("=============================================================");
//remapping the static topic
{
Set<String> targetBrokers = ImmutableSet.of(broker2Name);
......@@ -228,7 +279,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
}
}
......@@ -261,7 +312,7 @@ public class StaticTopicIT extends BaseConf {
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//make the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, (i + 1) * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
......@@ -289,7 +340,7 @@ public class StaticTopicIT extends BaseConf {
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
//remapping to broker3Name
......@@ -298,7 +349,7 @@ public class StaticTopicIT extends BaseConf {
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2);
sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2 * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
}
// 1 -> 2 -> 3, currently 1 should not has any mappings
......
......@@ -268,6 +268,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
}
if (topicStatsTable.getOffsetTable().isEmpty()) {
throw new MQClientException("Not found the topic stats info", null);
}
......
......@@ -20,13 +20,12 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
......@@ -91,6 +90,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
......@@ -137,6 +137,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
Set<String> targetBrokers = new HashSet<>();
try {
defaultMQAdminExt.start();
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
......@@ -148,6 +149,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
|| clusterInfo.getClusterAddrTable().isEmpty()) {
throw new RuntimeException("The Cluster info is empty");
}
clientMetadata.refreshClusterInfo(clusterInfo);
{
if (commandLine.hasOption("b")) {
String brokerStrs = commandLine.getOptionValue("b").trim();
......
......@@ -92,9 +92,9 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
ClientMetadata clientMetadata = new ClientMetadata();
try {
defaultMQAdminExt.start();
String topic = commandLine.getOptionValue('t').trim();
String mapFileName = commandLine.getOptionValue('f').trim();
String mapData = MixAll.file2String(mapFileName);
......@@ -139,6 +139,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
Set<String> targetBrokers = new HashSet<>();
try {
defaultMQAdminExt.start();
if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))
|| !commandLine.hasOption("qn")) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册