From 54b01223de49cf56dd8cb174b5a199b595112e0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= Date: Wed, 17 Jul 2019 13:59:54 +0800 Subject: [PATCH] Aggregate worker will hold the metrics while there is no new metrics in. (#3100) --- .../worker/MetricsAggregateWorker.java | 26 +++---------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 7dffea3e4..59c75bb59 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.*; -import org.apache.skywalking.apm.commons.datacarrier.*; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.*; @@ -40,15 +40,10 @@ public class MetricsAggregateWorker extends AbstractWorker { private AbstractWorker nextWorker; private final DataCarrier dataCarrier; private final MergeDataCache mergeDataCache; - private final String modelName; private CounterMetrics aggregationCounter; - private final long l2AggregationSendCycle; - private long lastSendTimestamp; - MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, - String modelName) { + MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, String modelName) { super(moduleDefineHolder); - this.modelName = modelName; this.nextWorker = nextWorker; this.mergeDataCache = new MergeDataCache<>(); String name = "METRICS_L1_AGGREGATION"; @@ -65,9 +60,6 @@ public class MetricsAggregateWorker extends AbstractWorker { MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); aggregationCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min")); - lastSendTimestamp = System.currentTimeMillis(); - - l2AggregationSendCycle = EnvUtil.getLong("METRICS_L1_AGGREGATION_SEND_CYCLE", 1000); } @Override public final void in(Metrics metrics) { @@ -80,20 +72,8 @@ public class MetricsAggregateWorker extends AbstractWorker { aggregate(metrics); if (metrics.getEndOfBatchContext().isEndOfBatch()) { - if (shouldSend()) { - sendToNext(); - } - } - } - - private boolean shouldSend() { - long now = System.currentTimeMillis(); - // Continue L2 aggregation in certain cycle. - if (now - lastSendTimestamp > l2AggregationSendCycle) { - lastSendTimestamp = now; - return true; + sendToNext(); } - return false; } private void sendToNext() { -- GitLab