提交 e9671cc4 编写于 作者: J Jaskey 提交者: RongtongJin

[ISSUE #2088] Optimize rocketmq client's stats of RT to make sense

Fix issue #2088 , make the log output of RT stat makes sense.
上级 7a359956
...@@ -62,7 +62,7 @@ public class ConsumerStatsManager { ...@@ -62,7 +62,7 @@ public class ConsumerStatsManager {
} }
public void incPullRT(final String group, final String topic, final long rt) { public void incPullRT(final String group, final String topic, final long rt) {
this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); this.topicAndGroupPullRT.addRTValue(topic + "@" + group, (int) rt, 1);
} }
public void incPullTPS(final String group, final String topic, final long msgs) { public void incPullTPS(final String group, final String topic, final long msgs) {
...@@ -70,7 +70,7 @@ public class ConsumerStatsManager { ...@@ -70,7 +70,7 @@ public class ConsumerStatsManager {
} }
public void incConsumeRT(final String group, final String topic, final long rt) { public void incConsumeRT(final String group, final String topic, final long rt) {
this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); this.topicAndGroupConsumeRT.addRTValue(topic + "@" + group, (int) rt, 1);
} }
public void incConsumeOKTPS(final String group, final String topic, final long msgs) { public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.stats;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.concurrent.ScheduledExecutorService;
/**
* A StatItem for response time, the only difference between from StatsItem is it has a different log output.
*/
public class RTStatsItem extends StatsItem {
public RTStatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
super(statsName, statsKey, scheduledExecutorService, log);
}
/**
* For Response Time stat Item, the print detail should be a little different, TPS and SUM makes no sense.
* And we give a name "AVGRT" rather than AVGPT for value getAvgpt()
*/
@Override
protected String statPrintDetail(StatsSnapshot ss) {
return String.format("TIMES: %d AVGRT: %.2f", ss.getTimes(), ss.getAvgpt());
}
}
...@@ -55,13 +55,14 @@ public class StatsItem { ...@@ -55,13 +55,14 @@ public class StatsItem {
double tps = 0; double tps = 0;
double avgpt = 0; double avgpt = 0;
long sum = 0; long sum = 0;
long timesDiff = 0;
if (!csList.isEmpty()) { if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst(); CallSnapshot first = csList.getFirst();
CallSnapshot last = csList.getLast(); CallSnapshot last = csList.getLast();
sum = last.getValue() - first.getValue(); sum = last.getValue() - first.getValue();
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp()); tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
long timesDiff = last.getTimes() - first.getTimes(); timesDiff = last.getTimes() - first.getTimes();
if (timesDiff > 0) { if (timesDiff > 0) {
avgpt = (sum * 1.0d) / timesDiff; avgpt = (sum * 1.0d) / timesDiff;
} }
...@@ -70,6 +71,7 @@ public class StatsItem { ...@@ -70,6 +71,7 @@ public class StatsItem {
statsSnapshot.setSum(sum); statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps); statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt); statsSnapshot.setAvgpt(avgpt);
statsSnapshot.setTimes(timesDiff);
} }
return statsSnapshot; return statsSnapshot;
...@@ -191,32 +193,25 @@ public class StatsItem { ...@@ -191,32 +193,25 @@ public class StatsItem {
public void printAtMinutes() { public void printAtMinutes() {
StatsSnapshot ss = computeStatsData(this.csListMinute); StatsSnapshot ss = computeStatsData(this.csListMinute);
log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", log.info(String.format("[%s] [%s] Stats In One Minute, ", this.statsName, this.statsKey) + statPrintDetail(ss));
this.statsName,
this.statsKey,
ss.getSum(),
ss.getTps(),
ss.getAvgpt()));
} }
public void printAtHour() { public void printAtHour() {
StatsSnapshot ss = computeStatsData(this.csListHour); StatsSnapshot ss = computeStatsData(this.csListHour);
log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", log.info(String.format("[%s] [%s] Stats In One Hour, ", this.statsName, this.statsKey) + statPrintDetail(ss));
this.statsName,
this.statsKey,
ss.getSum(),
ss.getTps(),
ss.getAvgpt()));
} }
public void printAtDay() { public void printAtDay() {
StatsSnapshot ss = computeStatsData(this.csListDay); StatsSnapshot ss = computeStatsData(this.csListDay);
log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", log.info(String.format("[%s] [%s] Stats In One Day, ", this.statsName, this.statsKey) + statPrintDetail(ss));
this.statsName, }
this.statsKey,
ss.getSum(), protected String statPrintDetail(StatsSnapshot ss) {
ss.getTps(), return String.format("SUM: %d TPS: %.2f AVGPT: %.2f",
ss.getAvgpt())); ss.getSum(),
ss.getTps(),
ss.getAvgpt());
} }
public AtomicLong getValue() { public AtomicLong getValue() {
......
...@@ -158,6 +158,12 @@ public class StatsItemSet { ...@@ -158,6 +158,12 @@ public class StatsItemSet {
statsItem.getTimes().addAndGet(incTimes); statsItem.getTimes().addAndGet(incTimes);
} }
public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
statsItem.getValue().addAndGet(incValue);
statsItem.getTimes().addAndGet(incTimes);
}
public void delValue(final String statsKey) { public void delValue(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) { if (null != statsItem) {
...@@ -196,9 +202,21 @@ public class StatsItemSet { ...@@ -196,9 +202,21 @@ public class StatsItemSet {
} }
public StatsItem getAndCreateStatsItem(final String statsKey) { public StatsItem getAndCreateStatsItem(final String statsKey) {
return getAndCreateItem(statsKey, false);
}
public StatsItem getAndCreateRTStatsItem(final String statsKey) {
return getAndCreateItem(statsKey, true);
}
public StatsItem getAndCreateItem(final String statsKey, boolean rtItem) {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); if (rtItem) {
statsItem = new RTStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
} else {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
}
StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem); StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null != prev) { if (null != prev) {
......
...@@ -20,6 +20,8 @@ package org.apache.rocketmq.common.stats; ...@@ -20,6 +20,8 @@ package org.apache.rocketmq.common.stats;
public class StatsSnapshot { public class StatsSnapshot {
private long sum; private long sum;
private double tps; private double tps;
private long times;
private double avgpt; private double avgpt;
public long getSum() { public long getSum() {
...@@ -45,4 +47,12 @@ public class StatsSnapshot { ...@@ -45,4 +47,12 @@ public class StatsSnapshot {
public void setAvgpt(double avgpt) { public void setAvgpt(double avgpt) {
this.avgpt = avgpt; this.avgpt = avgpt;
} }
public long getTimes() {
return times;
}
public void setTimes(long times) {
this.times = times;
}
} }
...@@ -46,14 +46,17 @@ public class StatsItemSetTest { ...@@ -46,14 +46,17 @@ public class StatsItemSetTest {
@Test @Test
public void test_statsOfFirstStatisticsCycle() throws InterruptedException { public void test_statsOfFirstStatisticsCycle() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null); final String tpsStatKey = "tpsTest";
final String rtStatKey = "rtTest";
final StatsItemSet statsItemSet = new StatsItemSet(tpsStatKey, scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread")); new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
statsItemSet.addValue("topicTest", 2, 1); statsItemSet.addValue(tpsStatKey, 2, 1);
statsItemSet.addRTValue(rtStatKey, 2, 1);
} }
}); });
} }
...@@ -63,14 +66,33 @@ public class StatsItemSetTest { ...@@ -63,14 +66,33 @@ public class StatsItemSetTest {
} }
Thread.sleep(1000); Thread.sleep(1000);
} }
// simulate schedule task execution // simulate schedule task execution , tps stat
statsItemSet.getStatsItem("topicTest").samplingInSeconds(); {
statsItemSet.getStatsItem("topicTest").samplingInMinutes(); statsItemSet.getStatsItem(tpsStatKey).samplingInSeconds();
statsItemSet.getStatsItem("topicTest").samplingInHour(); statsItemSet.getStatsItem(tpsStatKey).samplingInMinutes();
statsItemSet.getStatsItem(tpsStatKey).samplingInHour();
assertEquals(20L, statsItemSet.getStatsDataInMinute(tpsStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInHour(tpsStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInDay(tpsStatKey).getSum());
assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInHour(tpsStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
}
// simulate schedule task execution , rt stat
{
statsItemSet.getStatsItem(rtStatKey).samplingInSeconds();
statsItemSet.getStatsItem(rtStatKey).samplingInMinutes();
statsItemSet.getStatsItem(rtStatKey).samplingInHour();
assertEquals(20L, statsItemSet.getStatsDataInMinute("topicTest").getSum()); assertEquals(20L, statsItemSet.getStatsDataInMinute(rtStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInHour("topicTest").getSum()); assertEquals(20L, statsItemSet.getStatsDataInHour(rtStatKey).getSum());
assertEquals(20L, statsItemSet.getStatsDataInDay("topicTest").getSum()); assertEquals(20L, statsItemSet.getStatsDataInDay(rtStatKey).getSum());
assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInHour(rtStatKey).getTimes());
assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
}
} }
private AtomicLong test_unit() throws InterruptedException { private AtomicLong test_unit() throws InterruptedException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册