提交 61258bf8 编写于 作者: D duhenglucky

Add StatsItem print logic

上级 9297d560
...@@ -162,14 +162,11 @@ public class StatsItemSet { ...@@ -162,14 +162,11 @@ public class StatsItemSet {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
StatsItem prev = this.statsItemTable.put(statsKey, statsItem); StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null != prev) {
if (null == prev) { statsItem = prev;
// statsItem.init();
} }
} }
return statsItem; return statsItem;
} }
......
...@@ -16,38 +16,38 @@ ...@@ -16,38 +16,38 @@
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.snode.service.impl;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import io.prometheus.client.Summary; import io.prometheus.client.Summary;
import io.prometheus.client.exporter.HTTPServer; import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports; import io.prometheus.client.hotspot.DefaultExports;
import java.util.LinkedList; import java.util.concurrent.Executors;
import java.util.Map; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.utils.CallSnapshot; import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.snode.exception.SnodeException; import org.apache.rocketmq.snode.exception.SnodeException;
import org.apache.rocketmq.snode.service.MetricsService; import org.apache.rocketmq.snode.service.MetricsService;
public class MetricsServiceImpl implements MetricsService { public class MetricsServiceImpl implements MetricsService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
private HTTPServer server; public MetricsServiceImpl() {
}
private final Map<Integer, LinkedList<CallSnapshot>> requestTimesList = new ConcurrentHashMap<>(); private HTTPServer server;
private final Map<String, LinkedList<CallSnapshot>> topicSizeList = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "SNodeStatsScheduledThread");
}
});
// private Map<Integer, AtomicLong> requestTotalMap = new ConcurrentHashMap<>(512); StatsItemSet statsItemSet = new StatsItemSet("SnodeStats", scheduledExecutorService, log);
//
// private Map<Integer, AtomicLong> requestFailedTotalMap = new ConcurrentHashMap<>(512);
//
// private Map<String, AtomicLong> produceSizeTotal = new ConcurrentHashMap<>(512);
//
// private Map<String, AtomicLong> consumeSizeTotal = new ConcurrentHashMap<>(512);
private final Counter requestTotal = Counter.build().name("request_total").help("request total count").labelNames("requestCode").register(); private final Counter requestTotal = Counter.build().name("request_total").help("request total count").labelNames("requestCode").register();
...@@ -66,30 +66,25 @@ public class MetricsServiceImpl implements MetricsService { ...@@ -66,30 +66,25 @@ public class MetricsServiceImpl implements MetricsService {
.labelNames("topic") .labelNames("topic")
.name("sent_topic_size_bytes").help("Request size in bytes.").register(); .name("sent_topic_size_bytes").help("Request size in bytes.").register();
// public Double getLabelsValue(String labelValue) { public Double getLabelsValue(String labelValue) {
// return CollectorRegistry.defaultRegistry.getSampleValue("request_total", new String[] {"requestCode"}, new String[] {labelValue}); return CollectorRegistry.defaultRegistry.getSampleValue("request_total", new String[] {"requestCode"}, new String[] {labelValue});
// }
private AtomicLong getValue(ConcurrentHashMap map, Integer key) {
AtomicLong value = (AtomicLong) map.get(key);
if (value == null) {
value = (AtomicLong) map.putIfAbsent(key, new AtomicLong(0));
}
return value;
} }
@Override @Override
synchronized public void incRequestCount(int requestCode, boolean success) { synchronized public void incRequestCount(int requestCode, boolean success) {
if (!success) { if (!success) {
this.requestFailedTotal.labels(requestCode + "").inc(); this.requestFailedTotal.labels(requestCode + "").inc();
this.statsItemSet.addValue("TotalFailed@" + requestCode, 1, 1);
} else { } else {
this.requestTotal.labels(requestCode + "").inc(); this.requestTotal.labels(requestCode + "").inc();
this.statsItemSet.addValue("Total@" + requestCode, 1, 1);
} }
} }
@Override @Override
synchronized public void recordRequestSize(String topic, double size) { synchronized public void recordRequestSize(String topic, double size) {
this.receivedBytes.labels(topic).observe(size); this.receivedBytes.labels(topic).observe(size);
this.statsItemSet.addValue("TotalSize@" + topic, new Double(size).intValue(), 1);
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册