提交 3f7dda3c 编写于 作者: X XiaoZYang 提交者: von gosling

[ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)

上级 bc0c04bf
...@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand { ...@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand {
pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1)); pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1));
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return;
} }
if (pullResult != null) { if (pullResult != null) {
offset = pullResult.getNextBeginOffset(); offset = pullResult.getNextBeginOffset();
......
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
*/ */
package org.apache.rocketmq.tools.command.message; package org.apache.rocketmq.tools.command.message;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
...@@ -34,14 +41,6 @@ import org.junit.Assert; ...@@ -34,14 +41,6 @@ import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest { ...@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest {
@BeforeClass @BeforeClass
public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
NoSuchFieldException, IllegalAccessException{ NoSuchFieldException, IllegalAccessException {
consumeMessageCommand = new ConsumeMessageCommand(); consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class); DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
MessageExt msg = new MessageExt(); MessageExt msg = new MessageExt();
msg.setBody(new byte[]{'a'}); msg.setBody(new byte[] {'a'});
List<MessageExt> msgFoundList = new ArrayList<>(); List<MessageExt> msgFoundList = new ArrayList<>();
msgFoundList.add(msg); msgFoundList.add(msg);
final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList); final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult); when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0)); when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
...@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest { ...@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest {
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer"); Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true); producerField.setAccessible(true);
producerField.set(consumeMessageCommand,defaultMQPullConsumer); producerField.set(consumeMessageCommand, defaultMQPullConsumer);
} }
@AfterClass @AfterClass
public static void terminate() { public static void terminate() {
} }
...@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest { ...@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest {
System.setOut(new PrintStream(bos)); System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options()); Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"}; String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser()); CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null); consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out); System.setOut(out);
String s = new String(bos.toByteArray()); String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok")); Assert.assertTrue(s.contains("Consume ok"));
} }
@Test
public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);
PrintStream out = System.out;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t topic-not-existu", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(!s.contains("Consume ok"));
}
@Test
public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);
PrintStream out = System.out;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(!s.contains("Consume ok"));
}
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册