From 21978205778c10234281a4f2a963380cb0773ce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sun, 9 Jun 2019 11:32:27 +0800 Subject: [PATCH] Support exporter runs in increment and total modes (#2840) * Make exporter interface better. * Fix review. --- .../exporter/provider/grpc/GRPCExporter.java | 14 +++-- .../provider/grpc/GRPCExporterTest.java | 12 ++++- .../core/analysis/worker/ExportWorker.java | 14 +++-- .../worker/MetricsPersistentWorker.java | 13 +++-- .../oap/server/core/exporter/ExportEvent.java | 53 +++++++++++++++++++ .../exporter/MetricValuesExportService.java | 8 ++- 6 files changed, 95 insertions(+), 19 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExportEvent.java diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index 4fdd05c7ae..2e283221c6 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -26,7 +26,7 @@ import lombok.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.metrics.*; -import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService; +import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.exporter.grpc.*; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; @@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS subscriptionSet = new HashSet<>(); } - @Override public void export(MetricsMetaInfo meta, Metrics metrics) { - if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { - exportBuffer.produce(new ExportData(meta, metrics)); + @Override public void export(ExportEvent event) { + if (ExportEvent.EventType.TOTAL == event.getType()) { + Metrics metrics = event.getMetrics(); + if (metrics instanceof WithMetadata) { + MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta(); + if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { + exportBuffer.produce(new ExportData(meta, metrics)); + } + } } } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index 0b347d443d..5c0d50dea9 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -19,7 +19,8 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.testing.GrpcServerRule; -import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; +import org.apache.skywalking.oap.server.core.analysis.metrics.*; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; import org.junit.Before; @@ -58,7 +59,14 @@ public class GRPCExporterTest { @Test public void export() { - exporter.export(metaInfo, new MockMetrics()); + ExportEvent event = new ExportEvent(new MockExporterMetrics(), ExportEvent.EventType.TOTAL); + exporter.export(event); + } + + public static class MockExporterMetrics extends MockMetrics implements WithMetadata { + @Override public MetricsMetaInfo getMeta() { + return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL); + } } @Test diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java index 6f1edc02ae..09fa143f92 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import org.apache.skywalking.oap.server.core.analysis.metrics.*; import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -26,21 +25,20 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author wusheng */ -public class ExportWorker extends AbstractWorker { +public class ExportWorker extends AbstractWorker { private MetricValuesExportService exportService; public ExportWorker(ModuleDefineHolder moduleDefineHolder) { super(moduleDefineHolder); } - @Override public void in(Metrics metrics) { + @Override public void in(ExportEvent event) { if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) { - if (metrics instanceof WithMetadata) { - if (exportService == null) { - exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); - } - exportService.export(((WithMetadata)metrics).getMeta(), metrics); + if (exportService == null) { + exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); } + exportService.export(event); } } + } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 513f3aec3f..b4b4fd1283 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; @@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends PersistenceWorker mergeDataCache; private final IMetricsDAO metricsDAO; private final AbstractWorker nextAlarmWorker; - private final AbstractWorker nextExportWorker; + private final AbstractWorker nextExportWorker; private final DataCarrier dataCarrier; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, - AbstractWorker nextExportWorker) { + AbstractWorker nextExportWorker) { super(moduleDefineHolder, batchSize); this.model = model; this.mergeDataCache = new MergeDataCache<>(); @@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends PersistenceWorker prepareBatch(MergeDataCache cache) { List batchCollection = new LinkedList<>(); cache.getLast().collection().forEach(data -> { + if (Objects.nonNull(nextExportWorker)) { + ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT); + nextExportWorker.in(event); + } + Metrics dbData = null; try { dbData = metricsDAO.get(model, data); @@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends PersistenceWorker