diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index 35fd26010ae3d94660816ed2e2d73619e31963fb..f341362c3f15390c2936136f59988ab47100687b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.tools.command.consumer; -import java.util.Collections; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.Connection; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; import org.slf4j.Logger; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + public class ConsumerProgressSubCommand implements SubCommand { private final Logger log = ClientLogger.getLog(); @@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand { return options; } + private Map getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, + String groupName) { + Map results = new HashMap<>(); + try { + ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName); + for (Connection connection : consumerConnection.getConnectionSet()) { + String clientId = connection.getClientId(); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, + false); + for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) { + results.put(messageQueue, clientId.split("@")[0]); + } + } + } catch (Exception ignore) { + } + return results; + } + @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); @@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand { List mqList = new LinkedList(); mqList.addAll(consumeStats.getOffsetTable().keySet()); Collections.sort(mqList); - - System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n", + Map messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup); + System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %-20s %s%n", "#Topic", "#Broker Name", "#QID", "#Broker Offset", "#Consumer Offset", + "#Client IP", "#Diff", "#LastTime"); @@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand { lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS); } catch (Exception e) { } - System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n", + + String clientIP = messageQueueAllocationResult.get(mq); + System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20s %-20d %s%n", UtilAll.frontStringAtLeast(mq.getTopic(), 32), UtilAll.frontStringAtLeast(mq.getBrokerName(), 32), mq.getQueueId(), offsetWrapper.getBrokerOffset(), offsetWrapper.getConsumerOffset(), + null != clientIP ? clientIP : "NA", diff, lastTime );