提交 45a64fd6 编写于 作者: Z Zhanhui Li

Include client IP per message queue of consumer progress command output

上级 ab013861
......@@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.tools.command.consumer;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
......@@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
......@@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class ConsumerProgressSubCommand implements SubCommand {
private final Logger log = ClientLogger.getLog();
......@@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand {
return options;
}
private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
String groupName) {
Map<MessageQueue, String> results = new HashMap<>();
try {
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
for (Connection connection : consumerConnection.getConnectionSet()) {
String clientId = connection.getClientId();
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
false);
for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
results.put(messageQueue, clientId.split("@")[0]);
}
}
} catch (Exception ignore) {
}
return results;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
......@@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand {
List<MessageQueue> mqList = new LinkedList<MessageQueue>();
mqList.addAll(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Broker Offset",
"#Consumer Offset",
"#Client IP",
"#Diff",
"#LastTime");
......@@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand {
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
} catch (Exception e) {
}
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",
String clientIP = messageQueueAllocationResult.get(mq);
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n",
UtilAll.frontStringAtLeast(mq.getTopic(), 32),
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
mq.getQueueId(),
offsetWrapper.getBrokerOffset(),
offsetWrapper.getConsumerOffset(),
null != clientIP ? clientIP : "NA",
diff,
lastTime
);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册