未验证 提交 a460c5c1 编写于 作者: Z zhangjidi2016 提交者: GitHub

[ISSUE #1770] Add a query message trace command in mqadmin. (#2303)

Co-authored-by: Nzhangjidi2016 <zhangjidi@cmss.chinamobile.com>
上级 9ddcab41
...@@ -103,6 +103,11 @@ public class TraceDataEncoder { ...@@ -103,6 +103,11 @@ public class TraceDataEncoder {
// add the context type // add the context type
subAfterContext.setContextCode(Integer.parseInt(line[6])); subAfterContext.setContextCode(Integer.parseInt(line[6]));
} }
// compatible with the old version
if (line.length >= 9) {
subAfterContext.setTimeStamp(Long.parseLong(line[7]));
subAfterContext.setGroupName(line[8]);
}
resList.add(subAfterContext); resList.add(subAfterContext);
} }
} }
...@@ -165,7 +170,10 @@ public class TraceDataEncoder { ...@@ -165,7 +170,10 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)// .append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR); .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
} }
} }
break; break;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.ArrayList;
import java.util.List;
public class TraceView {
private String msgId;
private String tags;
private String keys;
private String storeHost;
private String clientHost;
private int costTime;
private String msgType;
private String offSetMsgId;
private long timeStamp;
private long bornTime;
private String topic;
private String groupName;
private String status;
public static List<TraceView> decodeFromTraceTransData(String key, String messageBody) {
List<TraceView> messageTraceViewList = new ArrayList<TraceView>();
if (messageBody == null || messageBody.length() <= 0) {
return messageTraceViewList;
}
List<TraceContext> traceContextList = TraceDataEncoder.decoderFromTraceDataString(messageBody);
for (TraceContext context : traceContextList) {
TraceView messageTraceView = new TraceView();
TraceBean traceBean = context.getTraceBeans().get(0);
if (!traceBean.getMsgId().equals(key)) {
continue;
}
messageTraceView.setCostTime(context.getCostTime());
messageTraceView.setGroupName(context.getGroupName());
if (context.isSuccess()) {
messageTraceView.setStatus("success");
}
else {
messageTraceView.setStatus("failed");
}
messageTraceView.setKeys(traceBean.getKeys());
messageTraceView.setMsgId(traceBean.getMsgId());
messageTraceView.setTags(traceBean.getTags());
messageTraceView.setTopic(traceBean.getTopic());
messageTraceView.setMsgType(context.getTraceType().name());
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost());
messageTraceView.setClientHost(traceBean.getClientHost());
messageTraceViewList.add(messageTraceView);
}
return messageTraceViewList;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKeys() {
return keys;
}
public void setKeys(String keys) {
this.keys = keys;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public String getClientHost() {
return clientHost;
}
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
public int getCostTime() {
return costTime;
}
public void setCostTime(int costTime) {
this.costTime = costTime;
}
public String getMsgType() {
return msgType;
}
public void setMsgType(String msgType) {
this.msgType = msgType;
}
public String getOffSetMsgId() {
return offSetMsgId;
}
public void setOffSetMsgId(String offSetMsgId) {
this.offSetMsgId = offSetMsgId;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public long getBornTime() {
return bornTime;
}
public void setBornTime(long bornTime) {
this.bornTime = bornTime;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class TraceDataEncoderTest {
private String traceData;
private long time;
@Before
public void init() {
time = System.currentTimeMillis();
traceData = new StringBuilder()
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(time).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString();
}
@Test
public void testDecoderFromTraceDataString() {
List<TraceContext> contexts = TraceDataEncoder.decoderFromTraceDataString(traceData);
Assert.assertEquals(contexts.size(), 1);
Assert.assertEquals(contexts.get(0).getTraceType(), TraceType.Pub);
}
@Test
public void testEncoderFromContextBean() {
TraceContext context = new TraceContext();
context.setTraceType(TraceType.Pub);
context.setGroupName("PID-test");
context.setRegionId("DefaultRegion");
context.setCostTime(245);
context.setSuccess(true);
context.setTimeStamp(time);
TraceBean traceBean = new TraceBean();
traceBean.setTopic("topic-test");
traceBean.setKeys("Keys");
traceBean.setTags("Tags");
traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
traceBean.setOffsetMsgId("0A9A002600002A9F0000000000002329");
traceBean.setStoreHost("127.0.0.1:10911");
traceBean.setStoreTime(time);
traceBean.setMsgType(MessageType.Normal_Msg);
traceBean.setBodyLength(26);
List<TraceBean> traceBeans = new ArrayList<TraceBean>();
traceBeans.add(traceBean);
context.setTraceBeans(traceBeans);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context);
Assert.assertEquals(traceTransferBean.getTransData(), traceData);
Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
public class TraceViewTest {
@Test
public void testDecodeFromTraceTransData() {
String messageBody = new StringBuilder()
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString();
String key = "AC1415116D1418B4AAC217FE1B4E0000";
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
Assert.assertEquals(traceViews.size(), 1);
Assert.assertEquals(traceViews.get(0).getMsgId(), key);
key = "AD4233434334AAC217FEFFD0000";
traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
Assert.assertEquals(traceViews.size(), 0);
}
}
...@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand; ...@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.apache.rocketmq.tools.command.message.SendMessageCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
...@@ -164,6 +165,7 @@ public class MQAdminStartup { ...@@ -164,6 +165,7 @@ public class MQAdminStartup {
initCommand(new QueryMsgByKeySubCommand()); initCommand(new QueryMsgByKeySubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new QueryMsgByOffsetSubCommand()); initCommand(new QueryMsgByOffsetSubCommand());
initCommand(new QueryMsgTraceByIdSubCommand());
initCommand(new PrintMessageSubCommand()); initCommand(new PrintMessageSubCommand());
initCommand(new PrintMessageByQueueCommand()); initCommand(new PrintMessageByQueueCommand());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.client.trace.TraceView;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class QueryMsgTraceByIdSubCommand implements SubCommand {
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("i", "msgId", true, "Message Id");
opt.setRequired(true);
options.addOption(opt);
return options;
}
@Override
public String commandDesc() {
return "query a message trace";
}
@Override
public String commandName() {
return "QueryMsgTraceById";
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
final String msgId = commandLine.getOptionValue('i').trim();
this.queryTraceByMsgId(defaultMQAdminExt, msgId);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private void queryTraceByMsgId(final DefaultMQAdminExt admin, String msgId)
throws MQClientException, InterruptedException {
admin.start();
QueryResult queryResult = admin.queryMessage(TopicValidator.RMQ_SYS_TRACE_TOPIC, msgId, 64, 0, System.currentTimeMillis());
List<MessageExt> messageList = queryResult.getMessageList();
List<TraceView> traceViews = new ArrayList<>();
for (MessageExt message : messageList) {
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8));
traceViews.addAll(traceView);
}
this.printMessageTrace(traceViews);
}
private void printMessageTrace(List<TraceView> traceViews) {
Map<String, List<TraceView>> consumerTraceMap = new HashMap<>(16);
for (TraceView traceView : traceViews) {
if (traceView.getMsgType().equals(TraceType.Pub.name())) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ProducerGroup",
"#ClientHost",
"#SendTime",
"#CostTimes",
"#Status"
);
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Pub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
System.out.printf("\n");
}
if (traceView.getMsgType().equals(TraceType.SubAfter.name())) {
String groupName = traceView.getGroupName();
if (consumerTraceMap.containsKey(groupName)) {
consumerTraceMap.get(groupName).add(traceView);
} else {
ArrayList<TraceView> views = new ArrayList<>();
views.add(traceView);
consumerTraceMap.put(groupName, views);
}
}
}
Iterator<String> consumers = consumerTraceMap.keySet().iterator();
while (consumers.hasNext()) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ConsumerGroup",
"#ClientHost",
"#ConsumerTime",
"#CostTimes",
"#Status"
);
List<TraceView> consumerTraces = consumerTraceMap.get(consumers.next());
for (TraceView traceView : consumerTraces) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Sub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
}
System.out.printf("\n");
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import static org.mockito.Mockito.mock;
public class QueryMsgTraceByIdSubCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() throws SubCommandException {
System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
QueryMsgTraceByIdSubCommand cmd = new QueryMsgTraceByIdSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-i AC1FF54E81C418B4AAC24F92E1E00000"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册