未验证 提交 21978205 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support exporter runs in increment and total modes (#2840)

* Make exporter interface better.

* Fix review.
上级 3831c5ee
......@@ -26,7 +26,7 @@ import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.exporter.grpc.*;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
......@@ -56,9 +56,15 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
subscriptionSet = new HashSet<>();
}
@Override public void export(MetricsMetaInfo meta, Metrics metrics) {
if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
exportBuffer.produce(new ExportData(meta, metrics));
@Override public void export(ExportEvent event) {
if (ExportEvent.EventType.TOTAL == event.getType()) {
Metrics metrics = event.getMetrics();
if (metrics instanceof WithMetadata) {
MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
exportBuffer.produce(new ExportData(meta, metrics));
}
}
}
}
......
......@@ -19,7 +19,8 @@
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.testing.GrpcServerRule;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.junit.Before;
......@@ -58,7 +59,14 @@ public class GRPCExporterTest {
@Test
public void export() {
exporter.export(metaInfo, new MockMetrics());
ExportEvent event = new ExportEvent(new MockExporterMetrics(), ExportEvent.EventType.TOTAL);
exporter.export(event);
}
public static class MockExporterMetrics extends MockMetrics implements WithMetadata {
@Override public MetricsMetaInfo getMeta() {
return new MetricsMetaInfo("mock-metrics", DefaultScopeDefine.ALL);
}
}
@Test
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -26,21 +25,20 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author wusheng
*/
public class ExportWorker extends AbstractWorker<Metrics> {
public class ExportWorker extends AbstractWorker<ExportEvent> {
private MetricValuesExportService exportService;
public ExportWorker(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
}
@Override public void in(Metrics metrics) {
@Override public void in(ExportEvent event) {
if (exportService != null || getModuleDefineHolder().has(ExporterModule.NAME)) {
if (metrics instanceof WithMetadata) {
if (exportService == null) {
exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
exportService.export(((WithMetadata)metrics).getMeta(), metrics);
if (exportService == null) {
exportService = getModuleDefineHolder().find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
exportService.export(event);
}
}
}
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
......@@ -43,12 +44,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<Metrics> nextExportWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<Metrics> nextExportWorker) {
AbstractWorker<ExportEvent> nextExportWorker) {
super(moduleDefineHolder, batchSize);
this.model = model;
this.mergeDataCache = new MergeDataCache<>();
......@@ -100,6 +101,11 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
@Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(data -> {
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
Metrics dbData = null;
try {
dbData = metricsDAO.get(model, data);
......@@ -120,7 +126,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
nextAlarmWorker.in(data);
}
if (Objects.nonNull(nextExportWorker)) {
nextExportWorker.in(data);
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.TOTAL);
nextExportWorker.in(event);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
/**
* The event for exporter {@link MetricValuesExportService} implementation processes.
* {@link #metrics} should not be changed in any case.
*
* @author wusheng
*/
@Getter
public class ExportEvent {
/**
* Fields of this should not be changed in any case.
*/
private Metrics metrics;
private EventType type;
public ExportEvent(Metrics metrics, EventType type) {
this.metrics = metrics;
this.type = type;
}
public enum EventType {
/**
* The metrics aggregated in this bulk, not include the existing persistent data.
*/
INCREMENT,
/**
* Final result of the metrics at this moment.
*/
TOTAL
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
......@@ -27,5 +26,10 @@ import org.apache.skywalking.oap.server.library.module.Service;
* @author wusheng
*/
public interface MetricValuesExportService extends Service {
void export(MetricsMetaInfo meta, Metrics metrics);
/**
* This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
*
* @param event value is only accurate when the method invokes. Don't cache it.
*/
void export(ExportEvent event);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册