提交 b75af6b0 编写于 作者: W what-a-good-jungle 提交者: von gosling

[ROCKETMQ-353] Add sendMessageCommand and consumeMessageCommand (#332)

上级 840e04c7
......@@ -48,12 +48,14 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
......@@ -193,6 +195,8 @@ public class MQAdminStartup {
initCommand(new GetBrokerConfigCommand());
initCommand(new QueryConsumeQueueCommand());
initCommand(new SendMessageCommand());
initCommand(new ConsumeMessageCommand());
}
private static void initLogback() throws JoranException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Set;
public class ConsumeMessageCommand implements SubCommand {
private String topic = null;
private long messageCount = 128;
private DefaultMQPullConsumer defaultMQPullConsumer;
public enum ConsumeType {
/**
* Topic only
*/
DEFAULT,
/**
* Topic brokerName queueId set
*/
BYQUEUE,
/**
* Topic brokerName queueId offset set
*/
BYOFFSET
}
private static long timestampFormat(final String value) {
long timestamp;
try {
timestamp = Long.parseLong(value);
} catch (NumberFormatException e) {
timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
}
return timestamp;
}
@Override
public String commandName() {
return "consumeMessage";
}
@Override
public String commandDesc() {
return "Consume message";
}
@Override
public Options buildCommandlineOptions(final Options options) {
Option opt = new Option("t", "topic", true, "Topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("b", "brokerName", true, "Broker name");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "queueId", true, "Queue Id");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("o", "offset", true, "Queue offset");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "consumerGroup", true, "Consumer group name");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "beginTimestamp ", true,
"Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("e", "endTimestamp ", true,
"End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "MessageNumber", true, "Number of message to be consumed");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
if (defaultMQPullConsumer == null) {
defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
}
defaultMQPullConsumer.setInstanceName(Long.toString(System.currentTimeMillis()));
long offset = 0;
long timeValueEnd = 0;
long timeValueBegin = 0;
String queueId = null;
String brokerName = null;
ConsumeType consumeType = ConsumeType.DEFAULT;
try {
/* Group name must be set before consumer start */
if (commandLine.hasOption('g')) {
String consumerGroup = commandLine.getOptionValue('b').trim();
defaultMQPullConsumer.setConsumerGroup(consumerGroup);
}
defaultMQPullConsumer.start();
topic = commandLine.getOptionValue('t').trim();
if (commandLine.hasOption('c')) {
messageCount = Long.parseLong(commandLine.getOptionValue('c').trim());
if (messageCount <= 0) {
System.out.print("please input a positive messageNumber!");
return;
}
}
if (commandLine.hasOption('b')) {
brokerName = commandLine.getOptionValue('b').trim();
}
if (commandLine.hasOption('i')) {
if (!commandLine.hasOption('b')) {
System.out.print("Please set the brokerName before queueId!");
return;
}
queueId = commandLine.getOptionValue('i').trim();
consumeType = ConsumeType.BYQUEUE;
}
if (commandLine.hasOption('o')) {
if (consumeType != ConsumeType.BYQUEUE) {
System.out.print("please set queueId before offset!");
return;
}
offset = Long.parseLong(commandLine.getOptionValue('o').trim());
consumeType = ConsumeType.BYOFFSET;
}
if (commandLine.hasOption('s')) {
String timestampStr = commandLine.getOptionValue('s').trim();
timeValueBegin = timestampFormat(timestampStr);
}
if (commandLine.hasOption('e')) {
String timestampStr = commandLine.getOptionValue('e').trim();
timeValueEnd = timestampFormat(timestampStr);
}
switch (consumeType) {
case DEFAULT:
executeDefault(timeValueBegin, timeValueEnd);
break;
case BYOFFSET:
executeByCondition(brokerName, queueId, offset, timeValueBegin, timeValueEnd);
break;
case BYQUEUE:
executeByCondition(brokerName, queueId, 0, timeValueBegin, timeValueEnd);
break;
default:
System.out.print("Unknown type of consume!");
break;
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQPullConsumer.shutdown();
}
}
private void pullMessageByQueue(MessageQueue mq, long minOffset, long maxOffset) {
READQ:
for (long offset = minOffset; offset <= maxOffset; ) {
PullResult pullResult = null;
try {
pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1));
} catch (Exception e) {
e.printStackTrace();
}
if (pullResult != null) {
offset = pullResult.getNextBeginOffset();
switch (pullResult.getPullStatus()) {
case FOUND:
System.out.print("Consume ok\n");
PrintMessageByQueueCommand.printMessage(pullResult.getMsgFoundList(), "UTF-8",
true, true);
break;
case NO_MATCHED_MSG:
System.out.printf("%s no matched msg. status=%s, offset=%s\n", mq, pullResult.getPullStatus(),
offset);
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
System.out.printf("%s print msg finished. status=%s, offset=%s\n", mq,
pullResult.getPullStatus(), offset);
break READQ;
default:
break;
}
}
}
}
private void executeDefault(long timeValueBegin, long timeValueEnd) {
try {
Set<MessageQueue> mqs = defaultMQPullConsumer.fetchSubscribeMessageQueues(topic);
long countLeft = messageCount;
for (MessageQueue mq : mqs) {
if (countLeft == 0) {
return;
}
long minOffset = defaultMQPullConsumer.minOffset(mq);
long maxOffset = defaultMQPullConsumer.maxOffset(mq);
if (timeValueBegin > 0) {
minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin);
}
if (timeValueEnd > 0) {
maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd);
}
if (maxOffset - minOffset > countLeft) {
System.out.printf("The older %d message of the %d queue will be provided\n", countLeft, mq.getQueueId());
maxOffset = minOffset + countLeft - 1;
countLeft = 0;
} else {
countLeft = countLeft - (maxOffset - minOffset) - 1;
}
pullMessageByQueue(mq, minOffset, maxOffset);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void executeByCondition(String brokerName, String queueId, long offset, long timeValueBegin, long timeValueEnd) {
MessageQueue mq = new MessageQueue(topic, brokerName, Integer.parseInt(queueId));
try {
long minOffset = defaultMQPullConsumer.minOffset(mq);
long maxOffset = defaultMQPullConsumer.maxOffset(mq);
if (timeValueBegin > 0) {
minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin);
}
if (timeValueEnd > 0) {
maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd);
}
if (offset > maxOffset) {
System.out.printf("%s no matched msg, offset=%s\n", mq, offset);
return;
}
minOffset = minOffset > offset ? minOffset : offset;
if (maxOffset - minOffset > messageCount) {
System.out.printf("The oldler %d message will be provided\n", messageCount);
maxOffset = minOffset + messageCount - 1;
}
pullMessageByQueue(mq, minOffset, maxOffset);
} catch (Exception e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class SendMessageCommand implements SubCommand {
private DefaultMQProducer producer;
@Override
public String commandName() {
return "sendMessage";
}
@Override
public String commandDesc() {
return "Send a message";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "Topic name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("p", "body", true, "UTF-8 string format of the message body");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("k", "key", true, "Message keys");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "tags", true, "Message tags");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "broker", true, "Send message to target broker");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("i", "qid", true, "Send message to target queue");
opt.setRequired(false);
options.addOption(opt);
return options;
}
private DefaultMQProducer createProducer(RPCHook rpcHook) {
if (this.producer != null) {
return producer;
} else {
producer = new DefaultMQProducer(rpcHook);
producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
return producer;
}
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
Message msg = null;
String topic = commandLine.getOptionValue('t').trim();
String body = commandLine.getOptionValue('p').trim();
String tag = null;
String keys = null;
String brokerName = null;
int queueId = -1;
try {
if (commandLine.hasOption('k')) {
keys = commandLine.getOptionValue('k').trim();
}
if (commandLine.hasOption('c')) {
tag = commandLine.getOptionValue('c').trim();
}
if (commandLine.hasOption('b')) {
brokerName = commandLine.getOptionValue('b').trim();
}
if (commandLine.hasOption('i')) {
if (!commandLine.hasOption('b')) {
System.out.print("Broker name must be set if the queue is chosen!");
return;
} else {
queueId = Integer.parseInt(commandLine.getOptionValue('i').trim());
}
}
msg = new Message(topic, tag, keys, body.getBytes("utf-8"));
} catch (Exception e) {
throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
}
DefaultMQProducer producer = this.createProducer(rpcHook);
SendResult result;
try {
producer.start();
if (brokerName != null && queueId > -1) {
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
result = producer.send(msg, messageQueue);
} else {
result = producer.send(msg);
}
} catch (Exception e) {
throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
} finally {
producer.shutdown();
}
System.out.printf("%-32s %-4s %-20s %s%n",
"#Broker Name",
"#QID",
"#Send Result",
"#MsgId"
);
if (result != null) {
System.out.printf("%-32s %-4s %-20s %s%n",
result.getMessageQueue().getBrokerName(),
result.getMessageQueue().getQueueId(),
result.getSendStatus(),
result.getMsgId()
);
} else {
System.out.printf("%-32s %-4s %-20s %s%n",
"Unknown",
"Unknown",
"Failed",
"None"
);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumeMessageCommandTest {
private static ConsumeMessageCommand consumeMessageCommand;
@BeforeClass
public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
NoSuchFieldException, IllegalAccessException{
consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
MessageExt msg = new MessageExt();
msg.setBody(new byte[]{'a'});
List<MessageExt> msgFoundList = new ArrayList<>();
msgFoundList.add(msg);
final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
when(defaultMQPullConsumer.maxOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(1));
final Set<MessageQueue> mqList = new HashSet<>();
mqList.add(new MessageQueue());
when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(mqList);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand,defaultMQPullConsumer);
}
@AfterClass
public static void terminate() {
}
@Test
public void testExecuteDefault() throws SubCommandException {
PrintStream out = System.out;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok"));
}
@Test
public void testExecuteByCondition() throws SubCommandException {
PrintStream out = System.out;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok"));
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SendMessageCommandTest {
private static SendMessageCommand sendMessageCommand = new SendMessageCommand();
@BeforeClass
public static void init() throws MQClientException, RemotingException, InterruptedException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
DefaultMQProducer defaultMQProducer = mock(DefaultMQProducer.class);
SendResult sendResult = new SendResult();
sendResult.setMessageQueue(new MessageQueue());
sendResult.getMessageQueue().setBrokerName("broker1");
sendResult.getMessageQueue().setQueueId(1);
sendResult.setSendStatus(SendStatus.SEND_OK);
sendResult.setMsgId("fgwejigherughwueyutyu4t4343t43");
when(defaultMQProducer.send(any(Message.class))).thenReturn(sendResult);
when(defaultMQProducer.send(any(Message.class), any(MessageQueue.class))).thenReturn(sendResult);
Field producerField = SendMessageCommand.class.getDeclaredField("producer");
producerField.setAccessible(true);
producerField.set(sendMessageCommand, defaultMQProducer);
}
@AfterClass
public static void terminate() {
}
@Test
public void testExecute() throws SubCommandException {
PrintStream out = System.out;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
sendMessageCommand.execute(commandLine, options, null);
subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756","-b brokera","-i 1"};
commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
sendMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("SEND_OK"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册