From 45a64fd6a8ab66a8b0eb30cbe2dffd59b19274f9 Mon Sep 17 00:00:00 2001
From: Zhanhui Li <lizhanhui@apache.org>
Date: Wed, 29 Mar 2017 21:50:59 +0800
Subject: [PATCH] Include client IP per message queue of consumer progress
 command output

---
 .../consumer/ConsumerProgressSubCommand.java  | 41 +++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 35fd2601..f341362c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -16,10 +16,6 @@
  */
 package org.apache.rocketmq.tools.command.consumer;
 
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -30,7 +26,9 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -40,6 +38,13 @@ import org.apache.rocketmq.tools.command.SubCommand;
 import org.apache.rocketmq.tools.command.SubCommandException;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 public class ConsumerProgressSubCommand implements SubCommand {
     private final Logger log = ClientLogger.getLog();
 
@@ -62,6 +67,24 @@ public class ConsumerProgressSubCommand implements SubCommand {
         return options;
     }
 
+    private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt,
+        String groupName) {
+        Map<MessageQueue, String> results = new HashMap<>();
+        try {
+            ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
+            for (Connection connection : consumerConnection.getConnectionSet()) {
+                String clientId = connection.getClientId();
+                ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId,
+                    false);
+                for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
+                    results.put(messageQueue, clientId.split("@")[0]);
+                }
+            }
+        } catch (Exception ignore) {
+        }
+        return results;
+    }
+
     @Override
     public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
         DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,13 +98,14 @@ public class ConsumerProgressSubCommand implements SubCommand {
                 List<MessageQueue> mqList = new LinkedList<MessageQueue>();
                 mqList.addAll(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
-
-                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s  %s%n",
+                Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+                System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s  %s%n",
                     "#Topic",
                     "#Broker Name",
                     "#QID",
                     "#Broker Offset",
                     "#Consumer Offset",
+                    "#Client IP",
                     "#Diff",
                     "#LastTime");
 
@@ -95,12 +119,15 @@ public class ConsumerProgressSubCommand implements SubCommand {
                         lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
                     } catch (Exception e) {
                     }
-                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20d  %s%n",
+
+                    String clientIP = messageQueueAllocationResult.get(mq);
+                    System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20s %-20d  %s%n",
                         UtilAll.frontStringAtLeast(mq.getTopic(), 32),
                         UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                         mq.getQueueId(),
                         offsetWrapper.getBrokerOffset(),
                         offsetWrapper.getConsumerOffset(),
+                        null != clientIP ? clientIP : "NA",
                         diff,
                         lastTime
                     );
-- 
GitLab