diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index 4fdd05c7ae552ecaa1c704d9f8037f3ea815715c..2e283221c68a49451932a3ffbbf8215656063d05 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -26,7 +26,7 @@ import lombok.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.oap.server.core.analysis.metrics.*; -import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService; +import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.exporter.grpc.*; import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter; import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient; @@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS subscriptionSet = new HashSet<>(); } - @Override public void export(MetricsMetaInfo meta, Metrics metrics) { - if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { - exportBuffer.produce(new ExportData(meta, metrics)); + @Override public void export(ExportEvent event) { + if (ExportEvent.EventType.TOTAL == event.getType()) { + Metrics metrics = event.getMetrics(); + if (metrics instanceof WithMetadata) { + MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta(); + if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) { + exportBuffer.produce(new ExportData(meta, metrics)); + } + } } } diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java index 0b347d443d4b561330d3808338835ad0cd7f4a4d..5c0d50dea92011f98658a8ca78097753e379a7cc 100644 --- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java +++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterTest.java @@ -19,7 +19,8 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.testing.GrpcServerRule; -import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; +import org.apache.skywalking.oap.server.core.analysis.metrics.*; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc; import org.junit.Before; @@ -58,7 +59,14 @@ public class GRPCExporterTest { @Test public void export() { - exporter.export(metaInfo, new MockMetrics()); + ExportEvent event = new ExportEvent(new MockExporterMetrics(), ExportEvent.EventType.TOTAL); + exporter.export(event); + } + + public static class MockExporterMetrics extends MockMetrics implements WithMetadata { + @Override public MetricsMetaInfo getMeta() { + return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL); + } } @Test diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java index 6f1edc02aedf2972609ea50b85bcd18c7106dd03..09fa143f92ae833a9177fba0119e4336853095ac 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import org.apache.skywalking.oap.server.core.analysis.metrics.*; import org.apache.skywalking.oap.server.core.exporter.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -26,21 +25,20 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** * @author wusheng */ -public class ExportWorker extends AbstractWorker { +public class ExportWorker extends AbstractWorker { private MetricValuesExportService exportService; public ExportWorker(ModuleDefineHolder moduleDefineHolder) { super(moduleDefineHolder); } - @Override public void in(Metrics metrics) { + @Override public void in(ExportEvent event) { if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) { - if (metrics instanceof WithMetadata) { - if (exportService == null) { - exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); - } - exportService.export(((WithMetadata)metrics).getMeta(), metrics); + if (exportService == null) { + exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); } + exportService.export(event); } } + } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 513f3aec3fe5503d27e21ceb6a027fe35f800586..b4b4fd1283f182f30627fe4b02536b74cdb87b21 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; @@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends PersistenceWorker mergeDataCache; private final IMetricsDAO metricsDAO; private final AbstractWorker nextAlarmWorker; - private final AbstractWorker nextExportWorker; + private final AbstractWorker nextExportWorker; private final DataCarrier dataCarrier; MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize, IMetricsDAO metricsDAO, AbstractWorker nextAlarmWorker, - AbstractWorker nextExportWorker) { + AbstractWorker nextExportWorker) { super(moduleDefineHolder, batchSize); this.model = model; this.mergeDataCache = new MergeDataCache<>(); @@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends PersistenceWorker prepareBatch(MergeDataCache cache) { List batchCollection = new LinkedList<>(); cache.getLast().collection().forEach(data -> { + if (Objects.nonNull(nextExportWorker)) { + ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT); + nextExportWorker.in(event); + } + Metrics dbData = null; try { dbData = metricsDAO.get(model, data); @@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends PersistenceWorker