From e9671cc4f25cd7596b857fb125bdddbe76430ba6 Mon Sep 17 00:00:00 2001 From: Jaskey Date: Tue, 23 Jun 2020 10:02:40 +0800 Subject: [PATCH] [ISSUE #2088] Optimize rocketmq client's stats of RT to make sense Fix issue #2088 , make the log output of RT stat makes sense. --- .../client/stat/ConsumerStatsManager.java | 4 +- .../rocketmq/common/stats/RTStatsItem.java | 41 +++++++++++++++++++ .../rocketmq/common/stats/StatsItem.java | 33 +++++++-------- .../rocketmq/common/stats/StatsItemSet.java | 20 ++++++++- .../rocketmq/common/stats/StatsSnapshot.java | 10 +++++ .../common/stats/StatsItemSetTest.java | 40 ++++++++++++++---- 6 files changed, 117 insertions(+), 31 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java index cf347b4d..ba4773ae 100644 --- a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java @@ -62,7 +62,7 @@ public class ConsumerStatsManager { } public void incPullRT(final String group, final String topic, final long rt) { - this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); + this.topicAndGroupPullRT.addRTValue(topic + "@" + group, (int) rt, 1); } public void incPullTPS(final String group, final String topic, final long msgs) { @@ -70,7 +70,7 @@ public class ConsumerStatsManager { } public void incConsumeRT(final String group, final String topic, final long rt) { - this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); + this.topicAndGroupConsumeRT.addRTValue(topic + "@" + group, (int) rt, 1); } public void incConsumeOKTPS(final String group, final String topic, final long msgs) { diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java new file mode 100644 index 00000000..102148cd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common.stats; + +import org.apache.rocketmq.logging.InternalLogger; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * A StatItem for response time, the only difference between from StatsItem is it has a different log output. + */ +public class RTStatsItem extends StatsItem { + + public RTStatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, InternalLogger log) { + super(statsName, statsKey, scheduledExecutorService, log); + } + + /** + * For Response Time stat Item, the print detail should be a little different, TPS and SUM makes no sense. + * And we give a name "AVGRT" rather than AVGPT for value getAvgpt() + */ + @Override + protected String statPrintDetail(StatsSnapshot ss) { + return String.format("TIMES: %d AVGRT: %.2f", ss.getTimes(), ss.getAvgpt()); + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index 6304ea2c..b078551a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -55,13 +55,14 @@ public class StatsItem { double tps = 0; double avgpt = 0; long sum = 0; + long timesDiff = 0; if (!csList.isEmpty()) { CallSnapshot first = csList.getFirst(); CallSnapshot last = csList.getLast(); sum = last.getValue() - first.getValue(); tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); - long timesDiff = last.getTimes() - first.getTimes(); + timesDiff = last.getTimes() - first.getTimes(); if (timesDiff > 0) { avgpt = (sum * 1.0d) / timesDiff; } @@ -70,6 +71,7 @@ public class StatsItem { statsSnapshot.setSum(sum); statsSnapshot.setTps(tps); statsSnapshot.setAvgpt(avgpt); + statsSnapshot.setTimes(timesDiff); } return statsSnapshot; @@ -191,32 +193,25 @@ public class StatsItem { public void printAtMinutes() { StatsSnapshot ss = computeStatsData(this.csListMinute); - log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); + log.info(String.format("[%s] [%s] Stats In One Minute, ", this.statsName, this.statsKey) + statPrintDetail(ss)); } public void printAtHour() { StatsSnapshot ss = computeStatsData(this.csListHour); - log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); + log.info(String.format("[%s] [%s] Stats In One Hour, ", this.statsName, this.statsKey) + statPrintDetail(ss)); + } public void printAtDay() { StatsSnapshot ss = computeStatsData(this.csListDay); - log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", - this.statsName, - this.statsKey, - ss.getSum(), - ss.getTps(), - ss.getAvgpt())); + log.info(String.format("[%s] [%s] Stats In One Day, ", this.statsName, this.statsKey) + statPrintDetail(ss)); + } + + protected String statPrintDetail(StatsSnapshot ss) { + return String.format("SUM: %d TPS: %.2f AVGPT: %.2f", + ss.getSum(), + ss.getTps(), + ss.getAvgpt()); } public AtomicLong getValue() { diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index bcf96659..a28d008d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -158,6 +158,12 @@ public class StatsItemSet { statsItem.getTimes().addAndGet(incTimes); } + public void addRTValue(final String statsKey, final int incValue, final int incTimes) { + StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey); + statsItem.getValue().addAndGet(incValue); + statsItem.getTimes().addAndGet(incTimes); + } + public void delValue(final String statsKey) { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null != statsItem) { @@ -196,9 +202,21 @@ public class StatsItemSet { } public StatsItem getAndCreateStatsItem(final String statsKey) { + return getAndCreateItem(statsKey, false); + } + + public StatsItem getAndCreateRTStatsItem(final String statsKey) { + return getAndCreateItem(statsKey, true); + } + + public StatsItem getAndCreateItem(final String statsKey, boolean rtItem) { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { - statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + if (rtItem) { + statsItem = new RTStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + } else { + statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); + } StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem); if (null != prev) { diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java index 136f21a7..0cecce99 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java @@ -20,6 +20,8 @@ package org.apache.rocketmq.common.stats; public class StatsSnapshot { private long sum; private double tps; + + private long times; private double avgpt; public long getSum() { @@ -45,4 +47,12 @@ public class StatsSnapshot { public void setAvgpt(double avgpt) { this.avgpt = avgpt; } + + public long getTimes() { + return times; + } + + public void setTimes(long times) { + this.times = times; + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java index 4b4a8676..5b4c5d82 100644 --- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java @@ -46,14 +46,17 @@ public class StatsItemSetTest { @Test public void test_statsOfFirstStatisticsCycle() throws InterruptedException { - final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null); + final String tpsStatKey = "tpsTest"; + final String rtStatKey = "rtTest"; + final StatsItemSet statsItemSet = new StatsItemSet(tpsStatKey, scheduler, null); executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new ThreadFactoryImpl("testMultiThread")); for (int i = 0; i < 10; i++) { executor.submit(new Runnable() { @Override public void run() { - statsItemSet.addValue("topicTest", 2, 1); + statsItemSet.addValue(tpsStatKey, 2, 1); + statsItemSet.addRTValue(rtStatKey, 2, 1); } }); } @@ -63,14 +66,33 @@ public class StatsItemSetTest { } Thread.sleep(1000); } - // simulate schedule task execution - statsItemSet.getStatsItem("topicTest").samplingInSeconds(); - statsItemSet.getStatsItem("topicTest").samplingInMinutes(); - statsItemSet.getStatsItem("topicTest").samplingInHour(); + // simulate schedule task execution , tps stat + { + statsItemSet.getStatsItem(tpsStatKey).samplingInSeconds(); + statsItemSet.getStatsItem(tpsStatKey).samplingInMinutes(); + statsItemSet.getStatsItem(tpsStatKey).samplingInHour(); + + assertEquals(20L, statsItemSet.getStatsDataInMinute(tpsStatKey).getSum()); + assertEquals(20L, statsItemSet.getStatsDataInHour(tpsStatKey).getSum()); + assertEquals(20L, statsItemSet.getStatsDataInDay(tpsStatKey).getSum()); + assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes()); + assertEquals(10L, statsItemSet.getStatsDataInHour(tpsStatKey).getTimes()); + assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes()); + } + + // simulate schedule task execution , rt stat + { + statsItemSet.getStatsItem(rtStatKey).samplingInSeconds(); + statsItemSet.getStatsItem(rtStatKey).samplingInMinutes(); + statsItemSet.getStatsItem(rtStatKey).samplingInHour(); - assertEquals(20L, statsItemSet.getStatsDataInMinute("topicTest").getSum()); - assertEquals(20L, statsItemSet.getStatsDataInHour("topicTest").getSum()); - assertEquals(20L, statsItemSet.getStatsDataInDay("topicTest").getSum()); + assertEquals(20L, statsItemSet.getStatsDataInMinute(rtStatKey).getSum()); + assertEquals(20L, statsItemSet.getStatsDataInHour(rtStatKey).getSum()); + assertEquals(20L, statsItemSet.getStatsDataInDay(rtStatKey).getSum()); + assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes()); + assertEquals(10L, statsItemSet.getStatsDataInHour(rtStatKey).getTimes()); + assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes()); + } } private AtomicLong test_unit() throws InterruptedException { -- GitLab