提交 99b42f24 编写于 作者: 斜阳

[ISSUE #2735] QueryMsgByUniqueKey tool should return all messages with same unique key

上级 7028af68
...@@ -265,17 +265,23 @@ public class MQAdminImpl { ...@@ -265,17 +265,23 @@ public class MQAdminImpl {
messageId.getOffset(), timeoutMillis); messageId.getOffset(), timeoutMillis);
} }
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
long end) throws MQClientException, throws MQClientException, InterruptedException {
InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false); return queryMessage(topic, key, maxNum, begin, end, false);
} }
public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return queryMessage(topic, uniqKey, maxNum, begin, end, true);
}
public MessageExt queryMessageByUniqKey(String topic, public MessageExt queryMessageByUniqKey(String topic,
String uniqKey) throws InterruptedException, MQClientException { String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32, QueryResult qr = queryMessageByUniqKey(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true); MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE);
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) { if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0); return qr.getMessageList().get(0);
} else { } else {
......
...@@ -128,12 +128,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { ...@@ -128,12 +128,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
} }
@Override @Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
long end) throws MQClientException, throws MQClientException, InterruptedException {
InterruptedException {
return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end); return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
} }
public QueryResult queryMessageByUniqueKey(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override @Override
public void start() throws MQClientException { public void start() throws MQClientException {
defaultMQAdminExtImpl.start(); defaultMQAdminExtImpl.start();
......
...@@ -991,12 +991,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { ...@@ -991,12 +991,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
} }
@Override @Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
long end) throws MQClientException, throws MQClientException, InterruptedException {
InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
} }
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override @Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException { long offset) throws RemotingException, InterruptedException, MQBrokerException {
......
...@@ -24,6 +24,7 @@ import java.util.List; ...@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -57,82 +58,40 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { ...@@ -57,82 +58,40 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
} }
} }
public static void queryById(final DefaultMQAdminExt admin, final String topic, public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId,
final String msgId) throws MQClientException, final boolean showAll) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException { RemotingException, MQBrokerException, InterruptedException, IOException {
MessageExt msg = admin.viewMessage(topic, msgId);
QueryResult queryResult = admin.queryMessageByUniqueKey(topic, msgId, 32, 0, Long.MAX_VALUE);
String bodyTmpFilePath = createBodyFile(msg); assert queryResult != null;
List<MessageExt> list = queryResult.getMessageList();
System.out.printf("%-20s %s%n", list.sort((o1, o2) -> (int) (o1.getStoreTimestamp() - o2.getStoreTimestamp()));
"Topic:", for (int i = 0; i < (showAll ? list.size() : 1); i++) {
msg.getTopic() showMessage(admin, list.get(i), i);
); }
}
System.out.printf("%-20s %s%n",
"Tags:", private static void showMessage(final DefaultMQAdminExt admin, MessageExt msg, int index) throws IOException {
"[" + msg.getTags() + "]" String bodyTmpFilePath = createBodyFile(msg, index);
);
final String strFormat = "%-20s %s%n";
System.out.printf("%-20s %s%n", final String intFormat = "%-20s %d%n";
"Keys:",
"[" + msg.getKeys() + "]" System.out.printf(strFormat, "Topic:", msg.getTopic());
); System.out.printf(strFormat, "Tags:", "[" + msg.getTags() + "]");
System.out.printf(strFormat, "Keys:", "[" + msg.getKeys() + "]");
System.out.printf("%-20s %d%n", System.out.printf(intFormat, "Queue ID:", msg.getQueueId());
"Queue ID:", System.out.printf(intFormat, "Queue Offset:", msg.getQueueOffset());
msg.getQueueId() System.out.printf(intFormat, "CommitLog Offset:", msg.getCommitLogOffset());
); System.out.printf(intFormat, "Reconsume Times:", msg.getReconsumeTimes());
System.out.printf(strFormat, "Born Timestamp:", UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()));
System.out.printf("%-20s %d%n", System.out.printf(strFormat, "Store Timestamp:", UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()));
"Queue Offset:", System.out.printf(strFormat, "Born Host:", RemotingHelper.parseSocketAddressAddr(msg.getBornHost()));
msg.getQueueOffset() System.out.printf(strFormat, "Store Host:", RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()));
); System.out.printf(intFormat, "System Flag:", msg.getSysFlag());
System.out.printf(strFormat, "Properties:",
System.out.printf("%-20s %d%n", msg.getProperties() != null ? msg.getProperties().toString() : "");
"CommitLog Offset:", System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath);
msg.getCommitLogOffset()
);
System.out.printf("%-20s %d%n",
"Reconsume Times:",
msg.getReconsumeTimes()
);
System.out.printf("%-20s %s%n",
"Born Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
);
System.out.printf("%-20s %s%n",
"Store Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
);
System.out.printf("%-20s %s%n",
"Born Host:",
RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
);
System.out.printf("%-20s %s%n",
"Store Host:",
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
);
System.out.printf("%-20s %d%n",
"System Flag:",
msg.getSysFlag()
);
System.out.printf("%-20s %s%n",
"Properties:",
msg.getProperties() != null ? msg.getProperties().toString() : ""
);
System.out.printf("%-20s %s%n",
"Message Body Path:",
bodyTmpFilePath
);
try { try {
List<MessageTrack> mtdList = admin.messageTrackDetail(msg); List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
...@@ -149,18 +108,21 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { ...@@ -149,18 +108,21 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
} }
} }
private static String createBodyFile(MessageExt msg) throws IOException { private static String createBodyFile(MessageExt msg, int index) throws IOException {
DataOutputStream dos = null; DataOutputStream dos = null;
try { try {
String bodyTmpFilePath = "/tmp/rocketmq/msgbodys"; StringBuffer bodyTmpFilePath = new StringBuffer("/tmp/rocketmq/msgbodys");
File file = new File(bodyTmpFilePath); File file = new File(bodyTmpFilePath.toString());
if (!file.exists()) { if (!file.exists()) {
file.mkdirs(); file.mkdirs();
} }
bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId(); bodyTmpFilePath.append("/").append(msg.getMsgId());
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath)); if (index > 0) {
bodyTmpFilePath.append("_" + index);
}
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath.toString()));
dos.write(msg.getBody()); dos.write(msg.getBody());
return bodyTmpFilePath; return bodyTmpFilePath.toString();
} finally { } finally {
if (dos != null) if (dos != null)
dos.close(); dos.close();
...@@ -195,6 +157,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { ...@@ -195,6 +157,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
opt.setRequired(true); opt.setRequired(true);
options.addOption(opt); options.addOption(opt);
opt = new Option("a", "showAll", false, "Print all message, the limit is 32");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
...@@ -202,11 +168,11 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { ...@@ -202,11 +168,11 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
try { try {
defaultMQAdminExt = createMQAdminExt(rpcHook); defaultMQAdminExt = createMQAdminExt(rpcHook);
final String msgId = commandLine.getOptionValue('i').trim(); final String msgId = commandLine.getOptionValue('i').trim();
final String topic = commandLine.getOptionValue('t').trim(); final String topic = commandLine.getOptionValue('t').trim();
final boolean showAll = commandLine.hasOption('a');
if (commandLine.hasOption('g') && commandLine.hasOption('d')) { if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
final String consumerGroup = commandLine.getOptionValue('g').trim(); final String consumerGroup = commandLine.getOptionValue('g').trim();
final String clientId = commandLine.getOptionValue('d').trim(); final String clientId = commandLine.getOptionValue('d').trim();
...@@ -214,7 +180,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { ...@@ -214,7 +180,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result); System.out.printf("%s", result);
} else { } else {
queryById(defaultMQAdminExt, topic, msgId); queryById(defaultMQAdminExt, topic, msgId, showAll);
} }
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册