diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 9a0caaa112d038e4325ca6d427aaee9df0058328..19d515169a6acd1d108161645e392488859d5814 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -162,14 +162,11 @@ public class StatsItemSet { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); - StatsItem prev = this.statsItemTable.put(statsKey, statsItem); - - if (null == prev) { - - // statsItem.init(); + StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem); + if (null != prev) { + statsItem = prev; } } - return statsItem; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java index 50f2c67d4c85f2a60b2f2d4ebe0665b492e5ff19..84651fef7ec62eb335be69cd561e89ff48c21718 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java @@ -16,38 +16,38 @@ */ package org.apache.rocketmq.snode.service.impl; +import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Counter; import io.prometheus.client.Summary; import io.prometheus.client.exporter.HTTPServer; import io.prometheus.client.hotspot.DefaultExports; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.utils.CallSnapshot; +import org.apache.rocketmq.common.stats.StatsItemSet; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.snode.exception.SnodeException; import org.apache.rocketmq.snode.service.MetricsService; public class MetricsServiceImpl implements MetricsService { - private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME); - private HTTPServer server; + public MetricsServiceImpl() { + } - private final Map> requestTimesList = new ConcurrentHashMap<>(); + private HTTPServer server; - private final Map> topicSizeList = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "SNodeStatsScheduledThread"); + } + }); -// private Map requestTotalMap = new ConcurrentHashMap<>(512); -// -// private Map requestFailedTotalMap = new ConcurrentHashMap<>(512); -// -// private Map produceSizeTotal = new ConcurrentHashMap<>(512); -// -// private Map consumeSizeTotal = new ConcurrentHashMap<>(512); + StatsItemSet statsItemSet = new StatsItemSet("SnodeStats", scheduledExecutorService, log); private final Counter requestTotal = Counter.build().name("request_total").help("request total count").labelNames("requestCode").register(); @@ -66,30 +66,25 @@ public class MetricsServiceImpl implements MetricsService { .labelNames("topic") .name("sent_topic_size_bytes").help("Request size in bytes.").register(); -// public Double getLabelsValue(String labelValue) { -// return CollectorRegistry.defaultRegistry.getSampleValue("request_total", new String[] {"requestCode"}, new String[] {labelValue}); -// } - - private AtomicLong getValue(ConcurrentHashMap map, Integer key) { - AtomicLong value = (AtomicLong) map.get(key); - if (value == null) { - value = (AtomicLong) map.putIfAbsent(key, new AtomicLong(0)); - } - return value; + public Double getLabelsValue(String labelValue) { + return CollectorRegistry.defaultRegistry.getSampleValue("request_total", new String[] {"requestCode"}, new String[] {labelValue}); } @Override synchronized public void incRequestCount(int requestCode, boolean success) { if (!success) { this.requestFailedTotal.labels(requestCode + "").inc(); + this.statsItemSet.addValue("TotalFailed@" + requestCode, 1, 1); } else { this.requestTotal.labels(requestCode + "").inc(); + this.statsItemSet.addValue("Total@" + requestCode, 1, 1); } } @Override - synchronized public void recordRequestSize(String topic, double size) { + synchronized public void recordRequestSize(String topic, double size) { this.receivedBytes.labels(topic).observe(size); + this.statsItemSet.addValue("TotalSize@" + topic, new Double(size).intValue(), 1); } @Override