未验证 提交 4daf0ee7 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #2943 from lizhimins/develop

[ISSUE #2735] QueryMsgByUniqueKey tool should return all messages with same unique key
......@@ -268,17 +268,23 @@ public class MQAdminImpl {
messageId.getOffset(), timeoutMillis);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
}
public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return queryMessage(topic, uniqKey, maxNum, begin, end, true);
}
public MessageExt queryMessageByUniqKey(String topic,
String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
QueryResult qr = queryMessageByUniqKey(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE);
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
} else {
......
......@@ -128,12 +128,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
}
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override
public void start() throws MQClientException {
defaultMQAdminExtImpl.start();
......
......@@ -998,12 +998,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
long end) throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException {
......
......@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
......@@ -57,82 +58,43 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
}
public static void queryById(final DefaultMQAdminExt admin, final String topic,
final String msgId) throws MQClientException,
public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId,
final boolean showAll) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException {
MessageExt msg = admin.viewMessage(topic, msgId);
String bodyTmpFilePath = createBodyFile(msg);
System.out.printf("%-20s %s%n",
"Topic:",
msg.getTopic()
);
System.out.printf("%-20s %s%n",
"Tags:",
"[" + msg.getTags() + "]"
);
System.out.printf("%-20s %s%n",
"Keys:",
"[" + msg.getKeys() + "]"
);
System.out.printf("%-20s %d%n",
"Queue ID:",
msg.getQueueId()
);
System.out.printf("%-20s %d%n",
"Queue Offset:",
msg.getQueueOffset()
);
System.out.printf("%-20s %d%n",
"CommitLog Offset:",
msg.getCommitLogOffset()
);
System.out.printf("%-20s %d%n",
"Reconsume Times:",
msg.getReconsumeTimes()
);
System.out.printf("%-20s %s%n",
"Born Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
);
System.out.printf("%-20s %s%n",
"Store Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
);
System.out.printf("%-20s %s%n",
"Born Host:",
RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
);
System.out.printf("%-20s %s%n",
"Store Host:",
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
);
System.out.printf("%-20s %d%n",
"System Flag:",
msg.getSysFlag()
);
System.out.printf("%-20s %s%n",
"Properties:",
msg.getProperties() != null ? msg.getProperties().toString() : ""
);
System.out.printf("%-20s %s%n",
"Message Body Path:",
bodyTmpFilePath
);
QueryResult queryResult = admin.queryMessageByUniqKey(topic, msgId, 32, 0, Long.MAX_VALUE);
assert queryResult != null;
List<MessageExt> list = queryResult.getMessageList();
if (list == null || list.size() == 0) {
return;
}
list.sort((o1, o2) -> (int) (o1.getStoreTimestamp() - o2.getStoreTimestamp()));
for (int i = 0; i < (showAll ? list.size() : 1); i++) {
showMessage(admin, list.get(i), i);
}
}
private static void showMessage(final DefaultMQAdminExt admin, MessageExt msg, int index) throws IOException {
String bodyTmpFilePath = createBodyFile(msg, index);
final String strFormat = "%-20s %s%n";
final String intFormat = "%-20s %d%n";
System.out.printf(strFormat, "Topic:", msg.getTopic());
System.out.printf(strFormat, "Tags:", "[" + msg.getTags() + "]");
System.out.printf(strFormat, "Keys:", "[" + msg.getKeys() + "]");
System.out.printf(intFormat, "Queue ID:", msg.getQueueId());
System.out.printf(intFormat, "Queue Offset:", msg.getQueueOffset());
System.out.printf(intFormat, "CommitLog Offset:", msg.getCommitLogOffset());
System.out.printf(intFormat, "Reconsume Times:", msg.getReconsumeTimes());
System.out.printf(strFormat, "Born Timestamp:", UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()));
System.out.printf(strFormat, "Store Timestamp:", UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()));
System.out.printf(strFormat, "Born Host:", RemotingHelper.parseSocketAddressAddr(msg.getBornHost()));
System.out.printf(strFormat, "Store Host:", RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()));
System.out.printf(intFormat, "System Flag:", msg.getSysFlag());
System.out.printf(strFormat, "Properties:",
msg.getProperties() != null ? msg.getProperties().toString() : "");
System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath);
try {
List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
......@@ -149,18 +111,21 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
}
private static String createBodyFile(MessageExt msg) throws IOException {
private static String createBodyFile(MessageExt msg, int index) throws IOException {
DataOutputStream dos = null;
try {
String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
File file = new File(bodyTmpFilePath);
StringBuffer bodyTmpFilePath = new StringBuffer("/tmp/rocketmq/msgbodys");
File file = new File(bodyTmpFilePath.toString());
if (!file.exists()) {
file.mkdirs();
}
bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
bodyTmpFilePath.append("/").append(msg.getMsgId());
if (index > 0) {
bodyTmpFilePath.append("_" + index);
}
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath.toString()));
dos.write(msg.getBody());
return bodyTmpFilePath;
return bodyTmpFilePath.toString();
} finally {
if (dos != null)
dos.close();
......@@ -195,6 +160,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("a", "showAll", false, "Print all message, the limit is 32");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -202,11 +171,11 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
try {
defaultMQAdminExt = createMQAdminExt(rpcHook);
final String msgId = commandLine.getOptionValue('i').trim();
final String topic = commandLine.getOptionValue('t').trim();
final boolean showAll = commandLine.hasOption('a');
if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
final String consumerGroup = commandLine.getOptionValue('g').trim();
final String clientId = commandLine.getOptionValue('d').trim();
......@@ -214,7 +183,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result);
} else {
queryById(defaultMQAdminExt, topic, msgId);
queryById(defaultMQAdminExt, topic, msgId, showAll);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册