From b75af6b0c253f7b9c39e812fd2e7938a99b1495d Mon Sep 17 00:00:00 2001 From: what-a-good-jungle <353187194@qq.com> Date: Sat, 14 Jul 2018 09:55:59 +0800 Subject: [PATCH] [ROCKETMQ-353] Add sendMessageCommand and consumeMessageCommand (#332) --- .../tools/command/MQAdminStartup.java | 4 + .../message/ConsumeMessageCommand.java | 292 ++++++++++++++++++ .../command/message/SendMessageCommand.java | 156 ++++++++++ .../message/ConsumeMessageCommandTest.java | 112 +++++++ .../message/SendMessageCommandTest.java | 90 ++++++ 5 files changed, 654 insertions(+) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index d3342e81..c189e866 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -48,12 +48,14 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; +import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; +import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; @@ -193,6 +195,8 @@ public class MQAdminStartup { initCommand(new GetBrokerConfigCommand()); initCommand(new QueryConsumeQueueCommand()); + initCommand(new SendMessageCommand()); + initCommand(new ConsumeMessageCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java new file mode 100644 index 00000000..51892677 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.message; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +import java.util.Set; + +public class ConsumeMessageCommand implements SubCommand { + + private String topic = null; + private long messageCount = 128; + private DefaultMQPullConsumer defaultMQPullConsumer; + + + public enum ConsumeType { + /** + * Topic only + */ + DEFAULT, + /** + * Topic brokerName queueId set + */ + BYQUEUE, + /** + * Topic brokerName queueId offset set + */ + BYOFFSET + } + + private static long timestampFormat(final String value) { + long timestamp; + try { + timestamp = Long.parseLong(value); + } catch (NumberFormatException e) { + timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime(); + } + + return timestamp; + } + @Override + public String commandName() { + return "consumeMessage"; + } + + @Override + public String commandDesc() { + return "Consume message"; + } + + @Override + public Options buildCommandlineOptions(final Options options) { + Option opt = new Option("t", "topic", true, "Topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("b", "brokerName", true, "Broker name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "queueId", true, "Queue Id"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("o", "offset", true, "Queue offset"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "consumerGroup", true, "Consumer group name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "beginTimestamp ", true, + "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("e", "endTimestamp ", true, + "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "MessageNumber", true, "Number of message to be consumed"); + opt.setRequired(false); + options.addOption(opt); + + + return options; + + } + + @Override + public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + if (defaultMQPullConsumer == null) { + defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook); + } + defaultMQPullConsumer.setInstanceName(Long.toString(System.currentTimeMillis())); + + long offset = 0; + long timeValueEnd = 0; + long timeValueBegin = 0; + String queueId = null; + String brokerName = null; + ConsumeType consumeType = ConsumeType.DEFAULT; + + try { + /* Group name must be set before consumer start */ + if (commandLine.hasOption('g')) { + String consumerGroup = commandLine.getOptionValue('b').trim(); + defaultMQPullConsumer.setConsumerGroup(consumerGroup); + } + + defaultMQPullConsumer.start(); + + topic = commandLine.getOptionValue('t').trim(); + + if (commandLine.hasOption('c')) { + messageCount = Long.parseLong(commandLine.getOptionValue('c').trim()); + if (messageCount <= 0) { + System.out.print("please input a positive messageNumber!"); + return; + } + } + if (commandLine.hasOption('b')) { + brokerName = commandLine.getOptionValue('b').trim(); + + } + if (commandLine.hasOption('i')) { + if (!commandLine.hasOption('b')) { + System.out.print("Please set the brokerName before queueId!"); + return; + } + queueId = commandLine.getOptionValue('i').trim(); + + consumeType = ConsumeType.BYQUEUE; + } + if (commandLine.hasOption('o')) { + if (consumeType != ConsumeType.BYQUEUE) { + System.out.print("please set queueId before offset!"); + return; + } + offset = Long.parseLong(commandLine.getOptionValue('o').trim()); + consumeType = ConsumeType.BYOFFSET; + } + + if (commandLine.hasOption('s')) { + String timestampStr = commandLine.getOptionValue('s').trim(); + timeValueBegin = timestampFormat(timestampStr); + } + if (commandLine.hasOption('e')) { + String timestampStr = commandLine.getOptionValue('e').trim(); + timeValueEnd = timestampFormat(timestampStr); + } + + switch (consumeType) { + case DEFAULT: + executeDefault(timeValueBegin, timeValueEnd); + break; + case BYOFFSET: + executeByCondition(brokerName, queueId, offset, timeValueBegin, timeValueEnd); + break; + case BYQUEUE: + executeByCondition(brokerName, queueId, 0, timeValueBegin, timeValueEnd); + break; + default: + System.out.print("Unknown type of consume!"); + break; + } + + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQPullConsumer.shutdown(); + } + } + + private void pullMessageByQueue(MessageQueue mq, long minOffset, long maxOffset) { + READQ: + for (long offset = minOffset; offset <= maxOffset; ) { + PullResult pullResult = null; + try { + pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1)); + } catch (Exception e) { + e.printStackTrace(); + } + if (pullResult != null) { + offset = pullResult.getNextBeginOffset(); + switch (pullResult.getPullStatus()) { + case FOUND: + System.out.print("Consume ok\n"); + PrintMessageByQueueCommand.printMessage(pullResult.getMsgFoundList(), "UTF-8", + true, true); + break; + case NO_MATCHED_MSG: + System.out.printf("%s no matched msg. status=%s, offset=%s\n", mq, pullResult.getPullStatus(), + offset); + break; + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + System.out.printf("%s print msg finished. status=%s, offset=%s\n", mq, + pullResult.getPullStatus(), offset); + break READQ; + default: + break; + } + } + } + } + + private void executeDefault(long timeValueBegin, long timeValueEnd) { + try { + Set mqs = defaultMQPullConsumer.fetchSubscribeMessageQueues(topic); + long countLeft = messageCount; + for (MessageQueue mq : mqs) { + if (countLeft == 0) { + return; + } + long minOffset = defaultMQPullConsumer.minOffset(mq); + long maxOffset = defaultMQPullConsumer.maxOffset(mq); + if (timeValueBegin > 0) { + minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin); + } + if (timeValueEnd > 0) { + maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd); + } + if (maxOffset - minOffset > countLeft) { + System.out.printf("The older %d message of the %d queue will be provided\n", countLeft, mq.getQueueId()); + maxOffset = minOffset + countLeft - 1; + countLeft = 0; + } else { + countLeft = countLeft - (maxOffset - minOffset) - 1; + } + + pullMessageByQueue(mq, minOffset, maxOffset); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void executeByCondition(String brokerName, String queueId, long offset, long timeValueBegin, long timeValueEnd) { + MessageQueue mq = new MessageQueue(topic, brokerName, Integer.parseInt(queueId)); + try { + long minOffset = defaultMQPullConsumer.minOffset(mq); + long maxOffset = defaultMQPullConsumer.maxOffset(mq); + if (timeValueBegin > 0) { + minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin); + } + if (timeValueEnd > 0) { + maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd); + } + if (offset > maxOffset) { + System.out.printf("%s no matched msg, offset=%s\n", mq, offset); + return; + } + minOffset = minOffset > offset ? minOffset : offset; + if (maxOffset - minOffset > messageCount) { + System.out.printf("The oldler %d message will be provided\n", messageCount); + maxOffset = minOffset + messageCount - 1; + } + + pullMessageByQueue(mq, minOffset, maxOffset); + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java new file mode 100644 index 00000000..e4921c6f --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.message; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class SendMessageCommand implements SubCommand { + + private DefaultMQProducer producer; + + @Override + public String commandName() { + return "sendMessage"; + } + + @Override + public String commandDesc() { + return "Send a message"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "Topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("p", "body", true, "UTF-8 string format of the message body"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "key", true, "Message keys"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "tags", true, "Message tags"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "broker", true, "Send message to target broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "qid", true, "Send message to target queue"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + private DefaultMQProducer createProducer(RPCHook rpcHook) { + if (this.producer != null) { + return producer; + } else { + producer = new DefaultMQProducer(rpcHook); + producer.setProducerGroup(Long.toString(System.currentTimeMillis())); + return producer; + } + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + Message msg = null; + String topic = commandLine.getOptionValue('t').trim(); + String body = commandLine.getOptionValue('p').trim(); + String tag = null; + String keys = null; + String brokerName = null; + int queueId = -1; + try { + if (commandLine.hasOption('k')) { + keys = commandLine.getOptionValue('k').trim(); + } + if (commandLine.hasOption('c')) { + tag = commandLine.getOptionValue('c').trim(); + } + if (commandLine.hasOption('b')) { + brokerName = commandLine.getOptionValue('b').trim(); + } + if (commandLine.hasOption('i')) { + if (!commandLine.hasOption('b')) { + System.out.print("Broker name must be set if the queue is chosen!"); + return; + } else { + queueId = Integer.parseInt(commandLine.getOptionValue('i').trim()); + } + } + msg = new Message(topic, tag, keys, body.getBytes("utf-8")); + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } + + DefaultMQProducer producer = this.createProducer(rpcHook); + SendResult result; + try { + producer.start(); + if (brokerName != null && queueId > -1) { + MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); + result = producer.send(msg, messageQueue); + } else { + result = producer.send(msg); + } + + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } finally { + producer.shutdown(); + } + + System.out.printf("%-32s %-4s %-20s %s%n", + "#Broker Name", + "#QID", + "#Send Result", + "#MsgId" + ); + + if (result != null) { + System.out.printf("%-32s %-4s %-20s %s%n", + result.getMessageQueue().getBrokerName(), + result.getMessageQueue().getQueueId(), + result.getSendStatus(), + result.getMsgId() + ); + } else { + System.out.printf("%-32s %-4s %-20s %s%n", + "Unknown", + "Unknown", + "Failed", + "None" + ); + } + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java new file mode 100644 index 00000000..9a5998e2 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.message; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConsumeMessageCommandTest { + private static ConsumeMessageCommand consumeMessageCommand; + + @BeforeClass + public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, + NoSuchFieldException, IllegalAccessException{ + consumeMessageCommand = new ConsumeMessageCommand(); + DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + MessageExt msg = new MessageExt(); + msg.setBody(new byte[]{'a'}); + List msgFoundList = new ArrayList<>(); + msgFoundList.add(msg); + final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList); + + when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult); + when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0)); + when(defaultMQPullConsumer.maxOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(1)); + + final Set mqList = new HashSet<>(); + mqList.add(new MessageQueue()); + when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(mqList); + + Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); + producerField.setAccessible(true); + producerField.set(consumeMessageCommand,defaultMQPullConsumer); + } + @AfterClass + public static void terminate() { + } + + @Test + public void testExecuteDefault() throws SubCommandException { + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t mytopic", "-n localhost:9876"}; + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), + subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); + consumeMessageCommand.execute(commandLine, options, null); + + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(s.contains("Consume ok")); + } + + @Test + public void testExecuteByCondition() throws SubCommandException { + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + + String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"}; + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); + consumeMessageCommand.execute(commandLine, options, null); + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(s.contains("Consume ok")); + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java new file mode 100644 index 00000000..e4c6673d --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.message; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SendMessageCommandTest { + + private static SendMessageCommand sendMessageCommand = new SendMessageCommand(); + + @BeforeClass + public static void init() throws MQClientException, RemotingException, InterruptedException, MQBrokerException, NoSuchFieldException, IllegalAccessException { + + DefaultMQProducer defaultMQProducer = mock(DefaultMQProducer.class); + SendResult sendResult = new SendResult(); + sendResult.setMessageQueue(new MessageQueue()); + sendResult.getMessageQueue().setBrokerName("broker1"); + sendResult.getMessageQueue().setQueueId(1); + sendResult.setSendStatus(SendStatus.SEND_OK); + sendResult.setMsgId("fgwejigherughwueyutyu4t4343t43"); + + when(defaultMQProducer.send(any(Message.class))).thenReturn(sendResult); + when(defaultMQProducer.send(any(Message.class), any(MessageQueue.class))).thenReturn(sendResult); + + Field producerField = SendMessageCommand.class.getDeclaredField("producer"); + producerField.setAccessible(true); + producerField.set(sendMessageCommand, defaultMQProducer); + } + + @AfterClass + public static void terminate() { + } + + @Test + public void testExecute() throws SubCommandException { + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756"}; + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); + sendMessageCommand.execute(commandLine, options, null); + + subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756","-b brokera","-i 1"}; + commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); + sendMessageCommand.execute(commandLine, options, null); + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(s.contains("SEND_OK")); + } +} -- GitLab