From a2897db4715092b8e4079fd57b462606deee2fdf Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Sat, 20 Nov 2021 16:51:54 +0800 Subject: [PATCH] Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage (#8161) The ES persistence execution is now asynchronous and the execution latency only counts the time to insert the requests into the bulk processor, instead of the time after the requests are flushed into the storage, this patch fixes that issue. There is the same issue in prepare latency but that needs more changes so I'll leave it to another pull request. --- CHANGES.md | 1 + .../worker/MetricsPersistentWorker.java | 4 +- .../oap/server/core/storage/IBatchDAO.java | 3 +- .../server/core/storage/PersistenceTimer.java | 110 +++++++++--------- .../core/storage/PersistenceTimerTest.java | 7 +- .../elasticsearch/bulk/BulkProcessor.java | 36 +++--- .../elasticsearch/base/BatchProcessEsDAO.java | 12 +- .../plugin/influxdb/base/BatchDAO.java | 10 +- .../plugin/jdbc/h2/dao/H2BatchDAO.java | 6 +- 9 files changed, 106 insertions(+), 83 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 86f224292a..25f1ba11bb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -54,6 +54,7 @@ Release Notes. * Add filter mechanism in MAL core to filter metrics. * Fix concurrency bug in MAL `increase`-related calculation. * Fix a null pointer bug when building `SampleFamily`. +* Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 07d14ef0d4..9758cb3e89 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -179,14 +179,14 @@ public class MetricsPersistentWorker extends PersistenceWorker { @Override public List buildBatchRequests() { if (persistentCounter++ % persistentMod != 0) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } final List lastCollection = getCache().read(); long start = System.currentTimeMillis(); if (lastCollection.size() == 0) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } /* diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java index ed3172ff66..f7279f276a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.storage; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; @@ -43,5 +44,5 @@ public interface IBatchDAO extends DAO { * * @param prepareRequests data to insert or update. No delete happens in streaming mode. */ - void flush(List prepareRequests); + CompletableFuture flush(List prepareRequests); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index 594cac1563..3af6a780bb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.core.storage; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker; @@ -34,6 +33,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.CollectionUtils; +import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -56,21 +56,25 @@ public enum PersistenceTimer { public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) { log.info("persistence timer start"); - IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class); + IBatchDAO batchDAO = + moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class); MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); errorCounter = metricsCreator.createCounter( - "persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer", + "persistence_timer_bulk_error_count", + "Error execution of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE ); prepareLatency = metricsCreator.createHistogramMetric( - "persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer", + "persistence_timer_bulk_prepare_latency", + "Latency of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE ); executeLatency = metricsCreator.createHistogramMetric( - "persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer", + "persistence_timer_bulk_execute_latency", + "Latency of the execute stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE ); allLatency = metricsCreator.createHistogramMetric( @@ -82,69 +86,69 @@ public enum PersistenceTimer { if (!isStarted) { Executors.newSingleThreadScheduledExecutor() .scheduleWithFixedDelay( - new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log - .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), - TimeUnit.SECONDS + new RunnableWithExceptionProtection( + () -> extractDataAndSave(batchDAO).join(), + t -> log.error("Extract data and save failure.", t) + ), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS ); this.isStarted = true; } } - private void extractDataAndSave(IBatchDAO batchDAO) { + private CompletableFuture extractDataAndSave(IBatchDAO batchDAO) { if (log.isDebugEnabled()) { log.debug("Extract data and save"); } long startTime = System.currentTimeMillis(); - try (HistogramMetrics.Timer allTimer = allLatency.createTimer()) { - List> persistenceWorkers = new ArrayList<>(); - persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); - persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); - - CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size()); - persistenceWorkers.forEach(worker -> { - prepareExecutorService.submit(() -> { - List innerPrepareRequests = null; - try { - // Prepare stage - try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) { - if (log.isDebugEnabled()) { - log.debug("extract {} worker data and save", worker.getClass().getName()); - } - - innerPrepareRequests = worker.buildBatchRequests(); - - worker.endOfRound(); - } catch (Throwable e) { - log.error(e.getMessage(), e); + HistogramMetrics.Timer allTimer = allLatency.createTimer(); + List> workers = new ArrayList<>(); + workers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); + workers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); + + final CompletableFuture future = + CompletableFuture.allOf(workers.stream().map(worker -> { + return CompletableFuture.runAsync(() -> { + List innerPrepareRequests; + // Prepare stage + try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) { + if (log.isDebugEnabled()) { + log.debug( + "extract {} worker data and save", + worker.getClass().getName() + ); } - // Execution stage - try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) { - if (CollectionUtils.isNotEmpty(innerPrepareRequests)) { - batchDAO.flush(innerPrepareRequests); - } - } catch (Throwable e) { - log.error(e.getMessage(), e); - } - } finally { - countDownLatch.countDown(); + innerPrepareRequests = worker.buildBatchRequests(); + + worker.endOfRound(); + } + + if (CollectionUtils.isEmpty(innerPrepareRequests)) { + return; } - }); - }); - - countDownLatch.await(); - } catch (Throwable e) { - errorCounter.inc(); - log.error(e.getMessage(), e); - } finally { + + // Execution stage + HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); + batchDAO.flush(innerPrepareRequests) + .whenComplete(($1, $2) -> executeLatencyTimer.close()); + }, prepareExecutorService); + }).toArray(CompletableFuture[]::new)); + future.whenComplete((unused, throwable) -> { + allTimer.close(); if (log.isDebugEnabled()) { - log.debug("Persistence data save finish"); + log.debug( + "Batch persistence duration: {} ms", + System.currentTimeMillis() - startTime + ); } - } - - log.debug("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime); + if (throwable != null) { + errorCounter.inc(); + log.error(throwable.getMessage(), throwable); + } + }); + return future; } } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java index bbe9ca6dfd..f10f78c863 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimerTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import lombok.Data; import org.apache.skywalking.oap.server.core.CoreModuleConfig; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker; @@ -61,10 +62,11 @@ public class PersistenceTimerTest { } @Override - public void flush(final List prepareRequests) { + public CompletableFuture flush(final List prepareRequests) { synchronized (result) { result.addAll(prepareRequests); } + return CompletableFuture.completedFuture(null); } }; for (int i = 0; i < workCount; i++) { @@ -79,7 +81,8 @@ public class PersistenceTimerTest { PersistenceTimer.INSTANCE.isStarted = true; PersistenceTimer.INSTANCE.start(moduleManager, moduleConfig); - Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO); + CompletableFuture f = Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO); + f.join(); Assert.assertEquals(count * workCount * 2, result.size()); } diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java index 93c49aec5c..2733b13018 100644 --- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java +++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.library.elasticsearch.ElasticSearch; @@ -41,7 +42,7 @@ import static java.util.Objects.requireNonNull; @Slf4j public final class BulkProcessor { - private final ArrayBlockingQueue requests; + private final ArrayBlockingQueue requests; private final AtomicReference es; private final int bulkActions; @@ -74,21 +75,21 @@ public final class BulkProcessor { this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS); } - public BulkProcessor add(IndexRequest request) { - internalAdd(request); - return this; + public CompletableFuture add(IndexRequest request) { + return internalAdd(request); } - public BulkProcessor add(UpdateRequest request) { - internalAdd(request); - return this; + public CompletableFuture add(UpdateRequest request) { + return internalAdd(request); } @SneakyThrows - private void internalAdd(Object request) { + private CompletableFuture internalAdd(Object request) { requireNonNull(request, "request"); - requests.put(request); + final CompletableFuture f = new CompletableFuture<>(); + requests.put(new Holder(f, request)); flushIfNeeded(); + return f; } @SneakyThrows @@ -110,7 +111,7 @@ public final class BulkProcessor { return; } - final List batch = new ArrayList<>(requests.size()); + final List batch = new ArrayList<>(requests.size()); requests.drainTo(batch); final CompletableFuture flush = doFlush(batch); @@ -118,7 +119,7 @@ public final class BulkProcessor { flush.join(); } - private CompletableFuture doFlush(final List batch) { + private CompletableFuture doFlush(final List batch) { log.debug("Executing bulk with {} requests", batch.size()); if (batch.isEmpty()) { @@ -129,8 +130,8 @@ public final class BulkProcessor { try { final RequestFactory rf = v.requestFactory(); final List bs = new ArrayList<>(); - for (final Object request : batch) { - bs.add(v.codec().encode(request)); + for (final Holder holder : batch) { + bs.add(v.codec().encode(holder.request)); bs.add("\n".getBytes()); } final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][])); @@ -147,11 +148,20 @@ public final class BulkProcessor { }); future.whenComplete((ignored, exception) -> { if (exception != null) { + batch.stream().map(it -> it.future) + .forEach(it -> it.completeExceptionally(exception)); log.error("Failed to execute requests in bulk", exception); } else { log.debug("Succeeded to execute {} requests in bulk", batch.size()); + batch.stream().map(it -> it.future).forEach(it -> it.complete(null)); } }); return future; } + + @RequiredArgsConstructor + static class Holder { + private final CompletableFuture future; + private final Object request; + } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java index 0cc371dd63..ef8ec0062f 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.util.List; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; @@ -56,19 +57,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO { } @Override - public void flush(List prepareRequests) { + public CompletableFuture flush(List prepareRequests) { if (bulkProcessor == null) { this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests); } if (CollectionUtils.isNotEmpty(prepareRequests)) { - for (PrepareRequest prepareRequest : prepareRequests) { + return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> { if (prepareRequest instanceof InsertRequest) { - this.bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest()); + return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest()); } else { - this.bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest()); + return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest()); } - } + }).toArray(CompletableFuture[]::new)); } + return CompletableFuture.completedFuture(null); } } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java index 105383d422..00fc791f8a 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/BatchDAO.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; import java.util.List; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; @@ -41,9 +42,9 @@ public class BatchDAO implements IBatchDAO { } @Override - public void flush(List prepareRequests) { + public CompletableFuture flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { - return; + return CompletableFuture.completedFuture(null); } if (log.isDebugEnabled()) { @@ -51,10 +52,9 @@ public class BatchDAO implements IBatchDAO { } final BatchPoints.Builder builder = BatchPoints.builder(); - prepareRequests.forEach(e -> { - builder.point(((InfluxInsertRequest) e).getPoint()); - }); + prepareRequests.forEach(e -> builder.point(((InfluxInsertRequest) e).getPoint())); client.write(builder.build()); + return CompletableFuture.completedFuture(null); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java index 6188cf7eee..3dcd04677b 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Properties; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -54,9 +55,9 @@ public class H2BatchDAO implements IBatchDAO { } @Override - public void flush(List prepareRequests) { + public CompletableFuture flush(List prepareRequests) { if (CollectionUtils.isEmpty(prepareRequests)) { - return; + return CompletableFuture.completedFuture(null); } if (log.isDebugEnabled()) { log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize); @@ -80,6 +81,7 @@ public class H2BatchDAO implements IBatchDAO { if (log.isDebugEnabled()) { log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize); } + return CompletableFuture.completedFuture(null); } @Override -- GitLab