Unverified commit 6d389dc1 authored by wu-sheng, committed by GitHub

Optimize aggregation in L1 and L2. (#4735)

Parent: d97af96c
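
For readers skimming the diff: this change adds a batch `write(List)` overload to `ReadWriteSafeCache`, switches the L1 aggregation worker to a plain `MergableBufferedData` buffer (no lock needed, because queue consumption is serial per worker), and moves the persistence workers to batch `onWork(List)` calls. As background, `ReadWriteSafeCache` is a lock-guarded buffer that writers fill while `read()` hands back everything accumulated so far. A minimal standalone sketch of that pattern, assuming nothing beyond what the diff shows (the class name is illustrative, and the real cache swaps between two preallocated buffers rather than allocating a fresh one):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative lock-guarded cache: writes append to the current buffer,
// read() hands the filled buffer to the caller and starts a fresh one.
public class DoubleBufferCache<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private List<T> writeBuffer = new ArrayList<>();

    public void write(T data) {
        lock.lock();
        try {
            writeBuffer.add(data);
        } finally {
            lock.unlock();
        }
    }

    public void write(List<T> data) {
        lock.lock();
        try {
            writeBuffer.addAll(data); // one lock round-trip for the whole batch
        } finally {
            lock.unlock();
        }
    }

    public List<T> read() {
        lock.lock();
        try {
            List<T> drained = writeBuffer;
            writeBuffer = new ArrayList<>(); // fresh buffer for subsequent writes
            return drained;
        } finally {
            lock.unlock();
        }
    }
}
```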
......@@ -50,6 +50,11 @@ public class ReadWriteSafeCache<T> {
lock = new ReentrantLock();
}
/**
* Write the data into the {@link #writeBufferPointer} buffer.
*
* @param data to enqueue.
*/
public void write(T data) {
lock.lock();
try {
......@@ -59,6 +64,20 @@ public class ReadWriteSafeCache<T> {
}
}
/**
* Write the collection of data into the {@link #writeBufferPointer} buffer.
*
* @param data to enqueue.
*/
public void write(List<T> data) {
lock.lock();
try {
data.forEach(writeBufferPointer::accept);
} finally {
lock.unlock();
}
}
public List<T> read() {
lock.lock();
try {
......
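
To see what the new batch overload buys, here is a runnable usage sketch against the simplified `DoubleBufferCache` above; the point is one lock acquisition per queue batch instead of one per element:

```java
import java.util.Arrays;
import java.util.List;

public class CacheDemo {
    public static void main(String[] args) {
        DoubleBufferCache<String> cache = new DoubleBufferCache<>();
        cache.write("a");                     // existing single-element overload
        cache.write(Arrays.asList("b", "c")); // new batch overload: one lock round-trip
        List<String> drained = cache.read();  // [a, b, c]; cache is now empty
        System.out.println(drained);
    }
}
```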
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
......@@ -27,7 +26,6 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactor
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -46,14 +44,14 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
private final ReadWriteSafeCache<Metrics> mergeDataCache;
private final MergableBufferedData<Metrics> mergeDataCache;
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
this.mergeDataCache = new MergableBufferedData();
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
......@@ -64,7 +62,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
......@@ -75,54 +73,44 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
);
}
/**
* The MetricsAggregateWorker#in operation includes enqueue only.
*/
@Override
public final void in(Metrics metrics) {
metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
private void onWork(Metrics metrics) {
    aggregationCounter.inc();
    mergeDataCache.write(metrics);
    if (metrics.isEndOfBatch()) {
        mergeDataCache.read().forEach(
            data -> {
                if (log.isDebugEnabled()) {
                    log.debug(data.toString());
                }
                nextWorker.in(data);
            }
        );
    }
}
/**
 * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every worker
 * instance.
 *
 * @param metricsList from the queue.
 */
private void onWork(List<Metrics> metricsList) {
    metricsList.forEach(metrics -> {
        aggregationCounter.inc();
        mergeDataCache.accept(metrics);
    });
    mergeDataCache.read().forEach(
        data -> {
            if (log.isDebugEnabled()) {
                log.debug(data.toString());
            }
            nextWorker.in(data);
        }
    );
}
private class AggregatorConsumer implements IConsumer<Metrics> {
private final MetricsAggregateWorker aggregator;
private AggregatorConsumer(MetricsAggregateWorker aggregator) {
this.aggregator = aggregator;
}
@Override
public void init() {
}
@Override
public void consume(List<Metrics> data) {
Iterator<Metrics> inputIterator = data.iterator();
int i = 0;
while (inputIterator.hasNext()) {
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
metrics.asEndOfBatch();
}
aggregator.onWork(metrics);
}
MetricsAggregateWorker.this.onWork(data);
}
@Override
......
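
The reasoning behind this file's change is in the new javadoc: `IConsumer#consume(List)` is serial for every worker instance, so the aggregate worker's buffer is only ever touched by one thread, and the lock-guarded `ReadWriteSafeCache` could be replaced by a plain `MergableBufferedData`. A minimal sketch of the merge-buffer idea, simplified to summed long counters keyed by an id (the real class combines `Metrics` objects by their id):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative merge buffer: elements with the same key are combined in place,
// so the buffer holds at most one entry per key between reads.
// Deliberately not thread-safe: consume(List) is invoked serially per worker.
public class MergeBuffer<K> {
    private final Map<K, Long> buffer = new HashMap<>();

    public void accept(K key, long value) {
        buffer.merge(key, value, Long::sum); // combine with any buffered value
    }

    public Map<K, Long> read() {
        Map<K, Long> snapshot = new HashMap<>(buffer);
        buffer.clear(); // start a fresh aggregation window
        return snapshot;
    }
}
```

Because `accept` merges in place, the downstream worker receives one merged entry per id regardless of how many raw samples arrived between reads.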
......@@ -83,7 +83,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
}
this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer());
}
/**
......@@ -97,11 +97,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
);
}
@Override
void onWork(Metrics metrics) {
cacheData(metrics);
}
/**
* Accept all metrics data and push them into the queue for serial processing
*/
......@@ -235,13 +230,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
*/
private class PersistentConsumer implements IConsumer<Metrics> {
private final MetricsPersistentWorker persistent;
private PersistentConsumer(MetricsPersistentWorker persistent) {
this.persistent = persistent;
}
@Override
public void init() {
......@@ -249,7 +237,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public void consume(List<Metrics> data) {
data.forEach(persistent::onWork);
MetricsPersistentWorker.this.onWork(data);
}
@Override
......
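
This consumer cleanup (here and in `MetricsAggregateWorker` above) relies on a plain Java rule: a non-static inner class holds an implicit reference to its enclosing instance, so the `persistent` field and constructor parameter were redundant. A standalone illustration with hypothetical names:

```java
public class Outer {
    private void onWork(String data) {
        System.out.println("handled " + data);
    }

    // Non-static inner class: it captures the enclosing Outer instance
    // implicitly, so no constructor parameter is needed to reach onWork.
    private class Consumer {
        void consume(String data) {
            Outer.this.onWork(data); // qualified-this, as in the diff above
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        outer.new Consumer().consume("payload");
    }
}
```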
......@@ -48,14 +48,7 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
/**
* Accept the input, and push the data into the cache.
*/
void onWork(INPUT input) {
cacheData(input);
}
/**
* Cache data based on different strategies. See the implementations for more details.
*/
public void cacheData(INPUT input) {
void onWork(List<INPUT> input) {
cache.write(input);
}
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Collection;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
......@@ -29,16 +30,12 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Top N worker is a persistence worker. It caches and orders the data, and flushes on a longer period.
*/
@Slf4j
public class TopNWorker extends PersistenceWorker<TopN> {
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
private final IRecordDAO recordDAO;
private final Model model;
private final DataCarrier<TopN> dataCarrier;
......@@ -80,7 +77,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
try {
prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
});
}
......@@ -98,24 +95,18 @@ public class TopNWorker extends PersistenceWorker<TopN> {
}
private class TopNConsumer implements IConsumer<TopN> {
@Override
public void init() {
}
@Override
public void consume(List<TopN> data) {
/*
 * TopN is not following the batch size trigger mode.
 * No extra flush handling is needed here; the memory size is always limited.
 */
data.forEach(TopNWorker.this::onWork);
TopNWorker.this.onWork(data);
}
@Override
public void onError(List<TopN> data, Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
@Override
......
......@@ -22,19 +22,5 @@ import org.apache.skywalking.oap.server.core.remote.Deserializable;
import org.apache.skywalking.oap.server.core.remote.Serializable;
public abstract class StreamData implements Serializable, Deserializable {
private boolean endOfBatch = false;
public void resetEndOfBatch() {
this.endOfBatch = false;
}
public void asEndOfBatch() {
this.endOfBatch = true;
}
public boolean isEndOfBatch() {
return this.endOfBatch;
}
public abstract int remoteHashCode();
}
......@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
......@@ -35,14 +36,10 @@ import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public enum PersistenceTimer {
INSTANCE;
private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
private Boolean isStarted = false;
private final Boolean debug;
private CounterMetrics errorCounter;
......@@ -56,7 +53,7 @@ public enum PersistenceTimer {
}
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
logger.info("persistence timer start");
log.info("persistence timer start");
IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
......@@ -78,7 +75,7 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
TimeUnit.SECONDS
);
......@@ -88,8 +85,8 @@ public enum PersistenceTimer {
}
private void extractDataAndSave(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
logger.debug("Extract data and save");
if (log.isDebugEnabled()) {
log.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
......@@ -103,8 +100,8 @@ public enum PersistenceTimer {
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
}
worker.buildBatchRequests(prepareRequests);
......@@ -113,7 +110,7 @@ public enum PersistenceTimer {
});
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
......@@ -129,10 +126,10 @@ public enum PersistenceTimer {
}
} catch (Throwable e) {
errorCounter.inc();
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Persistence data save finish");
if (log.isDebugEnabled()) {
log.debug("Persistence data save finish");
}
prepareRequests.clear();
......@@ -140,7 +137,7 @@ public enum PersistenceTimer {
}
if (debug) {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
}
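
The logging hunks throughout are mechanical: hand-written SLF4J `Logger` fields are replaced by Lombok's `@Slf4j`, which generates an equivalent static `log` field at compile time. A minimal sketch (class name hypothetical; Lombok must be on the compile classpath):

```java
import lombok.extern.slf4j.Slf4j;

// @Slf4j generates the equivalent of:
//   private static final org.slf4j.Logger log =
//       org.slf4j.LoggerFactory.getLogger(PersistenceDemo.class);
// which is why the hand-written logger fields above could be deleted.
@Slf4j
public class PersistenceDemo {
    public static void main(String[] args) {
        log.info("persistence timer start"); // same call sites, field named 'log'
    }
}
```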