diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java index 518926774456b5413254b8fb7db83f928064ceec..aa98ee628704e032ff567e624ba9534ddd157a99 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java @@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand { pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1)); } catch (Exception e) { e.printStackTrace(); + return; } if (pullResult != null) { offset = pullResult.getNextBeginOffset(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java index 9a5998e29e5e6482e85a3ec06de581af113fc297..11543958c0449cd6cf53cdc7f834541f11868745 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java @@ -16,6 +16,13 @@ */ package org.apache.rocketmq.tools.command.message; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; @@ -34,14 +41,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest { @BeforeClass public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, - NoSuchFieldException, IllegalAccessException{ + NoSuchFieldException, IllegalAccessException { consumeMessageCommand = new ConsumeMessageCommand(); DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); MessageExt msg = new MessageExt(); - msg.setBody(new byte[]{'a'}); + msg.setBody(new byte[] {'a'}); List msgFoundList = new ArrayList<>(); msgFoundList.add(msg); - final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList); + final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList); when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult); when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0)); @@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest { Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); producerField.setAccessible(true); - producerField.set(consumeMessageCommand,defaultMQPullConsumer); + producerField.set(consumeMessageCommand, defaultMQPullConsumer); } + @AfterClass public static void terminate() { } @@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest { System.setOut(new PrintStream(bos)); Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"}; + String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"}; CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); consumeMessageCommand.execute(commandLine, options, null); System.setOut(out); String s = new String(bos.toByteArray()); Assert.assertTrue(s.contains("Consume ok")); } + + @Test + public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException { + DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class); + Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); + producerField.setAccessible(true); + producerField.set(consumeMessageCommand, defaultMQPullConsumer); + + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t topic-not-existu", "-n localhost:9876"}; + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), + subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); + consumeMessageCommand.execute(commandLine, options, null); + + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(!s.contains("Consume ok")); + } + + @Test + public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException { + DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); + when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class); + Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); + producerField.setAccessible(true); + producerField.set(consumeMessageCommand, defaultMQPullConsumer); + + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + + String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"}; + CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); + consumeMessageCommand.execute(commandLine, options, null); + + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(!s.contains("Consume ok")); + } } \ No newline at end of file