From 99b42f24f02c8a867120dcf7541390ada9cce291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=9C=E9=98=B3?= Date: Mon, 24 May 2021 23:02:26 +0800 Subject: [PATCH] [ISSUE #2735] QueryMsgByUniqueKey tool should return all messages with same unique key --- .../rocketmq/client/impl/MQAdminImpl.java | 16 ++- .../tools/admin/DefaultMQAdminExt.java | 12 +- .../tools/admin/DefaultMQAdminExtImpl.java | 12 +- .../QueryMsgByUniqueKeySubCommand.java | 134 +++++++----------- 4 files changed, 79 insertions(+), 95 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index ca89d613..e46527a5 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -265,17 +265,23 @@ public class MQAdminImpl { messageId.getOffset(), timeoutMillis); } - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, - long end) throws MQClientException, - InterruptedException { + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return queryMessage(topic, key, maxNum, begin, end, false); } + public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + + return queryMessage(topic, uniqKey, maxNum, begin, end, true); + } + public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException { - QueryResult qr = this.queryMessage(topic, uniqKey, 32, - MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true); + QueryResult qr = queryMessageByUniqKey(topic, uniqKey, 32, + MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE); if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { return qr.getMessageList().get(0); } else { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 1ca3fe4c..cab36e7d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -128,12 +128,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, - long end) throws MQClientException, - InterruptedException { + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end); } + public QueryResult queryMessageByUniqueKey(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + + return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end); + } + @Override public void start() throws MQClientException { defaultMQAdminExtImpl.start(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 22d4005c..ddd99b28 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -991,12 +991,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, - long end) throws MQClientException, - InterruptedException { + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } + public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, + long end) throws MQClientException, + InterruptedException { + return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end); + } + @Override public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index 9ad07508..badac114 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -57,82 +58,40 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { } } - public static void queryById(final DefaultMQAdminExt admin, final String topic, - final String msgId) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException, IOException { - MessageExt msg = admin.viewMessage(topic, msgId); - - String bodyTmpFilePath = createBodyFile(msg); - - System.out.printf("%-20s %s%n", - "Topic:", - msg.getTopic() - ); - - System.out.printf("%-20s %s%n", - "Tags:", - "[" + msg.getTags() + "]" - ); - - System.out.printf("%-20s %s%n", - "Keys:", - "[" + msg.getKeys() + "]" - ); - - System.out.printf("%-20s %d%n", - "Queue ID:", - msg.getQueueId() - ); - - System.out.printf("%-20s %d%n", - "Queue Offset:", - msg.getQueueOffset() - ); - - System.out.printf("%-20s %d%n", - "CommitLog Offset:", - msg.getCommitLogOffset() - ); - - System.out.printf("%-20s %d%n", - "Reconsume Times:", - msg.getReconsumeTimes() - ); - - System.out.printf("%-20s %s%n", - "Born Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()) - ); - - System.out.printf("%-20s %s%n", - "Store Timestamp:", - UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()) - ); - - System.out.printf("%-20s %s%n", - "Born Host:", - RemotingHelper.parseSocketAddressAddr(msg.getBornHost()) - ); - - System.out.printf("%-20s %s%n", - "Store Host:", - RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()) - ); - - System.out.printf("%-20s %d%n", - "System Flag:", - msg.getSysFlag() - ); - - System.out.printf("%-20s %s%n", - "Properties:", - msg.getProperties() != null ? msg.getProperties().toString() : "" - ); - - System.out.printf("%-20s %s%n", - "Message Body Path:", - bodyTmpFilePath - ); + public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId, + final boolean showAll) throws MQClientException, + RemotingException, MQBrokerException, InterruptedException, IOException { + + QueryResult queryResult = admin.queryMessageByUniqueKey(topic, msgId, 32, 0, Long.MAX_VALUE); + assert queryResult != null; + List list = queryResult.getMessageList(); + list.sort((o1, o2) -> (int) (o1.getStoreTimestamp() - o2.getStoreTimestamp())); + for (int i = 0; i < (showAll ? list.size() : 1); i++) { + showMessage(admin, list.get(i), i); + } + } + + private static void showMessage(final DefaultMQAdminExt admin, MessageExt msg, int index) throws IOException { + String bodyTmpFilePath = createBodyFile(msg, index); + + final String strFormat = "%-20s %s%n"; + final String intFormat = "%-20s %d%n"; + + System.out.printf(strFormat, "Topic:", msg.getTopic()); + System.out.printf(strFormat, "Tags:", "[" + msg.getTags() + "]"); + System.out.printf(strFormat, "Keys:", "[" + msg.getKeys() + "]"); + System.out.printf(intFormat, "Queue ID:", msg.getQueueId()); + System.out.printf(intFormat, "Queue Offset:", msg.getQueueOffset()); + System.out.printf(intFormat, "CommitLog Offset:", msg.getCommitLogOffset()); + System.out.printf(intFormat, "Reconsume Times:", msg.getReconsumeTimes()); + System.out.printf(strFormat, "Born Timestamp:", UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())); + System.out.printf(strFormat, "Store Timestamp:", UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())); + System.out.printf(strFormat, "Born Host:", RemotingHelper.parseSocketAddressAddr(msg.getBornHost())); + System.out.printf(strFormat, "Store Host:", RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())); + System.out.printf(intFormat, "System Flag:", msg.getSysFlag()); + System.out.printf(strFormat, "Properties:", + msg.getProperties() != null ? msg.getProperties().toString() : ""); + System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath); try { List mtdList = admin.messageTrackDetail(msg); @@ -149,18 +108,21 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { } } - private static String createBodyFile(MessageExt msg) throws IOException { + private static String createBodyFile(MessageExt msg, int index) throws IOException { DataOutputStream dos = null; try { - String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; - File file = new File(bodyTmpFilePath); + StringBuffer bodyTmpFilePath = new StringBuffer("/tmp/rocketmq/msgbodys"); + File file = new File(bodyTmpFilePath.toString()); if (!file.exists()) { file.mkdirs(); } - bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); - dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); + bodyTmpFilePath.append("/").append(msg.getMsgId()); + if (index > 0) { + bodyTmpFilePath.append("_" + index); + } + dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath.toString())); dos.write(msg.getBody()); - return bodyTmpFilePath; + return bodyTmpFilePath.toString(); } finally { if (dos != null) dos.close(); @@ -195,6 +157,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); + opt = new Option("a", "showAll", false, "Print all message, the limit is 32"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -202,11 +168,11 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { try { - defaultMQAdminExt = createMQAdminExt(rpcHook); final String msgId = commandLine.getOptionValue('i').trim(); final String topic = commandLine.getOptionValue('t').trim(); + final boolean showAll = commandLine.hasOption('a'); if (commandLine.hasOption('g') && commandLine.hasOption('d')) { final String consumerGroup = commandLine.getOptionValue('g').trim(); final String clientId = commandLine.getOptionValue('d').trim(); @@ -214,7 +180,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); System.out.printf("%s", result); } else { - queryById(defaultMQAdminExt, topic, msgId); + queryById(defaultMQAdminExt, topic, msgId, showAll); } } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); -- GitLab