diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java index 33a4e0178fece3c14a2e247df5971d0296ccd63c..e73bbfbd80104fd53691f6f4742db9d4c73154e3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/ReadWriteSafeCache.java @@ -50,6 +50,11 @@ public class ReadWriteSafeCache { lock = new ReentrantLock(); } + /** + * Write the into the {@link #writeBufferPointer} buffer. + * + * @param data to enqueue. + */ public void write(T data) { lock.lock(); try { @@ -59,6 +64,20 @@ public class ReadWriteSafeCache { } } + /** + * Write the collection of data into the {@link #writeBufferPointer} buffer. + * + * @param data to enqueue. + */ + public void write(List data) { + lock.lock(); + try { + data.forEach(writeBufferPointer::accept); + } finally { + lock.unlock(); + } + } + public List read() { lock.lock(); try { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 3d5105f2914faf98a5e317d5617afb7299cde361..04f6601610d1614d0f64b0bbf50fae6053274a2f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.Iterator; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; @@ -27,7 +26,6 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactor import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData; -import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -46,14 +44,14 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; public class MetricsAggregateWorker extends AbstractWorker { private AbstractWorker nextWorker; private final DataCarrier dataCarrier; - private final ReadWriteSafeCache mergeDataCache; + private final MergableBufferedData mergeDataCache; private CounterMetrics aggregationCounter; MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker nextWorker, String modelName) { super(moduleDefineHolder); this.nextWorker = nextWorker; - this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()); + this.mergeDataCache = new MergableBufferedData(); String name = "METRICS_L1_AGGREGATION"; this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000); @@ -64,7 +62,7 @@ public class MetricsAggregateWorker extends AbstractWorker { } catch (Exception e) { throw new UnexpectedException(e.getMessage(), e); } - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this)); + this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer()); MetricsCreator metricsCreator = moduleDefineHolder.find(TelemetryModule.NAME) .provider() @@ -75,54 +73,44 @@ public class MetricsAggregateWorker extends AbstractWorker { ); } + /** + * MetricsAggregateWorker#in operation does include enqueue only + */ @Override public final void in(Metrics metrics) { - metrics.resetEndOfBatch(); dataCarrier.produce(metrics); } - private void onWork(Metrics metrics) { - aggregationCounter.inc(); - mergeDataCache.write(metrics); - - if (metrics.isEndOfBatch()) { - mergeDataCache.read().forEach( - data -> { - if (log.isDebugEnabled()) { - log.debug(data.toString()); - } - nextWorker.in(data); + /** + * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every work + * instance. + * + * @param metricsList from the queue. + */ + private void onWork(List metricsList) { + metricsList.forEach(metrics -> { + aggregationCounter.inc(); + mergeDataCache.accept(metrics); + }); + + mergeDataCache.read().forEach( + data -> { + if (log.isDebugEnabled()) { + log.debug(data.toString()); } - ); - } + nextWorker.in(data); + } + ); } private class AggregatorConsumer implements IConsumer { - - private final MetricsAggregateWorker aggregator; - - private AggregatorConsumer(MetricsAggregateWorker aggregator) { - this.aggregator = aggregator; - } - @Override public void init() { - } @Override public void consume(List data) { - Iterator inputIterator = data.iterator(); - - int i = 0; - while (inputIterator.hasNext()) { - Metrics metrics = inputIterator.next(); - i++; - if (i == data.size()) { - metrics.asEndOfBatch(); - } - aggregator.onWork(metrics); - } + MetricsAggregateWorker.this.onWork(data); } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index cab7dd396622866e6d8d071cffc90423a69c46db..a3cf58d0c2e539e5c4d4323c51c8c3b261e7fa69 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -83,7 +83,7 @@ public class MetricsPersistentWorker extends PersistenceWorker { } this.dataCarrier = new DataCarrier<>("MetricsPersistentWorker." + model.getName(), name, 1, 2000); - this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this)); + this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer()); } /** @@ -97,11 +97,6 @@ public class MetricsPersistentWorker extends PersistenceWorker { ); } - @Override - void onWork(Metrics metrics) { - cacheData(metrics); - } - /** * Accept all metrics data and push them into the queue for serial processing */ @@ -235,13 +230,6 @@ public class MetricsPersistentWorker extends PersistenceWorker { * ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual. */ private class PersistentConsumer implements IConsumer { - - private final MetricsPersistentWorker persistent; - - private PersistentConsumer(MetricsPersistentWorker persistent) { - this.persistent = persistent; - } - @Override public void init() { @@ -249,7 +237,7 @@ public class MetricsPersistentWorker extends PersistenceWorker { @Override public void consume(List data) { - data.forEach(persistent::onWork); + MetricsPersistentWorker.this.onWork(data); } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index 536a6d98c9f6237c953d84d94a612e9948afc262..415371bdca25993ee6671a37ebe414604bdee042 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -48,14 +48,7 @@ public abstract class PersistenceWorker extends Abstr /** * Accept the input, and push the data into the cache. */ - void onWork(INPUT input) { - cacheData(input); - } - - /** - * Cache data based on different strategies. See the implementations for more details. - */ - public void cacheData(INPUT input) { + void onWork(List input) { cache.write(input); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java index 3c150c7e01710c906042b5b9466c2413bf4711b3..9093a2fb5d6f4499d14d85614472c8de45e2925e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; import java.util.Collection; import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData; @@ -29,16 +30,12 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Top N worker is a persistence worker. Cache and order the data, flush in longer period. */ +@Slf4j public class TopNWorker extends PersistenceWorker { - - private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class); - private final IRecordDAO recordDAO; private final Model model; private final DataCarrier dataCarrier; @@ -80,7 +77,7 @@ public class TopNWorker extends PersistenceWorker { try { prepareRequests.add(recordDAO.prepareBatchInsert(model, record)); } catch (Throwable t) { - logger.error(t.getMessage(), t); + log.error(t.getMessage(), t); } }); } @@ -98,24 +95,18 @@ public class TopNWorker extends PersistenceWorker { } private class TopNConsumer implements IConsumer { - @Override public void init() { - } @Override public void consume(List data) { - /* - * TopN is not following the batch size trigger mode. - * No need to implement this method, the memory size is limited always. - */ - data.forEach(TopNWorker.this::onWork); + TopNWorker.this.onWork(data); } @Override public void onError(List data, Throwable t) { - logger.error(t.getMessage(), t); + log.error(t.getMessage(), t); } @Override diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java index 2c5b0b072af0c3226276146ea0ca7d036b17a72f..3c1c902de67c9ab55bc201743e43e088bb7da9d7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java @@ -22,19 +22,5 @@ import org.apache.skywalking.oap.server.core.remote.Deserializable; import org.apache.skywalking.oap.server.core.remote.Serializable; public abstract class StreamData implements Serializable, Deserializable { - private boolean endOfBatch = false; - - public void resetEndOfBatch() { - this.endOfBatch = false; - } - - public void asEndOfBatch() { - this.endOfBatch = true; - } - - public boolean isEndOfBatch() { - return this.endOfBatch; - } - public abstract int remoteHashCode(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index 7fada1b414eb89ad639d634efce7778f48d8c74b..f35459cb111424cafeeb35e629f93f9a9b89293a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; @@ -35,14 +36,10 @@ import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public enum PersistenceTimer { INSTANCE; - - private static final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class); - private Boolean isStarted = false; private final Boolean debug; private CounterMetrics errorCounter; @@ -56,7 +53,7 @@ public enum PersistenceTimer { } public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) { - logger.info("persistence timer start"); + log.info("persistence timer start"); IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class); MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) @@ -78,7 +75,7 @@ public enum PersistenceTimer { if (!isStarted) { Executors.newSingleThreadScheduledExecutor() .scheduleWithFixedDelay( - new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger + new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS ); @@ -88,8 +85,8 @@ public enum PersistenceTimer { } private void extractDataAndSave(IBatchDAO batchDAO) { - if (logger.isDebugEnabled()) { - logger.debug("Extract data and save"); + if (log.isDebugEnabled()) { + log.debug("Extract data and save"); } long startTime = System.currentTimeMillis(); @@ -103,8 +100,8 @@ public enum PersistenceTimer { persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); persistenceWorkers.forEach(worker -> { - if (logger.isDebugEnabled()) { - logger.debug("extract {} worker data and save", worker.getClass().getName()); + if (log.isDebugEnabled()) { + log.debug("extract {} worker data and save", worker.getClass().getName()); } worker.buildBatchRequests(prepareRequests); @@ -113,7 +110,7 @@ public enum PersistenceTimer { }); if (debug) { - logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime); } } finally { timer.finish(); @@ -129,10 +126,10 @@ public enum PersistenceTimer { } } catch (Throwable e) { errorCounter.inc(); - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } finally { - if (logger.isDebugEnabled()) { - logger.debug("Persistence data save finish"); + if (log.isDebugEnabled()) { + log.debug("Persistence data save finish"); } prepareRequests.clear(); @@ -140,7 +137,7 @@ public enum PersistenceTimer { } if (debug) { - logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); } } }