Unverified · Commit a2897db4 authored by kezhenxu94, committed by GitHub

Fix the so11y metric of persistence execution latency being incorrect in ElasticSearch storage (#8161)

The ES persistence execution is now asynchronous, so the execution
latency only counted the time to insert the requests into the bulk
processor, rather than the time until the requests were actually flushed
into the storage. This patch fixes that issue.

The prepare latency has the same issue, but fixing it needs more changes,
so I'll leave it to another pull request.
Parent 973fba08
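To make the bug concrete, here is a minimal, runnable sketch of the same measurement problem (all names here are illustrative, not from this patch): stopping the timer when a request is merely queued records near-zero latency, while completing a `CompletableFuture` at flush time lets the caller observe the true execution latency.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the bug and the fix; not SkyWalking code.
public class AsyncLatencySketch {
    static final ScheduledExecutorService POOL = Executors.newSingleThreadScheduledExecutor();

    // Simulates a bulk processor: the flush happens ~200 ms after the request is queued.
    static CompletableFuture<Void> enqueue(String request) {
        CompletableFuture<Void> flushed = new CompletableFuture<>();
        POOL.schedule(() -> { flushed.complete(null); }, 200, TimeUnit.MILLISECONDS);
        return flushed;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        CompletableFuture<Void> flushed = enqueue("metrics-batch");

        // Buggy metric: the clock stops as soon as the request is queued (~0 ms).
        System.out.println("enqueue latency: " + (System.currentTimeMillis() - start) + " ms");

        // Fixed metric: the clock stops only when the flush actually completes (~200 ms).
        flushed.whenComplete((v, t) ->
            System.out.println("flush latency: " + (System.currentTimeMillis() - start) + " ms"));

        flushed.join();
        POOL.shutdown();
    }
}
```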
......@@ -54,6 +54,7 @@ Release Notes.
* Add filter mechanism in MAL core to filter metrics.
* Fix concurrency bug in MAL `increase`-related calculation.
* Fix a null pointer bug when building `SampleFamily`.
+ * Fix the so11y metric of persistence execution latency being incorrect in ElasticSearch storage.
#### UI
......
......@@ -179,14 +179,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public List<PrepareRequest> buildBatchRequests() {
if (persistentCounter++ % persistentMod != 0) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
final List<Metrics> lastCollection = getCache().read();
long start = System.currentTimeMillis();
if (lastCollection.size() == 0) {
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
/*
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import java.util.List;
+ import java.util.concurrent.CompletableFuture;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
......@@ -43,5 +44,5 @@ public interface IBatchDAO extends DAO {
*
* @param prepareRequests data to insert or update. No delete happens in streaming mode.
*/
- void flush(List<PrepareRequest> prepareRequests);
+ CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests);
}
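For illustration, a sketch of what the new contract implies for implementers; `BatchDao` below is a simplified stand-in for `IBatchDAO`, with the request type reduced to `String`. A synchronous store can satisfy the asynchronous contract by returning an already-completed future, which is the shape the JDBC and InfluxDB implementations take later in this patch, while the ElasticSearch implementation returns a future that completes only once the bulk flush has executed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for IBatchDAO; not the actual SkyWalking interface.
interface BatchDao {
    CompletableFuture<Void> flush(List<String> prepareRequests);
}

// A synchronous storage does its work inline and hands back a future
// that is already complete when the method returns.
class SynchronousBatchDao implements BatchDao {
    @Override
    public CompletableFuture<Void> flush(List<String> prepareRequests) {
        prepareRequests.forEach(r -> { /* write synchronously to storage */ });
        return CompletableFuture.completedFuture(null);
    }
}
```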
......@@ -21,12 +21,11 @@ package org.apache.skywalking.oap.server.core.storage;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
- import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
- import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker;
......@@ -34,6 +33,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+ import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
......@@ -56,21 +56,25 @@ public enum PersistenceTimer {
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
log.info("persistence timer start");
- IBatchDAO batchDAO = moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
+ IBatchDAO batchDAO =
+ moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
errorCounter = metricsCreator.createCounter(
"persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
"persistence_timer_bulk_error_count",
"Error execution of the prepare stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
prepareLatency = metricsCreator.createHistogramMetric(
"persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
"persistence_timer_bulk_prepare_latency",
"Latency of the prepare stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
executeLatency = metricsCreator.createHistogramMetric(
"persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
"persistence_timer_bulk_execute_latency",
"Latency of the execute stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
allLatency = metricsCreator.createHistogramMetric(
......@@ -82,69 +86,69 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(
- new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
- .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
- TimeUnit.SECONDS
+ new RunnableWithExceptionProtection(
+ () -> extractDataAndSave(batchDAO).join(),
+ t -> log.error("Extract data and save failure.", t)
+ ), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS
);
this.isStarted = true;
}
}
- private void extractDataAndSave(IBatchDAO batchDAO) {
+ private CompletableFuture<Void> extractDataAndSave(IBatchDAO batchDAO) {
if (log.isDebugEnabled()) {
log.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
- try (HistogramMetrics.Timer allTimer = allLatency.createTimer()) {
- List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
- persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
- persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
- CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
- persistenceWorkers.forEach(worker -> {
- prepareExecutorService.submit(() -> {
- List<PrepareRequest> innerPrepareRequests = null;
- try {
- // Prepare stage
- try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
- if (log.isDebugEnabled()) {
- log.debug("extract {} worker data and save", worker.getClass().getName());
- }
- innerPrepareRequests = worker.buildBatchRequests();
- worker.endOfRound();
- } catch (Throwable e) {
- log.error(e.getMessage(), e);
+ HistogramMetrics.Timer allTimer = allLatency.createTimer();
+ List<PersistenceWorker<? extends StorageData>> workers = new ArrayList<>();
+ workers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+ workers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+ final CompletableFuture<Void> future =
+ CompletableFuture.allOf(workers.stream().map(worker -> {
+ return CompletableFuture.runAsync(() -> {
+ List<PrepareRequest> innerPrepareRequests;
+ // Prepare stage
+ try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "extract {} worker data and save",
+ worker.getClass().getName()
+ );
}
- // Execution stage
- try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
- if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
- batchDAO.flush(innerPrepareRequests);
- }
- } catch (Throwable e) {
- log.error(e.getMessage(), e);
- }
- } finally {
- countDownLatch.countDown();
+ innerPrepareRequests = worker.buildBatchRequests();
+ worker.endOfRound();
}
+ if (CollectionUtils.isEmpty(innerPrepareRequests)) {
+ return;
+ }
- });
- });
- countDownLatch.await();
- } catch (Throwable e) {
- errorCounter.inc();
- log.error(e.getMessage(), e);
- } finally {
+ // Execution stage
+ HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
+ batchDAO.flush(innerPrepareRequests)
+ .whenComplete(($1, $2) -> executeLatencyTimer.close());
+ }, prepareExecutorService);
+ }).toArray(CompletableFuture[]::new));
+ future.whenComplete((unused, throwable) -> {
+ allTimer.close();
if (log.isDebugEnabled()) {
log.debug("Persistence data save finish");
log.debug(
"Batch persistence duration: {} ms",
System.currentTimeMillis() - startTime
);
}
- }
- log.debug("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+ if (throwable != null) {
+ errorCounter.inc();
+ log.error(throwable.getMessage(), throwable);
+ }
+ });
+ return future;
}
}
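The rewritten `extractDataAndSave` fans each worker out onto the prepare pool with `CompletableFuture.runAsync` and fans the results back in with `CompletableFuture.allOf`, so one callback can close the overall timer and count errors for the whole round. A compact standalone sketch of that pattern, with placeholder workers standing in for the real `PersistenceWorker` list:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic fan-out/fan-in mirroring the shape of extractDataAndSave above.
public class FanOutFanInSketch {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Runnable> workers = List.of(
            () -> System.out.println("prepare and flush worker A"),
            () -> System.out.println("prepare and flush worker B")
        );

        CompletableFuture<Void> round = CompletableFuture.allOf(
            workers.stream()
                   .map(worker -> CompletableFuture.runAsync(worker, pool))
                   .toArray(CompletableFuture[]::new)
        );

        // A single callback observes both success and failure of the whole round.
        round.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                System.err.println("round failed: " + throwable);
            } else {
                System.out.println("round finished");
            }
        });

        round.join();
        pool.shutdown();
    }
}
```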
......@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+ import java.util.concurrent.CompletableFuture;
import lombok.Data;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker;
......@@ -61,10 +62,11 @@ public class PersistenceTimerTest {
}
@Override
- public void flush(final List<PrepareRequest> prepareRequests) {
+ public CompletableFuture<Void> flush(final List<PrepareRequest> prepareRequests) {
synchronized (result) {
result.addAll(prepareRequests);
}
+ return CompletableFuture.completedFuture(null);
}
};
for (int i = 0; i < workCount; i++) {
......@@ -79,7 +81,8 @@ public class PersistenceTimerTest {
PersistenceTimer.INSTANCE.isStarted = true;
PersistenceTimer.INSTANCE.start(moduleManager, moduleConfig);
- Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
+ CompletableFuture<Void> f = Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
+ f.join();
Assert.assertEquals(count * workCount * 2, result.size());
}
......
......@@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+ import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
......@@ -41,7 +42,7 @@ import static java.util.Objects.requireNonNull;
@Slf4j
public final class BulkProcessor {
- private final ArrayBlockingQueue<Object> requests;
+ private final ArrayBlockingQueue<Holder> requests;
private final AtomicReference<ElasticSearch> es;
private final int bulkActions;
......@@ -74,21 +75,21 @@ public final class BulkProcessor {
this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
}
- public BulkProcessor add(IndexRequest request) {
- internalAdd(request);
- return this;
+ public CompletableFuture<Void> add(IndexRequest request) {
+ return internalAdd(request);
}
- public BulkProcessor add(UpdateRequest request) {
- internalAdd(request);
- return this;
+ public CompletableFuture<Void> add(UpdateRequest request) {
+ return internalAdd(request);
}
@SneakyThrows
- private void internalAdd(Object request) {
+ private CompletableFuture<Void> internalAdd(Object request) {
requireNonNull(request, "request");
- requests.put(request);
+ final CompletableFuture<Void> f = new CompletableFuture<>();
+ requests.put(new Holder(f, request));
flushIfNeeded();
+ return f;
}
@SneakyThrows
......@@ -110,7 +111,7 @@ public final class BulkProcessor {
return;
}
- final List<Object> batch = new ArrayList<>(requests.size());
+ final List<Holder> batch = new ArrayList<>(requests.size());
requests.drainTo(batch);
final CompletableFuture<Void> flush = doFlush(batch);
......@@ -118,7 +119,7 @@ public final class BulkProcessor {
flush.join();
}
- private CompletableFuture<Void> doFlush(final List<Object> batch) {
+ private CompletableFuture<Void> doFlush(final List<Holder> batch) {
log.debug("Executing bulk with {} requests", batch.size());
if (batch.isEmpty()) {
......@@ -129,8 +130,8 @@ public final class BulkProcessor {
try {
final RequestFactory rf = v.requestFactory();
final List<byte[]> bs = new ArrayList<>();
- for (final Object request : batch) {
- bs.add(v.codec().encode(request));
+ for (final Holder holder : batch) {
+ bs.add(v.codec().encode(holder.request));
bs.add("\n".getBytes());
}
final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
......@@ -147,11 +148,20 @@ public final class BulkProcessor {
});
future.whenComplete((ignored, exception) -> {
if (exception != null) {
+ batch.stream().map(it -> it.future)
+ .forEach(it -> it.completeExceptionally(exception));
log.error("Failed to execute requests in bulk", exception);
} else {
log.debug("Succeeded to execute {} requests in bulk", batch.size());
+ batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
}
});
return future;
}
+ @RequiredArgsConstructor
+ static class Holder {
+ private final CompletableFuture<Void> future;
+ private final Object request;
+ }
}
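The `Holder` pairing is what allows `add` to hand back a future that settles only when the owning batch is flushed. A stripped-down sketch of the same idea, with payloads reduced to `String` (illustrative only, not the actual `BulkProcessor`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Queue each payload together with a future, then settle every future
// in a batch when that batch is flushed.
public class HolderQueueSketch {
    static final class Holder {
        final CompletableFuture<Void> future;
        final String payload;

        Holder(CompletableFuture<Void> future, String payload) {
            this.future = future;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Holder> requests = new ArrayBlockingQueue<>(1024);

    // The returned future completes only when the batch containing this payload flushes.
    public CompletableFuture<Void> add(String payload) throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        requests.put(new Holder(future, payload));
        return future;
    }

    // Drain a batch, "send" it, then complete (or fail) every future in the batch.
    public void flush() {
        List<Holder> batch = new ArrayList<>(requests.size());
        requests.drainTo(batch);
        try {
            batch.forEach(h -> System.out.println("flushing " + h.payload));
            batch.forEach(h -> h.future.complete(null));
        } catch (Exception e) {
            batch.forEach(h -> h.future.completeExceptionally(e));
        }
    }
}
```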
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
+ import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.bulk.BulkProcessor;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
......@@ -56,19 +57,20 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
}
@Override
- public void flush(List<PrepareRequest> prepareRequests) {
+ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {
- for (PrepareRequest prepareRequest : prepareRequests) {
+ return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
if (prepareRequest instanceof InsertRequest) {
- this.bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
+ return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
} else {
- this.bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
+ return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
}
- }
+ }).toArray(CompletableFuture[]::new));
}
+ return CompletableFuture.completedFuture(null);
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import java.util.List;
+ import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -41,9 +42,9 @@ public class BatchDAO implements IBatchDAO {
}
@Override
- public void flush(List<PrepareRequest> prepareRequests) {
+ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isEmpty(prepareRequests)) {
- return;
+ return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
......@@ -51,10 +52,9 @@ public class BatchDAO implements IBatchDAO {
}
final BatchPoints.Builder builder = BatchPoints.builder();
- prepareRequests.forEach(e -> {
- builder.point(((InfluxInsertRequest) e).getPoint());
- });
+ prepareRequests.forEach(e -> builder.point(((InfluxInsertRequest) e).getPoint()));
client.write(builder.build());
+ return CompletableFuture.completedFuture(null);
}
}
......@@ -23,6 +23,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.Map;
+ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
......@@ -54,9 +55,9 @@ public class H2BatchDAO implements IBatchDAO {
}
@Override
- public void flush(List<PrepareRequest> prepareRequests) {
+ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isEmpty(prepareRequests)) {
- return;
+ return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
......@@ -80,6 +81,7 @@ public class H2BatchDAO implements IBatchDAO {
if (log.isDebugEnabled()) {
log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
}
+ return CompletableFuture.completedFuture(null);
}
@Override
......