提交 54b01223 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Aggregate worker will hold the metrics while there is no new metrics in. (#3100)

上级 8faa3564
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
......@@ -40,15 +40,10 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
private final MergeDataCache<Metrics> mergeDataCache;
private final String modelName;
private CounterMetrics aggregationCounter;
private final long l2AggregationSendCycle;
private long lastSendTimestamp;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker, String modelName) {
super(moduleDefineHolder);
this.modelName = modelName;
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
String name = "METRICS_L1_AGGREGATION";
......@@ -65,9 +60,6 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
aggregationCounter = metricsCreator.createCounter("metrics_aggregation", "The number of rows in aggregation",
new MetricsTag.Keys("metricName", "level", "dimensionality"), new MetricsTag.Values(modelName, "1", "min"));
lastSendTimestamp = System.currentTimeMillis();
l2AggregationSendCycle = EnvUtil.getLong("METRICS_L1_AGGREGATION_SEND_CYCLE", 1000);
}
@Override public final void in(Metrics metrics) {
......@@ -80,20 +72,8 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
aggregate(metrics);
if (metrics.getEndOfBatchContext().isEndOfBatch()) {
if (shouldSend()) {
sendToNext();
}
}
}
private boolean shouldSend() {
long now = System.currentTimeMillis();
// Continue L2 aggregation in certain cycle.
if (now - lastSendTimestamp > l2AggregationSendCycle) {
lastSendTimestamp = now;
return true;
sendToNext();
}
return false;
}
private void sendToNext() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册