From 7edcd5aa33cec034047871ac1348b5b858b630f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Fri, 8 May 2020 21:21:03 +0800 Subject: [PATCH] Change the model installation into the reactive module (#4761) --- .../apache/skywalking/oal/rt/OALRuntime.java | 9 +- .../oal/rt/parser/DeepAnalysisTest.java | 3 +- .../oal/rt/parser/ScriptParserTest.java | 3 +- .../oap/server/core/CoreModule.java | 8 +- .../oap/server/core/CoreModuleProvider.java | 18 +- .../analysis/StreamAnnotationListener.java | 3 +- .../server/core/analysis/StreamProcessor.java | 3 +- .../core/analysis/meter/MeterSystem.java | 230 +++++++----------- .../worker/MetricsStreamProcessor.java | 9 +- .../worker/NoneStreamingProcessor.java | 7 +- .../worker/RecordStreamProcessor.java | 7 +- .../analysis/worker/TopNStreamProcessor.java | 7 +- .../core/annotation/AnnotationListener.java | 3 +- .../core/annotation/AnnotationScan.java | 13 +- .../core/storage/annotation/Column.java | 4 +- .../{INewModel.java => ModelCreator.java} | 11 +- .../core/storage/model/ModelInstaller.java | 61 ++--- ...delOverride.java => ModelManipulator.java} | 2 +- .../core/storage/model/StorageModels.java | 43 +++- .../storage/StorageInstallerTestCase.java | 66 ----- .../core/storage/model/StorageModelsTest.java | 3 +- .../provider/PrometheusFetcherProvider.java | 19 +- .../StorageModuleElasticsearchProvider.java | 6 +- .../base/StorageEsInstaller.java | 8 +- .../StorageModuleElasticsearch7Provider.java | 5 +- .../base/StorageEs7Installer.java | 19 +- .../influxdb/InfluxStorageProvider.java | 5 +- .../plugin/influxdb/InfluxTableInstaller.java | 8 +- .../plugin/jdbc/h2/H2StorageProvider.java | 5 +- .../plugin/jdbc/h2/dao/H2TableInstaller.java | 8 +- .../jdbc/mysql/MySQLStorageProvider.java | 5 +- .../jdbc/mysql/MySQLTableInstaller.java | 6 +- .../profile/core/MockCoreModuleProvider.java | 16 +- .../mock/MockStreamAnnotationListener.java | 3 +- 34 files changed, 277 insertions(+), 349 deletions(-) rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/{INewModel.java => ModelCreator.java} (78%) rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/{IModelOverride.java => ModelManipulator.java} (95%) delete mode 100644 oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java index 6d7201bf9f..bd48f34298 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java @@ -64,6 +64,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException; import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.util.ResourceUtils; @@ -165,7 +166,13 @@ public class OALRuntime implements OALEngine { @Override public void notifyAllListeners() throws ModuleStartException { - metricsClasses.forEach(streamAnnotationListener::notify); + for (Class metricsClass : metricsClasses) { + try { + streamAnnotationListener.notify(metricsClass); + } catch (StorageException e) { + throw new ModuleStartException(e.getMessage(), e); + } + } for (Class dispatcherClass : dispatcherClasses) { try { dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass); diff --git a/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/DeepAnalysisTest.java b/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/DeepAnalysisTest.java index 3966cca3c4..c86912521c 100644 --- a/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/DeepAnalysisTest.java +++ b/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/DeepAnalysisTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -29,7 +30,7 @@ import org.junit.Test; public class DeepAnalysisTest { @BeforeClass - public static void init() throws IOException { + public static void init() throws IOException, StorageException { AnnotationScan scopeScan = new AnnotationScan(); scopeScan.registerListener(new DefaultScopeDefine.Listener()); scopeScan.scan(); diff --git a/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/ScriptParserTest.java b/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/ScriptParserTest.java index 226e391469..2b4c7c4a53 100644 --- a/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/ScriptParserTest.java +++ b/oap-server/oal-rt/src/test/java/org/apache/skywalking/oal/rt/parser/ScriptParserTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -29,7 +30,7 @@ import org.junit.Test; public class ScriptParserTest { @BeforeClass - public static void init() throws IOException { + public static void init() throws IOException, StorageException { MetricsHolder.init(); AnnotationScan scopeScan = new AnnotationScan(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java index cb10177682..dec296c665 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java @@ -45,8 +45,8 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.storage.model.IModelManager; -import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefine; @@ -110,9 +110,9 @@ public class CoreModule extends ModuleDefine { } private void addInsideService(List classes) { - classes.add(INewModel.class); + classes.add(ModelCreator.class); classes.add(IModelManager.class); - classes.add(IModelOverride.class); + classes.add(ModelManipulator.class); classes.add(RemoteClientManager.class); classes.add(RemoteSenderService.class); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 0320650406..96aa177b43 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -68,9 +68,10 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.IModelManager; -import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; +import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator; import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; @@ -155,15 +156,14 @@ public class CoreModuleProvider extends ModuleProvider { throw new ModuleStartException(e.getMessage(), e); } - MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); - this.registerServiceImplementation(MeterSystem.class, meterSystem); + this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager())); AnnotationScan oalDisable = new AnnotationScan(); oalDisable.registerListener(DisableRegister.INSTANCE); oalDisable.registerListener(new DisableRegister.SingleDisableScanListener()); try { oalDisable.scan(); - } catch (IOException e) { + } catch (IOException | StorageException e) { throw new ModuleStartException(e.getMessage(), e); } @@ -210,9 +210,9 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); - this.registerServiceImplementation(INewModel.class, storageModels); + this.registerServiceImplementation(ModelCreator.class, storageModels); this.registerServiceImplementation(IModelManager.class, storageModels); - this.registerServiceImplementation(IModelOverride.class, storageModels); + this.registerServiceImplementation(ModelManipulator.class, storageModels); this.registerServiceImplementation( NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig)); @@ -256,8 +256,6 @@ public class CoreModuleProvider extends ModuleProvider { @Override public void start() throws ModuleStartException { - MeterSystem.closeMeterCreationChannel(); - grpcServer.addHandler(new RemoteServiceHandler(getManager())); grpcServer.addHandler(new HealthCheckServiceHandler()); remoteClientManager.start(); @@ -267,7 +265,7 @@ public class CoreModuleProvider extends ModuleProvider { annotationScan.scan(); oalEngine.notifyAllListeners(); - } catch (IOException | IllegalAccessException | InstantiationException e) { + } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java index 5d896970b2..4c75c78368 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamAnnotationListener.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProces import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -45,7 +46,7 @@ public class StreamAnnotationListener implements AnnotationListener { @SuppressWarnings("unchecked") @Override - public void notify(Class aClass) { + public void notify(Class aClass) throws StorageException { if (aClass.isAnnotationPresent(Stream.class)) { Stream stream = (Stream) aClass.getAnnotation(Stream.class); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java index 11828e4a6a..5a57f4c3e5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/StreamProcessor.java @@ -18,11 +18,12 @@ package org.apache.skywalking.oap.server.core.analysis; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; public interface StreamProcessor { void in(STREAM stream); - void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class streamClass); + void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class streamClass) throws StorageException; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java index 6ba9a479aa..9bddeacb3a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java @@ -23,9 +23,7 @@ import com.google.common.reflect.ClassPath; import java.io.IOException; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import javassist.CannotCompileException; import javassist.ClassPool; @@ -34,7 +32,6 @@ import javassist.CtConstructor; import javassist.CtNewConstructor; import javassist.CtNewMethod; import javassist.NotFoundException; -import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -45,6 +42,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableV import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.Service; @@ -57,29 +55,18 @@ import org.apache.skywalking.oap.server.library.module.Service; @Slf4j public class MeterSystem implements Service { private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic."; - private static ModuleManager MANAGER; - private static ClassPool CLASS_POOL; - private static List TO_BE_CREATED_METERS = new ArrayList<>(); - private static Map> FUNCTION_REGISTER = new HashMap<>(); + private ModuleManager manager; + private ClassPool classPool; + private Map> functionRegister = new HashMap<>(); /** * Host the dynamic meter prototype classes. These classes could be create dynamically through {@link * Object#clone()} in the runtime; */ - private static Map METER_PROTOTYPES = new HashMap<>(); - private static MeterSystem METER_SYSTEM; - private static boolean METER_CREATABLE = true; + private Map meterPrototypes = new HashMap<>(); - private MeterSystem() { - - } - - public synchronized static MeterSystem meterSystem(final ModuleManager manager) { - if (METER_SYSTEM != null) { - return METER_SYSTEM; - } - - MANAGER = manager; - CLASS_POOL = ClassPool.getDefault(); + public MeterSystem(final ModuleManager manager) { + this.manager = manager; + classPool = ClassPool.getDefault(); ClassPath classpath = null; try { @@ -97,14 +84,12 @@ public class MeterSystem implements Service { throw new IllegalArgumentException( "Function " + functionClass.getCanonicalName() + " doesn't implement AcceptableValue."); } - FUNCTION_REGISTER.put( + functionRegister.put( metricsFunction.functionName(), (Class) functionClass ); } } - METER_SYSTEM = new MeterSystem(); - return METER_SYSTEM; } /** @@ -121,124 +106,98 @@ public class MeterSystem implements Service { String functionName, ScopeType type, Class dataType) throws IllegalArgumentException { - if (!METER_CREATABLE) { - throw new IllegalStateException("Can't create new metrics anymore"); - } + /** + * Create a new meter class dynamically. + */ + final Class meterFunction = functionRegister.get(functionName); - final NewMeter newMeter = new NewMeter(metricsName, functionName, type, dataType); - if (TO_BE_CREATED_METERS.contains(newMeter)) { - return false; + if (meterFunction == null) { + throw new IllegalArgumentException("Function " + functionName + " can't be found."); } - TO_BE_CREATED_METERS.add(newMeter); - return true; - } - - /** - * Close the {@link #create(String, String, ScopeType, Class)} channel, and build the model and streaming - * definitions. - */ - public static void closeMeterCreationChannel() { - METER_CREATABLE = false; - - TO_BE_CREATED_METERS.forEach(newMeter -> { - String metricsName = newMeter.metricsName; - String functionName = newMeter.functionName; - ScopeType type = newMeter.type; - Class dataType = newMeter.dataType; - - /** - * Create a new meter class dynamically. - */ - final Class meterFunction = FUNCTION_REGISTER.get(functionName); - - if (meterFunction == null) { - throw new IllegalArgumentException("Function " + functionName + " can't be found."); - } - - boolean foundDataType = false; - String acceptance = null; - for (final Type genericInterface : meterFunction.getGenericInterfaces()) { - if (genericInterface instanceof ParameterizedType) { - ParameterizedType parameterizedType = (ParameterizedType) genericInterface; - if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) { - Type[] arguments = parameterizedType.getActualTypeArguments(); - if (arguments[0].equals(dataType)) { - foundDataType = true; - } else { - acceptance = arguments[0].getTypeName(); - } - } - if (foundDataType) { - break; + boolean foundDataType = false; + String acceptance = null; + for (final Type genericInterface : meterFunction.getGenericInterfaces()) { + if (genericInterface instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) genericInterface; + if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) { + Type[] arguments = parameterizedType.getActualTypeArguments(); + if (arguments[0].equals(dataType)) { + foundDataType = true; + } else { + acceptance = arguments[0].getTypeName(); } } - } - if (!foundDataType) { - throw new IllegalArgumentException("Function " + functionName - + " requires <" + acceptance + "> in AcceptableValue" - + " but using " + dataType.getName() + " in the creation"); - } - - final CtClass parentClass; - try { - parentClass = CLASS_POOL.get(meterFunction.getCanonicalName()); - if (!Metrics.class.isAssignableFrom(meterFunction)) { - throw new IllegalArgumentException( - "Function " + functionName + " doesn't inherit from Metrics."); + if (foundDataType) { + break; } - } catch (NotFoundException e) { - throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist."); - } - final String className = formatName(metricsName); - CtClass metricsClass = CLASS_POOL.makeClass(METER_CLASS_PACKAGE + className, parentClass); - - /** - * Create empty construct - */ - try { - CtConstructor defaultConstructor = CtNewConstructor.make( - "public " + className + "() {}", metricsClass); - metricsClass.addConstructor(defaultConstructor); - } catch (CannotCompileException e) { - log.error("Can't add empty constructor in " + className + ".", e); - throw new UnexpectedException(e.getMessage(), e); } + } + if (!foundDataType) { + throw new IllegalArgumentException("Function " + functionName + + " requires <" + acceptance + "> in AcceptableValue" + + " but using " + dataType.getName() + " in the creation"); + } - /** - * Generate `AcceptableValue createNew()` method. - */ - try { - metricsClass.addMethod(CtNewMethod.make( - "" - + "public org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue createNew() {" - + " return new " + METER_CLASS_PACKAGE + className + "();" - + " }" - , metricsClass)); - } catch (CannotCompileException e) { - log.error("Can't generate createNew method for " + className + ".", e); - throw new UnexpectedException(e.getMessage(), e); + final CtClass parentClass; + try { + parentClass = classPool.get(meterFunction.getCanonicalName()); + if (!Metrics.class.isAssignableFrom(meterFunction)) { + throw new IllegalArgumentException( + "Function " + functionName + " doesn't inherit from Metrics."); } + } catch (NotFoundException e) { + throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist."); + } + final String className = formatName(metricsName); + CtClass metricsClass = classPool.makeClass(METER_CLASS_PACKAGE + className, parentClass); - Class targetClass; - try { - targetClass = metricsClass.toClass(MeterSystem.class.getClassLoader(), null); - AcceptableValue prototype = (AcceptableValue) targetClass.newInstance(); - METER_PROTOTYPES.put(metricsName, new MeterDefinition(type, prototype, dataType)); + /** + * Create empty construct + */ + try { + CtConstructor defaultConstructor = CtNewConstructor.make( + "public " + className + "() {}", metricsClass); + metricsClass.addConstructor(defaultConstructor); + } catch (CannotCompileException e) { + log.error("Can't add empty constructor in " + className + ".", e); + throw new UnexpectedException(e.getMessage(), e); + } - log.debug("Generate metrics class, " + metricsClass.getName()); + /** + * Generate `AcceptableValue createNew()` method. + */ + try { + metricsClass.addMethod(CtNewMethod.make( + "" + + "public org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue createNew() {" + + " return new " + METER_CLASS_PACKAGE + className + "();" + + " }" + , metricsClass)); + } catch (CannotCompileException e) { + log.error("Can't generate createNew method for " + className + ".", e); + throw new UnexpectedException(e.getMessage(), e); + } - MetricsStreamProcessor.getInstance().create( - MANAGER, - new StreamDefinition( - metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class), - targetClass - ); - } catch (CannotCompileException | IllegalAccessException | InstantiationException e) { - log.error("Can't compile/load/init " + className + ".", e); - throw new UnexpectedException(e.getMessage(), e); - } - }); + Class targetClass; + try { + targetClass = metricsClass.toClass(MeterSystem.class.getClassLoader(), null); + AcceptableValue prototype = (AcceptableValue) targetClass.newInstance(); + meterPrototypes.put(metricsName, new MeterDefinition(type, prototype, dataType)); + + log.debug("Generate metrics class, " + metricsClass.getName()); + + MetricsStreamProcessor.getInstance().create( + manager, + new StreamDefinition( + metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class), + targetClass + ); + } catch (CannotCompileException | IllegalAccessException | InstantiationException | StorageException e) { + log.error("Can't compile/load/init " + className + ".", e); + throw new UnexpectedException(e.getMessage(), e); + } + return true; } /** @@ -253,7 +212,7 @@ public class MeterSystem implements Service { */ public AcceptableValue buildMetrics(String metricsName, Class dataType) { - MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName); + MeterDefinition meterDefinition = meterPrototypes.get(metricsName); if (meterDefinition == null) { throw new IllegalArgumentException("Uncreated metrics " + metricsName); } @@ -284,15 +243,6 @@ public class MeterSystem implements Service { return metricsName.toLowerCase(); } - @RequiredArgsConstructor - @EqualsAndHashCode - public static class NewMeter { - private final String metricsName; - private final String functionName; - private final ScopeType type; - private final Class dataType; - } - @RequiredArgsConstructor @Getter private static class MeterDefinition { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 5452cb2965..a9b49b2750 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -36,9 +36,10 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -92,14 +93,14 @@ public class MetricsStreamProcessor implements StreamProcessor { * @param stream definition of the metrics class. * @param metricsClass data type of the streaming calculation. */ - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class metricsClass) { + public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class metricsClass) throws StorageException { this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass); } @SuppressWarnings("unchecked") public void create(ModuleDefineHolder moduleDefineHolder, StreamDefinition stream, - Class metricsClass) { + Class metricsClass) throws StorageException { if (DisableRegister.INSTANCE.include(stream.getName())) { return; } @@ -112,7 +113,7 @@ public class MetricsStreamProcessor implements StreamProcessor { throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e); } - INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); + ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class); DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME) .provider() .getService(DownSamplingConfigService.class); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamingProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamingProcessor.java index 321377e91e..cda63c2eec 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamingProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/NoneStreamingProcessor.java @@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -58,7 +59,7 @@ public class NoneStreamingProcessor implements StreamProcessor { } @Override - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class streamClass) { + public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class streamClass) throws StorageException { if (DisableRegister.INSTANCE.include(stream.name())) { return; } @@ -72,7 +73,7 @@ public class NoneStreamingProcessor implements StreamProcessor { .getSimpleName() + " none stream record DAO failure.", e); } - INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); + ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class); Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); final NoneStreamPersistentWorker persistentWorker = new NoneStreamPersistentWorker(moduleDefineHolder, model, noneStream); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java index ca17aea17f..30f9666d0e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java @@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -53,7 +54,7 @@ public class RecordStreamProcessor implements StreamProcessor { } @SuppressWarnings("unchecked") - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class recordClass) { + public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class recordClass) throws StorageException { if (DisableRegister.INSTANCE.include(stream.name())) { return; } @@ -66,7 +67,7 @@ public class RecordStreamProcessor implements StreamProcessor { throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e); } - INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); + ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class); Model model = modelSetter.add( recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java index e8900d5ba4..8cfc99fa99 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java @@ -34,9 +34,10 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -63,7 +64,7 @@ public class TopNStreamProcessor implements StreamProcessor { } @SuppressWarnings("unchecked") - public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class topNClass) { + public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class topNClass) throws StorageException { if (DisableRegister.INSTANCE.include(stream.name())) { return; } @@ -77,7 +78,7 @@ public class TopNStreamProcessor implements StreamProcessor { .getSimpleName() + " top n record DAO failure.", e); } - INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); + ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class); Model model = modelSetter.add( topNClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationListener.java index 31cb4cd423..9f1915e9ac 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationListener.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationListener.java @@ -19,10 +19,11 @@ package org.apache.skywalking.oap.server.core.annotation; import java.lang.annotation.Annotation; +import org.apache.skywalking.oap.server.core.storage.StorageException; public interface AnnotationListener { Class annotation(); - void notify(Class aClass); + void notify(Class aClass) throws StorageException; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationScan.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationScan.java index 5a1f224878..1551c1ea6e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationScan.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/annotation/AnnotationScan.java @@ -25,6 +25,7 @@ import java.lang.annotation.Annotation; import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import org.apache.skywalking.oap.server.core.storage.StorageException; /** * Scan the annotation, and notify the listener(s) @@ -49,7 +50,7 @@ public class AnnotationScan { /** * Begin to scan classes. */ - public void scan() throws IOException { + public void scan() throws IOException, StorageException { ClassPath classpath = ClassPath.from(this.getClass().getClassLoader()); ImmutableSet classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking"); for (ClassPath.ClassInfo classInfo : classes) { @@ -62,7 +63,9 @@ public class AnnotationScan { } } - listeners.forEach(AnnotationListenerCache::complete); + for (AnnotationListenerCache listener : listeners) { + listener.complete(); + } } private class AnnotationListenerCache { @@ -82,9 +85,11 @@ public class AnnotationScan { matchedClass.add(aClass); } - private void complete() { + private void complete() throws StorageException { matchedClass.sort(Comparator.comparing(Class::getName)); - matchedClass.forEach(aClass -> listener.notify(aClass)); + for (Class aClass : matchedClass) { + listener.notify(aClass); + } } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java index 364b05ec00..83c6e529ca 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java @@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import lombok.Getter; import org.apache.skywalking.oap.server.core.query.sql.Function; -import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; +import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator; /** * Data column of all persistent entity. @@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; public @interface Column { /** * column name in the storage. Most of the storage will keep the name consistently. But in same cases, this name - * could be a keyword, then, the implementation will use {@link IModelOverride} to replace the column name. + * could be a keyword, then, the implementation will use {@link ModelManipulator} to replace the column name. */ String columnName(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/INewModel.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelCreator.java similarity index 78% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/INewModel.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelCreator.java index 15df48ee9f..3f75a75401 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/INewModel.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelCreator.java @@ -18,17 +18,24 @@ package org.apache.skywalking.oap.server.core.storage.model; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.library.module.Service; /** * INewModel implementation supports creating a new module. */ -public interface INewModel extends Service { +public interface ModelCreator extends Service { /** * Add a new model * * @return the created new model */ - Model add(Class aClass, int scopeId, Storage storage, boolean record); + Model add(Class aClass, int scopeId, Storage storage, boolean record) throws StorageException; + + void addModelListener(CreatingListener listener) throws StorageException; + + interface CreatingListener { + void whenCreating(Model model) throws StorageException; + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java index e890f53cba..6b9bd7141d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.core.storage.model; -import java.util.List; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.RunningMode; @@ -29,63 +29,52 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; /** * The core module installation controller. */ +@RequiredArgsConstructor @Slf4j -public abstract class ModelInstaller { +public abstract class ModelInstaller implements ModelCreator.CreatingListener { + protected final Client client; private final ModuleManager moduleManager; - public ModelInstaller(ModuleManager moduleManager) { - this.moduleManager = moduleManager; - } - - /** - * Entrance of the storage entity installation work. - */ - public final void install(Client client) throws StorageException { - IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class); - - List models = modelGetter.allModels(); - + public void whenCreating(Model model) throws StorageException { if (RunningMode.isNoInitMode()) { - for (Model model : models) { - while (!isExists(client, model)) { - try { - log.info( - "table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", - model - .getName() - ); - Thread.sleep(3000L); - } catch (InterruptedException e) { - log.error(e.getMessage()); - } + while (!isExists(model)) { + try { + log.info( + "table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", + model + .getName() + ); + Thread.sleep(3000L); + } catch (InterruptedException e) { + log.error(e.getMessage()); } } } else { - for (Model model : models) { - if (!isExists(client, model)) { - log.info("table: {} does not exist", model.getName()); - createTable(client, model); - } + if (!isExists(model)) { + log.info("table: {} does not exist", model.getName()); + createTable(model); } } } /** * Installer implementation could use this API to request a column name replacement. This method delegates for - * {@link IModelOverride}. + * {@link ModelManipulator}. */ protected final void overrideColumnName(String columnName, String newName) { - IModelOverride modelOverride = moduleManager.find(CoreModule.NAME).provider().getService(IModelOverride.class); + ModelManipulator modelOverride = moduleManager.find(CoreModule.NAME) + .provider() + .getService(ModelManipulator.class); modelOverride.overrideColumnName(columnName, newName); } /** * Check whether the storage entity exists. Need to implement based on the real storage. */ - protected abstract boolean isExists(Client client, Model model) throws StorageException; + protected abstract boolean isExists(Model model) throws StorageException; /** - * Create the storage entity. All creations should be after the {@link #isExists(Client, Model)} check. + * Create the storage entity. All creations should be after the {@link #isExists(Model)} check. */ - protected abstract void createTable(Client client, Model model) throws StorageException; + protected abstract void createTable(Model model) throws StorageException; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelOverride.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelManipulator.java similarity index 95% rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelOverride.java rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelManipulator.java index 91bbeeca11..e58d365449 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelOverride.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelManipulator.java @@ -23,6 +23,6 @@ import org.apache.skywalking.oap.server.library.module.Service; /** * Override service provides ways to rename the existing column or table name. */ -public interface IModelOverride extends Service { +public interface ModelManipulator extends Service { void overrideColumnName(String columnName, String newName); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index 93e7d304dd..66fd998188 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.storage.model; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; +import java.util.HashMap; import java.util.List; import java.util.Objects; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.MultipleQueryUnifiedIndex; import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex; @@ -35,15 +36,19 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad * StorageModels manages all models detected by the core. */ @Slf4j -public class StorageModels implements IModelManager, INewModel, IModelOverride { +public class StorageModels implements IModelManager, ModelCreator, ModelManipulator { private final List models; + private final HashMap columnNameOverrideRule; + private final List listeners; public StorageModels() { - this.models = new LinkedList<>(); + this.models = new ArrayList<>(); + this.columnNameOverrideRule = new HashMap<>(); + this.listeners = new ArrayList<>(); } @Override - public Model add(Class aClass, int scopeId, Storage storage, boolean record) { + public Model add(Class aClass, int scopeId, Storage storage, boolean record) throws StorageException { // Check this scope id is valid. DefaultScopeDefine.nameOf(scopeId); @@ -55,11 +60,30 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride { storage.getModelName(), modelColumns, extraQueryIndices, scopeId, storage.getDownsampling(), record ); + this.followColumnNameRules(model); models.add(model); + for (final CreatingListener listener : listeners) { + listener.whenCreating(model); + } return model; } + /** + * CreatingListener listener could react when {@link #add(Class, int, Storage, boolean)} model happens. Also, the + * added models are being notified in this add operation. + */ + @Override + public void addModelListener(final CreatingListener listener) throws StorageException { + listeners.add(listener); + for (Model model : models) { + listener.whenCreating(model); + } + } + + /** + * Read model column metadata based on the class level definition. + */ private void retrieval(Class clazz, String modelName, List modelColumns, @@ -108,9 +132,14 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride { @Override public void overrideColumnName(String columnName, String newName) { - models.forEach(model -> { - model.getColumns().forEach(column -> column.getColumnName().overrideName(columnName, newName)); - model.getExtraQueryIndices().forEach(extraQueryIndex -> extraQueryIndex.overrideName(columnName, newName)); + columnNameOverrideRule.put(columnName, newName); + models.forEach(this::followColumnNameRules); + } + + private void followColumnNameRules(Model model) { + columnNameOverrideRule.forEach((oldName, newName) -> { + model.getColumns().forEach(column -> column.getColumnName().overrideName(oldName, newName)); + model.getExtraQueryIndices().forEach(extraQueryIndex -> extraQueryIndex.overrideName(oldName, newName)); }); } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java deleted file mode 100644 index b96571b435..0000000000 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.storage; - -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.CoreModuleProvider; -import org.apache.skywalking.oap.server.core.storage.model.Model; -import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; -import org.apache.skywalking.oap.server.library.client.Client; -import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.junit.Test; -import org.mockito.Mockito; -import org.powermock.reflect.Whitebox; - -public class StorageInstallerTestCase { - - @Test - public void testInstall() throws StorageException, ServiceNotProvidedException { - CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class); - CoreModule moduleDefine = Mockito.spy(CoreModule.class); - ModuleManager moduleManager = Mockito.mock(ModuleManager.class); - - Whitebox.setInternalState(moduleDefine, "loadedProvider", moduleProvider); - - Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine); - - // streamDataMapping.generate(); - - // TestStorageInstaller installer = new TestStorageInstaller(moduleManager); - // installer.install(null); - } - - class TestStorageInstaller extends ModelInstaller { - - public TestStorageInstaller(ModuleManager moduleManager) { - super(moduleManager); - } - - @Override - protected boolean isExists(Client client, Model tableDefine) throws StorageException { - return false; - } - - @Override - protected void createTable(Client client, Model tableDefine) throws StorageException { - - } - } -} diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java index 2da971e965..23f4431e75 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/StorageModelsTest.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProces import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageData; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; @@ -47,7 +48,7 @@ public class StorageModelsTest { } @Test - public void testStorageModels() { + public void testStorageModels() throws StorageException { StorageModels models = new StorageModels(); models.add(TestModel.class, -1, new Storage("StorageModelsTest", DownSampling.Hour), diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java index 0e2a00f040..9c4a730455 100644 --- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java +++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java @@ -59,17 +59,6 @@ public class PrometheusFetcherProvider extends ModuleProvider { @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { - if (config.isActive()) { - // TODO. This is only a demo about creating new metrics - // We should create it based on metrics configuration. - final MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); - meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class); - meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class); - meterSystem.create( - "test_percentile_metrics", "percentile", ScopeType.SERVICE, - PercentileFunction.PercentileArgument.class - ); - } } @Override @@ -82,6 +71,14 @@ public class PrometheusFetcherProvider extends ModuleProvider { if (config.isActive()) { // TODO. This is only a demo about fetching the data and push into the calculation stream. final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class); + + service.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class); + service.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class); + service.create( + "test_percentile_metrics", "percentile", ScopeType.SERVICE, + PercentileFunction.PercentileArgument.class + ); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { @Override public void run() { diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 6d6023f90b..e5f51fb4fc 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -178,10 +179,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { public void start() throws ModuleStartException { try { elasticSearchClient.connect(); + StorageEsInstaller installer = new StorageEsInstaller(elasticSearchClient, getManager(), config); - StorageEsInstaller installer = new StorageEsInstaller(getManager(), config); - installer.install(elasticSearchClient); - + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index bf53041ca1..ddc0d9def4 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -41,14 +41,14 @@ public class StorageEsInstaller extends ModelInstaller { private final StorageModuleElasticsearchConfig config; protected final ColumnTypeEsMapping columnTypeEsMapping; - public StorageEsInstaller(ModuleManager moduleManager, final StorageModuleElasticsearchConfig config) { - super(moduleManager); + public StorageEsInstaller(Client client, ModuleManager moduleManager, final StorageModuleElasticsearchConfig config) { + super(client, moduleManager); this.columnTypeEsMapping = new ColumnTypeEsMapping(); this.config = config; } @Override - protected boolean isExists(Client client, Model model) throws StorageException { + protected boolean isExists(Model model) throws StorageException { ElasticSearchClient esClient = (ElasticSearchClient) client; try { String timeSeriesIndexName = TimeSeriesUtils.latestWriteIndexName(model); @@ -59,7 +59,7 @@ public class StorageEsInstaller extends ModelInstaller { } @Override - protected void createTable(Client client, Model model) throws StorageException { + protected void createTable(Model model) throws StorageException { ElasticSearchClient esClient = (ElasticSearchClient) client; Map settings = createSetting(model.isRecord()); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java index 375f12abcc..aa740df0cd 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java @@ -33,6 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -184,8 +185,8 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider { try { elasticSearch7Client.connect(); - StorageEs7Installer installer = new StorageEs7Installer(getManager(), config); - installer.install(elasticSearch7Client); + StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config); + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/base/StorageEs7Installer.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/base/StorageEs7Installer.java index 2e7d286203..d36be6da63 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/base/StorageEs7Installer.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/base/StorageEs7Installer.java @@ -17,22 +17,21 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; +@Slf4j public class StorageEs7Installer extends StorageEsInstaller { - - private static final Logger logger = LoggerFactory.getLogger(StorageEs7Installer.class); - - public StorageEs7Installer(final ModuleManager moduleManager, final StorageModuleElasticsearch7Config config) { - super(moduleManager, config); + public StorageEs7Installer(final Client client, + final ModuleManager moduleManager, + final StorageModuleElasticsearch7Config config) { + super(client, moduleManager, config); } @SuppressWarnings("unchecked") @@ -41,7 +40,7 @@ public class StorageEs7Installer extends StorageEsInstaller { Map type = (Map) mapping.remove(ElasticSearchClient.TYPE); mapping.put("properties", type.get("properties")); - logger.debug("elasticsearch index template setting: {}", mapping.toString()); + log.debug("elasticsearch index template setting: {}", mapping.toString()); return mapping; } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java index ae677488ca..a0e7409e62 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -114,9 +115,9 @@ public class InfluxStorageProvider extends ModuleProvider { public void start() throws ServiceNotProvidedException, ModuleStartException { client.connect(); - InfluxTableInstaller installer = new InfluxTableInstaller(getManager()); + InfluxTableInstaller installer = new InfluxTableInstaller(client, getManager()); try { - installer.install(client); + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java index 585df52d0f..0246128a28 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java @@ -26,18 +26,18 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; public class InfluxTableInstaller extends ModelInstaller { - public InfluxTableInstaller(ModuleManager moduleManager) { - super(moduleManager); + public InfluxTableInstaller(Client client, ModuleManager moduleManager) { + super(client, moduleManager); } @Override - protected boolean isExists(final Client client, final Model model) throws StorageException { + protected boolean isExists(final Model model) throws StorageException { TableMetaInfo.addModel(model); return true; } @Override - protected void createTable(final Client client, final Model model) throws StorageException { + protected void createTable(final Model model) throws StorageException { // Automatically create table } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java index 346077818f..59e814238d 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -132,8 +133,8 @@ public class H2StorageProvider extends ModuleProvider { try { h2Client.connect(); - H2TableInstaller installer = new H2TableInstaller(getManager()); - installer.install(h2Client); + H2TableInstaller installer = new H2TableInstaller(h2Client, getManager()); + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java index 53e5266ba5..ebbd958285 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java @@ -44,18 +44,18 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; public class H2TableInstaller extends ModelInstaller { public static final String ID_COLUMN = "id"; - public H2TableInstaller(ModuleManager moduleManager) { - super(moduleManager); + public H2TableInstaller(Client client, ModuleManager moduleManager) { + super(client, moduleManager); } @Override - protected boolean isExists(Client client, Model model) throws StorageException { + protected boolean isExists(Model model) throws StorageException { TableMetaInfo.addModel(model); return false; } @Override - protected void createTable(Client client, Model model) throws StorageException { + protected void createTable(Model model) throws StorageException { JDBCHikariCPClient jdbcHikariCPClient = (JDBCHikariCPClient) client; try (Connection connection = jdbcHikariCPClient.getConnection()) { SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " ("); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java index b32fb6dd59..c047807da0 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -122,8 +123,8 @@ public class MySQLStorageProvider extends ModuleProvider { try { mysqlClient.connect(); - MySQLTableInstaller installer = new MySQLTableInstaller(getManager()); - installer.install(mysqlClient); + MySQLTableInstaller installer = new MySQLTableInstaller(mysqlClient, getManager()); + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java index a31a8d75e1..609140c0d6 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java @@ -40,8 +40,8 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal */ @Slf4j public class MySQLTableInstaller extends H2TableInstaller { - public MySQLTableInstaller(ModuleManager moduleManager) { - super(moduleManager); + public MySQLTableInstaller(Client client, ModuleManager moduleManager) { + super(client, moduleManager); /* * Override column because the default column names in core have syntax conflict with MySQL. */ @@ -50,7 +50,7 @@ public class MySQLTableInstaller extends H2TableInstaller { } @Override - protected boolean isExists(Client client, Model model) throws StorageException { + protected boolean isExists(Model model) throws StorageException { TableMetaInfo.addModel(model); JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client; try (Connection conn = h2Client.getConnection()) { diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/MockCoreModuleProvider.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/MockCoreModuleProvider.java index 412cb79d19..e2451d5386 100755 --- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/MockCoreModuleProvider.java +++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/MockCoreModuleProvider.java @@ -49,9 +49,10 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.IModelManager; -import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; -import org.apache.skywalking.oap.server.core.storage.model.INewModel; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; +import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator; import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; @@ -108,8 +109,7 @@ public class MockCoreModuleProvider extends CoreModuleProvider { throw new ModuleStartException(e.getMessage(), e); } - MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); - this.registerServiceImplementation(MeterSystem.class, meterSystem); + this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager())); CoreModuleConfig moduleConfig = new CoreModuleConfig(); this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig)); @@ -129,9 +129,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider { this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); - this.registerServiceImplementation(INewModel.class, storageModels); + this.registerServiceImplementation(ModelCreator.class, storageModels); this.registerServiceImplementation(IModelManager.class, storageModels); - this.registerServiceImplementation(IModelOverride.class, storageModels); + this.registerServiceImplementation(ModelManipulator.class, storageModels); this.registerServiceImplementation( NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig)); @@ -160,11 +160,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider { @Override public void start() throws ModuleStartException { - MeterSystem.closeMeterCreationChannel(); - try { annotationScan.scan(); - } catch (IOException e) { + } catch (IOException | StorageException e) { throw new ModuleStartException(e.getMessage(), e); } } diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockStreamAnnotationListener.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockStreamAnnotationListener.java index 350c1eb2f9..c39c00e10e 100644 --- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockStreamAnnotationListener.java +++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockStreamAnnotationListener.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; /** @@ -44,7 +45,7 @@ public class MockStreamAnnotationListener implements AnnotationListener { } @Override - public void notify(Class aClass) { + public void notify(Class aClass) throws StorageException { if (aClass.isAnnotationPresent(Stream.class)) { Stream stream = (Stream) aClass.getAnnotation(Stream.class); -- GitLab