From a3a85e520edc0a6d6c7a0a838f210522f8d575d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Sat, 16 Mar 2019 23:27:15 +0800 Subject: [PATCH] Support Exporter in core (#2368) * Rename ALarmSupported to WithMetadata * Finish the base of exporter. * Fix class comment. --- oap-server/exporter/pom.xml | 39 +++++++++++++++ ...g.oap.server.library.module.ModuleProvider | 17 +++++++ .../code-templates/IndicatorImplementor.ftl | 8 ++-- .../IndicatorImplementorExpected.java | 8 ++-- oap-server/pom.xml | 1 + .../oap/server/core/alarm/AlarmEntrance.java | 18 +++---- .../indicator/IndicatorMetaInfo.java} | 16 +++++-- .../indicator/WithMetadata.java} | 8 ++-- .../analysis/worker/AlarmNotifyWorker.java | 8 ++-- .../core/analysis/worker/ExportWorker.java | 48 +++++++++++++++++++ .../worker/IndicatorPersistentWorker.java | 27 ++++++----- .../analysis/worker/IndicatorProcess.java | 7 ++- .../server/core/exporter/ExporterModule.java | 36 ++++++++++++++ .../exporter/MetricValuesExportService.java | 31 ++++++++++++ ...ing.oap.server.library.module.ModuleDefine | 3 +- 15 files changed, 228 insertions(+), 47 deletions(-) create mode 100644 oap-server/exporter/pom.xml create mode 100644 oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/{alarm/AlarmMeta.java => analysis/indicator/IndicatorMetaInfo.java} (74%) rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/{alarm/AlarmSupported.java => analysis/indicator/WithMetadata.java} (79%) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/ExporterModule.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java diff --git a/oap-server/exporter/pom.xml b/oap-server/exporter/pom.xml new file mode 100644 index 0000000000..492a8e2799 --- /dev/null +++ b/oap-server/exporter/pom.xml @@ -0,0 +1,39 @@ + + + + + + oap-server + org.apache.skywalking + 6.1.0-SNAPSHOT + + 4.0.0 + + exporter + + + + org.apache.skywalking + server-core + ${project.version} + + + \ No newline at end of file diff --git a/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 0000000000..eafb5530d5 --- /dev/null +++ b/oap-server/exporter/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# diff --git a/oap-server/generate-tool/src/main/resources/code-templates/IndicatorImplementor.ftl b/oap-server/generate-tool/src/main/resources/code-templates/IndicatorImplementor.ftl index 8fe2b5a937..aa7be17d53 100644 --- a/oap-server/generate-tool/src/main/resources/code-templates/IndicatorImplementor.ftl +++ b/oap-server/generate-tool/src/main/resources/code-templates/IndicatorImplementor.ftl @@ -28,8 +28,6 @@ import org.apache.skywalking.oap.server.core.Const; <#break> -import org.apache.skywalking.oap.server.core.alarm.AlarmMeta; -import org.apache.skywalking.oap.server.core.alarm.AlarmSupported; import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; @@ -45,7 +43,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; @IndicatorType @StreamData @StorageEntity(name = "${tableName}", builder = ${metricName}Indicator.Builder.class, sourceScopeId = ${sourceScopeId}) -public class ${metricName}Indicator extends ${indicatorClassName} implements AlarmSupported { +public class ${metricName}Indicator extends ${indicatorClassName} implements WithMetadata { <#list fieldsFromSource as sourceField> @Setter @Getter @Column(columnName = "${sourceField.columnName}") <#if sourceField.isID()>@IDColumn private ${sourceField.typeName} ${sourceField.fieldName}; @@ -170,8 +168,8 @@ public class ${metricName}Indicator extends ${indicatorClassName} implements Ala } - @Override public AlarmMeta getAlarmMeta() { - return new AlarmMeta("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}); + @Override public IndicatorMetaInfo getMeta() { + return new IndicatorMetaInfo("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}); } @Override diff --git a/oap-server/generate-tool/src/test/resources/expectedFiles/IndicatorImplementorExpected.java b/oap-server/generate-tool/src/test/resources/expectedFiles/IndicatorImplementorExpected.java index 461553be94..25a102a9fa 100644 --- a/oap-server/generate-tool/src/test/resources/expectedFiles/IndicatorImplementorExpected.java +++ b/oap-server/generate-tool/src/test/resources/expectedFiles/IndicatorImplementorExpected.java @@ -21,8 +21,6 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service.service import java.util.*; import lombok.*; import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.alarm.AlarmMeta; -import org.apache.skywalking.oap.server.core.alarm.AlarmSupported; import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; @@ -38,7 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; @IndicatorType @StreamData @StorageEntity(name = "service_avg", builder = ServiceAvgIndicator.Builder.class, sourceScopeId = 1) -public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSupported { +public class ServiceAvgIndicator extends LongAvgIndicator implements WithMetadata { @Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId; @@ -108,8 +106,8 @@ public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSuppor } - @Override public AlarmMeta getAlarmMeta() { - return new AlarmMeta("generate_indicator", 1, entityId); + @Override public IndicatorMetaInfo getMeta() { + return new IndicatorMetaInfo("generate_indicator", 1, entityId); } @Override diff --git a/oap-server/pom.xml b/oap-server/pom.xml index 77bc8a4d85..dba03700a2 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -41,6 +41,7 @@ generate-tool server-telemetry generate-tool-grammar + exporter diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java index d673e46e5f..7037a4a299 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmEntrance.java @@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.alarm; import java.util.concurrent.locks.ReentrantLock; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.*; import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.register.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; @@ -50,33 +50,33 @@ public class AlarmEntrance { init(); - AlarmMeta alarmMeta = ((AlarmSupported)indicator).getAlarmMeta(); + IndicatorMetaInfo indicatorMetaInfo = ((WithMetadata)indicator).getMeta(); MetaInAlarm metaInAlarm; - switch (alarmMeta.getScope()) { + switch (indicatorMetaInfo.getScope()) { case SERVICE: - int serviceId = Integer.parseInt(alarmMeta.getId()); + int serviceId = Integer.parseInt(indicatorMetaInfo.getId()); ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId); ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm(); - serviceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName()); + serviceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName()); serviceMetaInAlarm.setId(serviceId); serviceMetaInAlarm.setName(serviceInventory.getName()); metaInAlarm = serviceMetaInAlarm; break; case SERVICE_INSTANCE: - int serviceInstanceId = Integer.parseInt(alarmMeta.getId()); + int serviceInstanceId = Integer.parseInt(indicatorMetaInfo.getId()); ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); ServiceInstanceMetaInAlarm instanceMetaInAlarm = new ServiceInstanceMetaInAlarm(); - instanceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName()); + instanceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName()); instanceMetaInAlarm.setId(serviceInstanceId); instanceMetaInAlarm.setName(serviceInstanceInventory.getName()); metaInAlarm = instanceMetaInAlarm; break; case ENDPOINT: - int endpointId = Integer.parseInt(alarmMeta.getId()); + int endpointId = Integer.parseInt(indicatorMetaInfo.getId()); EndpointInventory endpointInventory = endpointInventoryCache.get(endpointId); EndpointMetaInAlarm endpointMetaInAlarm = new EndpointMetaInAlarm(); - endpointMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName()); + endpointMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName()); endpointMetaInAlarm.setId(endpointId); serviceId = endpointInventory.getServiceId(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmMeta.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java similarity index 74% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmMeta.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java index c751df0b4c..1251791596 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmMeta.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/IndicatorMetaInfo.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.core.alarm; +package org.apache.skywalking.oap.server.core.analysis.indicator; import lombok.*; import org.apache.skywalking.oap.server.core.Const; @@ -24,18 +24,18 @@ import org.apache.skywalking.oap.server.core.Const; /** * @author wusheng */ -public class AlarmMeta { +public class IndicatorMetaInfo { @Setter @Getter private String indicatorName; @Setter @Getter private int scope; @Setter @Getter private String id; - public AlarmMeta(String indicatorName, int scope) { + public IndicatorMetaInfo(String indicatorName, int scope) { this.indicatorName = indicatorName; this.scope = scope; this.id = Const.EMPTY_STRING; } - public AlarmMeta(String indicatorName, int scope, String id) { + public IndicatorMetaInfo(String indicatorName, int scope, String id) { this.indicatorName = indicatorName; this.scope = scope; this.id = id; @@ -48,4 +48,12 @@ public class AlarmMeta { public void setId(String id) { this.id = id; } + + @Override public String toString() { + return "IndicatorMetaInfo{" + + "indicatorName='" + indicatorName + '\'' + + ", scope=" + scope + + ", id='" + id + '\'' + + '}'; + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmSupported.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/WithMetadata.java similarity index 79% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmSupported.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/WithMetadata.java index 9846491d50..c3a2cc3a1f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmSupported.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/WithMetadata.java @@ -16,13 +16,13 @@ * */ -package org.apache.skywalking.oap.server.core.alarm; +package org.apache.skywalking.oap.server.core.analysis.indicator; /** - * Alarm supported interface implementor could return the {@link AlarmMeta} + * Indicator, which implement this interface, could provide {@link IndicatorMetaInfo}. * * @author wusheng */ -public interface AlarmSupported { - AlarmMeta getAlarmMeta(); +public interface WithMetadata { + IndicatorMetaInfo getMeta(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java index 2b63572b90..d325648c62 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AlarmNotifyWorker.java @@ -18,9 +18,9 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance; -import org.apache.skywalking.oap.server.core.alarm.AlarmSupported; -import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleManager; @@ -40,7 +40,7 @@ public class AlarmNotifyWorker extends AbstractWorker { } @Override public void in(Indicator indicator) { - if (indicator instanceof AlarmSupported) { + if (indicator instanceof WithMetadata) { entrance.forward(indicator); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java new file mode 100644 index 0000000000..21ab3abaa0 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/ExportWorker.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import org.apache.skywalking.oap.server.core.analysis.indicator.*; +import org.apache.skywalking.oap.server.core.exporter.*; +import org.apache.skywalking.oap.server.core.worker.AbstractWorker; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author wusheng + */ +public class ExportWorker extends AbstractWorker { + private ModuleManager moduleManager; + private MetricValuesExportService exportService; + + public ExportWorker(int workerId, ModuleManager moduleManager) { + super(workerId); + this.moduleManager = moduleManager; + } + + @Override public void in(Indicator indicator) { + if (exportService != null || moduleManager.has(ExporterModule.NAME)) { + if (indicator instanceof WithMetadata) { + if (exportService == null) { + exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class); + } + exportService.export(((WithMetadata)indicator).getMeta(), indicator); + } + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java index 53134cfa6c..13a5193396 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java @@ -18,21 +18,16 @@ package org.apache.skywalking.oap.server.core.analysis.worker; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Objects; +import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext; -import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache; +import org.apache.skywalking.oap.server.core.analysis.data.*; import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; import static java.util.Objects.nonNull; @@ -46,16 +41,19 @@ public class IndicatorPersistentWorker extends PersistenceWorker mergeDataCache; private final IIndicatorDAO indicatorDAO; - private final AbstractWorker nextWorker; + private final AbstractWorker nextAlarmWorker; + private final AbstractWorker nextExportWorker; private final DataCarrier dataCarrier; IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager, - IIndicatorDAO indicatorDAO, AbstractWorker nextWorker) { + IIndicatorDAO indicatorDAO, AbstractWorker nextAlarmWorker, + AbstractWorker nextExportWorker) { super(moduleManager, workerId, batchSize); this.modelName = modelName; this.mergeDataCache = new MergeDataCache<>(); this.indicatorDAO = indicatorDAO; - this.nextWorker = nextWorker; + this.nextAlarmWorker = nextAlarmWorker; + this.nextExportWorker = nextExportWorker; String name = "INDICATOR_L2_AGGREGATION"; int size = BulkConsumePool.Creator.recommendMaxSize() / 8; @@ -117,8 +115,11 @@ public class IndicatorPersistentWorker extends PersistenceWorker