未验证 提交 7edcd5aa 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Change the model installation into the reactive module (#4761)

上级 1d5a0b0f
...@@ -64,6 +64,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream; ...@@ -64,6 +64,7 @@ import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException; import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine; import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.ResourceUtils; import org.apache.skywalking.oap.server.library.util.ResourceUtils;
...@@ -165,7 +166,13 @@ public class OALRuntime implements OALEngine { ...@@ -165,7 +166,13 @@ public class OALRuntime implements OALEngine {
@Override @Override
public void notifyAllListeners() throws ModuleStartException { public void notifyAllListeners() throws ModuleStartException {
metricsClasses.forEach(streamAnnotationListener::notify); for (Class metricsClass : metricsClasses) {
try {
streamAnnotationListener.notify(metricsClass);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
for (Class dispatcherClass : dispatcherClasses) { for (Class dispatcherClass : dispatcherClasses) {
try { try {
dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass); dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
......
...@@ -22,6 +22,7 @@ import java.io.IOException; ...@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
...@@ -29,7 +30,7 @@ import org.junit.Test; ...@@ -29,7 +30,7 @@ import org.junit.Test;
public class DeepAnalysisTest { public class DeepAnalysisTest {
@BeforeClass @BeforeClass
public static void init() throws IOException { public static void init() throws IOException, StorageException {
AnnotationScan scopeScan = new AnnotationScan(); AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener()); scopeScan.registerListener(new DefaultScopeDefine.Listener());
scopeScan.scan(); scopeScan.scan();
......
...@@ -22,6 +22,7 @@ import java.io.IOException; ...@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
...@@ -29,7 +30,7 @@ import org.junit.Test; ...@@ -29,7 +30,7 @@ import org.junit.Test;
public class ScriptParserTest { public class ScriptParserTest {
@BeforeClass @BeforeClass
public static void init() throws IOException { public static void init() throws IOException, StorageException {
MetricsHolder.init(); MetricsHolder.init();
AnnotationScan scopeScan = new AnnotationScan(); AnnotationScan scopeScan = new AnnotationScan();
......
...@@ -45,8 +45,8 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; ...@@ -45,8 +45,8 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager; import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleDefine;
...@@ -110,9 +110,9 @@ public class CoreModule extends ModuleDefine { ...@@ -110,9 +110,9 @@ public class CoreModule extends ModuleDefine {
} }
private void addInsideService(List<Class> classes) { private void addInsideService(List<Class> classes) {
classes.add(INewModel.class); classes.add(ModelCreator.class);
classes.add(IModelManager.class); classes.add(IModelManager.class);
classes.add(IModelOverride.class); classes.add(ModelManipulator.class);
classes.add(RemoteClientManager.class); classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class); classes.add(RemoteSenderService.class);
} }
......
...@@ -68,9 +68,10 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; ...@@ -68,9 +68,10 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager; import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
...@@ -155,15 +156,14 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -155,15 +156,14 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(MeterSystem.class, meterSystem);
AnnotationScan oalDisable = new AnnotationScan(); AnnotationScan oalDisable = new AnnotationScan();
oalDisable.registerListener(DisableRegister.INSTANCE); oalDisable.registerListener(DisableRegister.INSTANCE);
oalDisable.registerListener(new DisableRegister.SingleDisableScanListener()); oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
try { try {
oalDisable.scan(); oalDisable.scan();
} catch (IOException e) { } catch (IOException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
...@@ -210,9 +210,9 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -210,9 +210,9 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(INewModel.class, storageModels); this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels); this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels); this.registerServiceImplementation(ModelManipulator.class, storageModels);
this.registerServiceImplementation( this.registerServiceImplementation(
NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig)); NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));
...@@ -256,8 +256,6 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -256,8 +256,6 @@ public class CoreModuleProvider extends ModuleProvider {
@Override @Override
public void start() throws ModuleStartException { public void start() throws ModuleStartException {
MeterSystem.closeMeterCreationChannel();
grpcServer.addHandler(new RemoteServiceHandler(getManager())); grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler()); grpcServer.addHandler(new HealthCheckServiceHandler());
remoteClientManager.start(); remoteClientManager.start();
...@@ -267,7 +265,7 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -267,7 +265,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.scan(); annotationScan.scan();
oalEngine.notifyAllListeners(); oalEngine.notifyAllListeners();
} catch (IOException | IllegalAccessException | InstantiationException e) { } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProces ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProces
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
...@@ -45,7 +46,7 @@ public class StreamAnnotationListener implements AnnotationListener { ...@@ -45,7 +46,7 @@ public class StreamAnnotationListener implements AnnotationListener {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public void notify(Class aClass) { public void notify(Class aClass) throws StorageException {
if (aClass.isAnnotationPresent(Stream.class)) { if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class); Stream stream = (Stream) aClass.getAnnotation(Stream.class);
......
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis; package org.apache.skywalking.oap.server.core.analysis;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
public interface StreamProcessor<STREAM> { public interface StreamProcessor<STREAM> {
void in(STREAM stream); void in(STREAM stream);
void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends STREAM> streamClass); void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends STREAM> streamClass) throws StorageException;
} }
...@@ -23,9 +23,7 @@ import com.google.common.reflect.ClassPath; ...@@ -23,9 +23,7 @@ import com.google.common.reflect.ClassPath;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import javassist.CannotCompileException; import javassist.CannotCompileException;
import javassist.ClassPool; import javassist.ClassPool;
...@@ -34,7 +32,6 @@ import javassist.CtConstructor; ...@@ -34,7 +32,6 @@ import javassist.CtConstructor;
import javassist.CtNewConstructor; import javassist.CtNewConstructor;
import javassist.CtNewMethod; import javassist.CtNewMethod;
import javassist.NotFoundException; import javassist.NotFoundException;
import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -45,6 +42,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableV ...@@ -45,6 +42,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableV
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction; import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service; import org.apache.skywalking.oap.server.library.module.Service;
...@@ -57,29 +55,18 @@ import org.apache.skywalking.oap.server.library.module.Service; ...@@ -57,29 +55,18 @@ import org.apache.skywalking.oap.server.library.module.Service;
@Slf4j @Slf4j
public class MeterSystem implements Service { public class MeterSystem implements Service {
private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic."; private static final String METER_CLASS_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.meter.dynamic.";
private static ModuleManager MANAGER; private ModuleManager manager;
private static ClassPool CLASS_POOL; private ClassPool classPool;
private static List<NewMeter> TO_BE_CREATED_METERS = new ArrayList<>(); private Map<String, Class<? extends MeterFunction>> functionRegister = new HashMap<>();
private static Map<String, Class<? extends MeterFunction>> FUNCTION_REGISTER = new HashMap<>();
/** /**
* Host the dynamic meter prototype classes. These classes could be create dynamically through {@link * Host the dynamic meter prototype classes. These classes could be create dynamically through {@link
* Object#clone()} in the runtime; * Object#clone()} in the runtime;
*/ */
private static Map<String, MeterDefinition> METER_PROTOTYPES = new HashMap<>(); private Map<String, MeterDefinition> meterPrototypes = new HashMap<>();
private static MeterSystem METER_SYSTEM;
private static boolean METER_CREATABLE = true;
private MeterSystem() { public MeterSystem(final ModuleManager manager) {
this.manager = manager;
} classPool = ClassPool.getDefault();
public synchronized static MeterSystem meterSystem(final ModuleManager manager) {
if (METER_SYSTEM != null) {
return METER_SYSTEM;
}
MANAGER = manager;
CLASS_POOL = ClassPool.getDefault();
ClassPath classpath = null; ClassPath classpath = null;
try { try {
...@@ -97,14 +84,12 @@ public class MeterSystem implements Service { ...@@ -97,14 +84,12 @@ public class MeterSystem implements Service {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Function " + functionClass.getCanonicalName() + " doesn't implement AcceptableValue."); "Function " + functionClass.getCanonicalName() + " doesn't implement AcceptableValue.");
} }
FUNCTION_REGISTER.put( functionRegister.put(
metricsFunction.functionName(), metricsFunction.functionName(),
(Class<? extends MeterFunction>) functionClass (Class<? extends MeterFunction>) functionClass
); );
} }
} }
METER_SYSTEM = new MeterSystem();
return METER_SYSTEM;
} }
/** /**
...@@ -121,124 +106,98 @@ public class MeterSystem implements Service { ...@@ -121,124 +106,98 @@ public class MeterSystem implements Service {
String functionName, String functionName,
ScopeType type, ScopeType type,
Class<T> dataType) throws IllegalArgumentException { Class<T> dataType) throws IllegalArgumentException {
if (!METER_CREATABLE) { /**
throw new IllegalStateException("Can't create new metrics anymore"); * Create a new meter class dynamically.
} */
final Class<? extends MeterFunction> meterFunction = functionRegister.get(functionName);
final NewMeter newMeter = new NewMeter(metricsName, functionName, type, dataType); if (meterFunction == null) {
if (TO_BE_CREATED_METERS.contains(newMeter)) { throw new IllegalArgumentException("Function " + functionName + " can't be found.");
return false;
} }
TO_BE_CREATED_METERS.add(newMeter); boolean foundDataType = false;
return true; String acceptance = null;
} for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
if (genericInterface instanceof ParameterizedType) {
/** ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
* Close the {@link #create(String, String, ScopeType, Class)} channel, and build the model and streaming if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
* definitions. Type[] arguments = parameterizedType.getActualTypeArguments();
*/ if (arguments[0].equals(dataType)) {
public static void closeMeterCreationChannel() { foundDataType = true;
METER_CREATABLE = false; } else {
acceptance = arguments[0].getTypeName();
TO_BE_CREATED_METERS.forEach(newMeter -> {
String metricsName = newMeter.metricsName;
String functionName = newMeter.functionName;
ScopeType type = newMeter.type;
Class<?> dataType = newMeter.dataType;
/**
* Create a new meter class dynamically.
*/
final Class<? extends MeterFunction> meterFunction = FUNCTION_REGISTER.get(functionName);
if (meterFunction == null) {
throw new IllegalArgumentException("Function " + functionName + " can't be found.");
}
boolean foundDataType = false;
String acceptance = null;
for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
if (genericInterface instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
Type[] arguments = parameterizedType.getActualTypeArguments();
if (arguments[0].equals(dataType)) {
foundDataType = true;
} else {
acceptance = arguments[0].getTypeName();
}
}
if (foundDataType) {
break;
} }
} }
} if (foundDataType) {
if (!foundDataType) { break;
throw new IllegalArgumentException("Function " + functionName
+ " requires <" + acceptance + "> in AcceptableValue"
+ " but using " + dataType.getName() + " in the creation");
}
final CtClass parentClass;
try {
parentClass = CLASS_POOL.get(meterFunction.getCanonicalName());
if (!Metrics.class.isAssignableFrom(meterFunction)) {
throw new IllegalArgumentException(
"Function " + functionName + " doesn't inherit from Metrics.");
} }
} catch (NotFoundException e) {
throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist.");
}
final String className = formatName(metricsName);
CtClass metricsClass = CLASS_POOL.makeClass(METER_CLASS_PACKAGE + className, parentClass);
/**
* Create empty construct
*/
try {
CtConstructor defaultConstructor = CtNewConstructor.make(
"public " + className + "() {}", metricsClass);
metricsClass.addConstructor(defaultConstructor);
} catch (CannotCompileException e) {
log.error("Can't add empty constructor in " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
} }
}
if (!foundDataType) {
throw new IllegalArgumentException("Function " + functionName
+ " requires <" + acceptance + "> in AcceptableValue"
+ " but using " + dataType.getName() + " in the creation");
}
/** final CtClass parentClass;
* Generate `AcceptableValue<T> createNew()` method. try {
*/ parentClass = classPool.get(meterFunction.getCanonicalName());
try { if (!Metrics.class.isAssignableFrom(meterFunction)) {
metricsClass.addMethod(CtNewMethod.make( throw new IllegalArgumentException(
"" "Function " + functionName + " doesn't inherit from Metrics.");
+ "public org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue createNew() {"
+ " return new " + METER_CLASS_PACKAGE + className + "();"
+ " }"
, metricsClass));
} catch (CannotCompileException e) {
log.error("Can't generate createNew method for " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
} }
} catch (NotFoundException e) {
throw new IllegalArgumentException("Function " + functionName + " can't be found by javaassist.");
}
final String className = formatName(metricsName);
CtClass metricsClass = classPool.makeClass(METER_CLASS_PACKAGE + className, parentClass);
Class targetClass; /**
try { * Create empty construct
targetClass = metricsClass.toClass(MeterSystem.class.getClassLoader(), null); */
AcceptableValue prototype = (AcceptableValue) targetClass.newInstance(); try {
METER_PROTOTYPES.put(metricsName, new MeterDefinition(type, prototype, dataType)); CtConstructor defaultConstructor = CtNewConstructor.make(
"public " + className + "() {}", metricsClass);
metricsClass.addConstructor(defaultConstructor);
} catch (CannotCompileException e) {
log.error("Can't add empty constructor in " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
log.debug("Generate metrics class, " + metricsClass.getName()); /**
* Generate `AcceptableValue<T> createNew()` method.
*/
try {
metricsClass.addMethod(CtNewMethod.make(
""
+ "public org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue createNew() {"
+ " return new " + METER_CLASS_PACKAGE + className + "();"
+ " }"
, metricsClass));
} catch (CannotCompileException e) {
log.error("Can't generate createNew method for " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
MetricsStreamProcessor.getInstance().create( Class targetClass;
MANAGER, try {
new StreamDefinition( targetClass = metricsClass.toClass(MeterSystem.class.getClassLoader(), null);
metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class), AcceptableValue prototype = (AcceptableValue) targetClass.newInstance();
targetClass meterPrototypes.put(metricsName, new MeterDefinition(type, prototype, dataType));
);
} catch (CannotCompileException | IllegalAccessException | InstantiationException e) { log.debug("Generate metrics class, " + metricsClass.getName());
log.error("Can't compile/load/init " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e); MetricsStreamProcessor.getInstance().create(
} manager,
}); new StreamDefinition(
metricsName, type.getScopeId(), prototype.builder(), MetricsStreamProcessor.class),
targetClass
);
} catch (CannotCompileException | IllegalAccessException | InstantiationException | StorageException e) {
log.error("Can't compile/load/init " + className + ".", e);
throw new UnexpectedException(e.getMessage(), e);
}
return true;
} }
/** /**
...@@ -253,7 +212,7 @@ public class MeterSystem implements Service { ...@@ -253,7 +212,7 @@ public class MeterSystem implements Service {
*/ */
public <T> AcceptableValue<T> buildMetrics(String metricsName, public <T> AcceptableValue<T> buildMetrics(String metricsName,
Class<T> dataType) { Class<T> dataType) {
MeterDefinition meterDefinition = METER_PROTOTYPES.get(metricsName); MeterDefinition meterDefinition = meterPrototypes.get(metricsName);
if (meterDefinition == null) { if (meterDefinition == null) {
throw new IllegalArgumentException("Uncreated metrics " + metricsName); throw new IllegalArgumentException("Uncreated metrics " + metricsName);
} }
...@@ -284,15 +243,6 @@ public class MeterSystem implements Service { ...@@ -284,15 +243,6 @@ public class MeterSystem implements Service {
return metricsName.toLowerCase(); return metricsName.toLowerCase();
} }
@RequiredArgsConstructor
@EqualsAndHashCode
public static class NewMeter {
private final String metricsName;
private final String functionName;
private final ScopeType type;
private final Class<?> dataType;
}
@RequiredArgsConstructor @RequiredArgsConstructor
@Getter @Getter
private static class MeterDefinition { private static class MeterDefinition {
......
...@@ -36,9 +36,10 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; ...@@ -36,9 +36,10 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService; import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -92,14 +93,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -92,14 +93,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
* @param stream definition of the metrics class. * @param stream definition of the metrics class.
* @param metricsClass data type of the streaming calculation. * @param metricsClass data type of the streaming calculation.
*/ */
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) { public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException {
this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass); this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream, StreamDefinition stream,
Class<? extends Metrics> metricsClass) { Class<? extends Metrics> metricsClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.getName())) { if (DisableRegister.INSTANCE.include(stream.getName())) {
return; return;
} }
...@@ -112,7 +113,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -112,7 +113,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e); throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
} }
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME) DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
.provider() .provider()
.getService(DownSamplingConfigService.class); .getService(DownSamplingConfigService.class);
......
...@@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; ...@@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -58,7 +59,7 @@ public class NoneStreamingProcessor implements StreamProcessor<NoneStream> { ...@@ -58,7 +59,7 @@ public class NoneStreamingProcessor implements StreamProcessor<NoneStream> {
} }
@Override @Override
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) { public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) { if (DisableRegister.INSTANCE.include(stream.name())) {
return; return;
} }
...@@ -72,7 +73,7 @@ public class NoneStreamingProcessor implements StreamProcessor<NoneStream> { ...@@ -72,7 +73,7 @@ public class NoneStreamingProcessor implements StreamProcessor<NoneStream> {
.getSimpleName() + " none stream record DAO failure.", e); .getSimpleName() + " none stream record DAO failure.", e);
} }
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
final NoneStreamPersistentWorker persistentWorker = new NoneStreamPersistentWorker(moduleDefineHolder, model, noneStream); final NoneStreamPersistentWorker persistentWorker = new NoneStreamPersistentWorker(moduleDefineHolder, model, noneStream);
......
...@@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor; ...@@ -29,9 +29,10 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -53,7 +54,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> { ...@@ -53,7 +54,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) { public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) { if (DisableRegister.INSTANCE.include(stream.name())) {
return; return;
} }
...@@ -66,7 +67,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> { ...@@ -66,7 +67,7 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e); throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
} }
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
Model model = modelSetter.add( Model model = modelSetter.add(
recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); recordClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
......
...@@ -34,9 +34,10 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; ...@@ -34,9 +34,10 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -63,7 +64,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { ...@@ -63,7 +64,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) { public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) { if (DisableRegister.INSTANCE.include(stream.name())) {
return; return;
} }
...@@ -77,7 +78,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { ...@@ -77,7 +78,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
.getSimpleName() + " top n record DAO failure.", e); .getSimpleName() + " top n record DAO failure.", e);
} }
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class); ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
Model model = modelSetter.add( Model model = modelSetter.add(
topNClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true); topNClass, stream.scopeId(), new Storage(stream.name(), DownSampling.Second), true);
......
...@@ -19,10 +19,11 @@ ...@@ -19,10 +19,11 @@
package org.apache.skywalking.oap.server.core.annotation; package org.apache.skywalking.oap.server.core.annotation;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.storage.StorageException;
public interface AnnotationListener { public interface AnnotationListener {
Class<? extends Annotation> annotation(); Class<? extends Annotation> annotation();
void notify(Class aClass); void notify(Class aClass) throws StorageException;
} }
...@@ -25,6 +25,7 @@ import java.lang.annotation.Annotation; ...@@ -25,6 +25,7 @@ import java.lang.annotation.Annotation;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.skywalking.oap.server.core.storage.StorageException;
/** /**
* Scan the annotation, and notify the listener(s) * Scan the annotation, and notify the listener(s)
...@@ -49,7 +50,7 @@ public class AnnotationScan { ...@@ -49,7 +50,7 @@ public class AnnotationScan {
/** /**
* Begin to scan classes. * Begin to scan classes.
*/ */
public void scan() throws IOException { public void scan() throws IOException, StorageException {
ClassPath classpath = ClassPath.from(this.getClass().getClassLoader()); ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking"); ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) { for (ClassPath.ClassInfo classInfo : classes) {
...@@ -62,7 +63,9 @@ public class AnnotationScan { ...@@ -62,7 +63,9 @@ public class AnnotationScan {
} }
} }
listeners.forEach(AnnotationListenerCache::complete); for (AnnotationListenerCache listener : listeners) {
listener.complete();
}
} }
private class AnnotationListenerCache { private class AnnotationListenerCache {
...@@ -82,9 +85,11 @@ public class AnnotationScan { ...@@ -82,9 +85,11 @@ public class AnnotationScan {
matchedClass.add(aClass); matchedClass.add(aClass);
} }
private void complete() { private void complete() throws StorageException {
matchedClass.sort(Comparator.comparing(Class::getName)); matchedClass.sort(Comparator.comparing(Class::getName));
matchedClass.forEach(aClass -> listener.notify(aClass)); for (Class<?> aClass : matchedClass) {
listener.notify(aClass);
}
} }
} }
} }
...@@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy; ...@@ -24,7 +24,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
import lombok.Getter; import lombok.Getter;
import org.apache.skywalking.oap.server.core.query.sql.Function; import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
/** /**
* Data column of all persistent entity. * Data column of all persistent entity.
...@@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; ...@@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
public @interface Column { public @interface Column {
/** /**
* column name in the storage. Most of the storage will keep the name consistently. But in same cases, this name * column name in the storage. Most of the storage will keep the name consistently. But in same cases, this name
* could be a keyword, then, the implementation will use {@link IModelOverride} to replace the column name. * could be a keyword, then, the implementation will use {@link ModelManipulator} to replace the column name.
*/ */
String columnName(); String columnName();
......
...@@ -18,17 +18,24 @@ ...@@ -18,17 +18,24 @@
package org.apache.skywalking.oap.server.core.storage.model; package org.apache.skywalking.oap.server.core.storage.model;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.library.module.Service; import org.apache.skywalking.oap.server.library.module.Service;
/** /**
* INewModel implementation supports creating a new module. * INewModel implementation supports creating a new module.
*/ */
public interface INewModel extends Service { public interface ModelCreator extends Service {
/** /**
* Add a new model * Add a new model
* *
* @return the created new model * @return the created new model
*/ */
Model add(Class<?> aClass, int scopeId, Storage storage, boolean record); Model add(Class<?> aClass, int scopeId, Storage storage, boolean record) throws StorageException;
void addModelListener(CreatingListener listener) throws StorageException;
interface CreatingListener {
void whenCreating(Model model) throws StorageException;
}
} }
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.model; package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode; import org.apache.skywalking.oap.server.core.RunningMode;
...@@ -29,63 +29,52 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; ...@@ -29,63 +29,52 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
* The core module installation controller. * The core module installation controller.
*/ */
@RequiredArgsConstructor
@Slf4j @Slf4j
public abstract class ModelInstaller { public abstract class ModelInstaller implements ModelCreator.CreatingListener {
protected final Client client;
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
public ModelInstaller(ModuleManager moduleManager) { public void whenCreating(Model model) throws StorageException {
this.moduleManager = moduleManager;
}
/**
* Entrance of the storage entity installation work.
*/
public final void install(Client client) throws StorageException {
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.allModels();
if (RunningMode.isNoInitMode()) { if (RunningMode.isNoInitMode()) {
for (Model model : models) { while (!isExists(model)) {
while (!isExists(client, model)) { try {
try { log.info(
log.info( "table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.",
"table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.", model
model .getName()
.getName() );
); Thread.sleep(3000L);
Thread.sleep(3000L); } catch (InterruptedException e) {
} catch (InterruptedException e) { log.error(e.getMessage());
log.error(e.getMessage());
}
} }
} }
} else { } else {
for (Model model : models) { if (!isExists(model)) {
if (!isExists(client, model)) { log.info("table: {} does not exist", model.getName());
log.info("table: {} does not exist", model.getName()); createTable(model);
createTable(client, model);
}
} }
} }
} }
/** /**
* Installer implementation could use this API to request a column name replacement. This method delegates for * Installer implementation could use this API to request a column name replacement. This method delegates for
* {@link IModelOverride}. * {@link ModelManipulator}.
*/ */
protected final void overrideColumnName(String columnName, String newName) { protected final void overrideColumnName(String columnName, String newName) {
IModelOverride modelOverride = moduleManager.find(CoreModule.NAME).provider().getService(IModelOverride.class); ModelManipulator modelOverride = moduleManager.find(CoreModule.NAME)
.provider()
.getService(ModelManipulator.class);
modelOverride.overrideColumnName(columnName, newName); modelOverride.overrideColumnName(columnName, newName);
} }
/** /**
* Check whether the storage entity exists. Need to implement based on the real storage. * Check whether the storage entity exists. Need to implement based on the real storage.
*/ */
protected abstract boolean isExists(Client client, Model model) throws StorageException; protected abstract boolean isExists(Model model) throws StorageException;
/** /**
* Create the storage entity. All creations should be after the {@link #isExists(Client, Model)} check. * Create the storage entity. All creations should be after the {@link #isExists(Model)} check.
*/ */
protected abstract void createTable(Client client, Model model) throws StorageException; protected abstract void createTable(Model model) throws StorageException;
} }
...@@ -23,6 +23,6 @@ import org.apache.skywalking.oap.server.library.module.Service; ...@@ -23,6 +23,6 @@ import org.apache.skywalking.oap.server.library.module.Service;
/** /**
* Override service provides ways to rename the existing column or table name. * Override service provides ways to rename the existing column or table name.
*/ */
public interface IModelOverride extends Service { public interface ModelManipulator extends Service {
void overrideColumnName(String columnName, String newName); void overrideColumnName(String columnName, String newName);
} }
...@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.storage.model; ...@@ -20,11 +20,12 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.MultipleQueryUnifiedIndex; import org.apache.skywalking.oap.server.core.storage.annotation.MultipleQueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex; import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
...@@ -35,15 +36,19 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad ...@@ -35,15 +36,19 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad
* StorageModels manages all models detected by the core. * StorageModels manages all models detected by the core.
*/ */
@Slf4j @Slf4j
public class StorageModels implements IModelManager, INewModel, IModelOverride { public class StorageModels implements IModelManager, ModelCreator, ModelManipulator {
private final List<Model> models; private final List<Model> models;
private final HashMap<String, String> columnNameOverrideRule;
private final List<CreatingListener> listeners;
public StorageModels() { public StorageModels() {
this.models = new LinkedList<>(); this.models = new ArrayList<>();
this.columnNameOverrideRule = new HashMap<>();
this.listeners = new ArrayList<>();
} }
@Override @Override
public Model add(Class<?> aClass, int scopeId, Storage storage, boolean record) { public Model add(Class<?> aClass, int scopeId, Storage storage, boolean record) throws StorageException {
// Check this scope id is valid. // Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId); DefaultScopeDefine.nameOf(scopeId);
...@@ -55,11 +60,30 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride { ...@@ -55,11 +60,30 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride {
storage.getModelName(), modelColumns, extraQueryIndices, scopeId, storage.getModelName(), modelColumns, extraQueryIndices, scopeId,
storage.getDownsampling(), record storage.getDownsampling(), record
); );
this.followColumnNameRules(model);
models.add(model); models.add(model);
for (final CreatingListener listener : listeners) {
listener.whenCreating(model);
}
return model; return model;
} }
/**
* CreatingListener listener could react when {@link #add(Class, int, Storage, boolean)} model happens. Also, the
* added models are being notified in this add operation.
*/
@Override
public void addModelListener(final CreatingListener listener) throws StorageException {
listeners.add(listener);
for (Model model : models) {
listener.whenCreating(model);
}
}
/**
* Read model column metadata based on the class level definition.
*/
private void retrieval(Class<?> clazz, private void retrieval(Class<?> clazz,
String modelName, String modelName,
List<ModelColumn> modelColumns, List<ModelColumn> modelColumns,
...@@ -108,9 +132,14 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride { ...@@ -108,9 +132,14 @@ public class StorageModels implements IModelManager, INewModel, IModelOverride {
@Override @Override
public void overrideColumnName(String columnName, String newName) { public void overrideColumnName(String columnName, String newName) {
models.forEach(model -> { columnNameOverrideRule.put(columnName, newName);
model.getColumns().forEach(column -> column.getColumnName().overrideName(columnName, newName)); models.forEach(this::followColumnNameRules);
model.getExtraQueryIndices().forEach(extraQueryIndex -> extraQueryIndex.overrideName(columnName, newName)); }
private void followColumnNameRules(Model model) {
columnNameOverrideRule.forEach((oldName, newName) -> {
model.getColumns().forEach(column -> column.getColumnName().overrideName(oldName, newName));
model.getExtraQueryIndices().forEach(extraQueryIndex -> extraQueryIndex.overrideName(oldName, newName));
}); });
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException, ServiceNotProvidedException {
CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
Whitebox.setInternalState(moduleDefine, "loadedProvider", moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
// streamDataMapping.generate();
// TestStorageInstaller installer = new TestStorageInstaller(moduleManager);
// installer.install(null);
}
class TestStorageInstaller extends ModelInstaller {
public TestStorageInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
@Override
protected boolean isExists(Client client, Model tableDefine) throws StorageException {
return false;
}
@Override
protected void createTable(Client client, Model tableDefine) throws StorageException {
}
}
}
...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProces ...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProces
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex; import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
...@@ -47,7 +48,7 @@ public class StorageModelsTest { ...@@ -47,7 +48,7 @@ public class StorageModelsTest {
} }
@Test @Test
public void testStorageModels() { public void testStorageModels() throws StorageException {
StorageModels models = new StorageModels(); StorageModels models = new StorageModels();
models.add(TestModel.class, -1, models.add(TestModel.class, -1,
new Storage("StorageModelsTest", DownSampling.Hour), new Storage("StorageModelsTest", DownSampling.Hour),
......
...@@ -59,17 +59,6 @@ public class PrometheusFetcherProvider extends ModuleProvider { ...@@ -59,17 +59,6 @@ public class PrometheusFetcherProvider extends ModuleProvider {
@Override @Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException { public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (config.isActive()) {
// TODO. This is only a demo about creating new metrics
// We should create it based on metrics configuration.
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
meterSystem.create(
"test_percentile_metrics", "percentile", ScopeType.SERVICE,
PercentileFunction.PercentileArgument.class
);
}
} }
@Override @Override
...@@ -82,6 +71,14 @@ public class PrometheusFetcherProvider extends ModuleProvider { ...@@ -82,6 +71,14 @@ public class PrometheusFetcherProvider extends ModuleProvider {
if (config.isActive()) { if (config.isActive()) {
// TODO. This is only a demo about fetching the data and push into the calculation stream. // TODO. This is only a demo about fetching the data and push into the calculation stream.
final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class); final MeterSystem service = getManager().find(CoreModule.NAME).provider().getService(MeterSystem.class);
service.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
service.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
service.create(
"test_percentile_metrics", "percentile", ScopeType.SERVICE,
PercentileFunction.PercentileArgument.class
);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
......
...@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; ...@@ -35,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
...@@ -178,10 +179,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { ...@@ -178,10 +179,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
public void start() throws ModuleStartException { public void start() throws ModuleStartException {
try { try {
elasticSearchClient.connect(); elasticSearchClient.connect();
StorageEsInstaller installer = new StorageEsInstaller(elasticSearchClient, getManager(), config);
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
installer.install(elasticSearchClient);
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -41,14 +41,14 @@ public class StorageEsInstaller extends ModelInstaller { ...@@ -41,14 +41,14 @@ public class StorageEsInstaller extends ModelInstaller {
private final StorageModuleElasticsearchConfig config; private final StorageModuleElasticsearchConfig config;
protected final ColumnTypeEsMapping columnTypeEsMapping; protected final ColumnTypeEsMapping columnTypeEsMapping;
public StorageEsInstaller(ModuleManager moduleManager, final StorageModuleElasticsearchConfig config) { public StorageEsInstaller(Client client, ModuleManager moduleManager, final StorageModuleElasticsearchConfig config) {
super(moduleManager); super(client, moduleManager);
this.columnTypeEsMapping = new ColumnTypeEsMapping(); this.columnTypeEsMapping = new ColumnTypeEsMapping();
this.config = config; this.config = config;
} }
@Override @Override
protected boolean isExists(Client client, Model model) throws StorageException { protected boolean isExists(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client; ElasticSearchClient esClient = (ElasticSearchClient) client;
try { try {
String timeSeriesIndexName = TimeSeriesUtils.latestWriteIndexName(model); String timeSeriesIndexName = TimeSeriesUtils.latestWriteIndexName(model);
...@@ -59,7 +59,7 @@ public class StorageEsInstaller extends ModelInstaller { ...@@ -59,7 +59,7 @@ public class StorageEsInstaller extends ModelInstaller {
} }
@Override @Override
protected void createTable(Client client, Model model) throws StorageException { protected void createTable(Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient) client; ElasticSearchClient esClient = (ElasticSearchClient) client;
Map<String, Object> settings = createSetting(model.isRecord()); Map<String, Object> settings = createSetting(model.isRecord());
......
...@@ -33,6 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; ...@@ -33,6 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
...@@ -184,8 +185,8 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider { ...@@ -184,8 +185,8 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
try { try {
elasticSearch7Client.connect(); elasticSearch7Client.connect();
StorageEs7Installer installer = new StorageEs7Installer(getManager(), config); StorageEs7Installer installer = new StorageEs7Installer(elasticSearch7Client, getManager(), config);
installer.install(elasticSearch7Client); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) { } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -17,22 +17,21 @@ ...@@ -17,22 +17,21 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base; package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Config; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
@Slf4j
public class StorageEs7Installer extends StorageEsInstaller { public class StorageEs7Installer extends StorageEsInstaller {
public StorageEs7Installer(final Client client,
private static final Logger logger = LoggerFactory.getLogger(StorageEs7Installer.class); final ModuleManager moduleManager,
final StorageModuleElasticsearch7Config config) {
public StorageEs7Installer(final ModuleManager moduleManager, final StorageModuleElasticsearch7Config config) { super(client, moduleManager, config);
super(moduleManager, config);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -41,7 +40,7 @@ public class StorageEs7Installer extends StorageEsInstaller { ...@@ -41,7 +40,7 @@ public class StorageEs7Installer extends StorageEsInstaller {
Map<String, Object> type = (Map<String, Object>) mapping.remove(ElasticSearchClient.TYPE); Map<String, Object> type = (Map<String, Object>) mapping.remove(ElasticSearchClient.TYPE);
mapping.put("properties", type.get("properties")); mapping.put("properties", type.get("properties"));
logger.debug("elasticsearch index template setting: {}", mapping.toString()); log.debug("elasticsearch index template setting: {}", mapping.toString());
return mapping; return mapping;
} }
......
...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; ...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
...@@ -114,9 +115,9 @@ public class InfluxStorageProvider extends ModuleProvider { ...@@ -114,9 +115,9 @@ public class InfluxStorageProvider extends ModuleProvider {
public void start() throws ServiceNotProvidedException, ModuleStartException { public void start() throws ServiceNotProvidedException, ModuleStartException {
client.connect(); client.connect();
InfluxTableInstaller installer = new InfluxTableInstaller(getManager()); InfluxTableInstaller installer = new InfluxTableInstaller(client, getManager());
try { try {
installer.install(client); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) { } catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -26,18 +26,18 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; ...@@ -26,18 +26,18 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class InfluxTableInstaller extends ModelInstaller { public class InfluxTableInstaller extends ModelInstaller {
public InfluxTableInstaller(ModuleManager moduleManager) { public InfluxTableInstaller(Client client, ModuleManager moduleManager) {
super(moduleManager); super(client, moduleManager);
} }
@Override @Override
protected boolean isExists(final Client client, final Model model) throws StorageException { protected boolean isExists(final Model model) throws StorageException {
TableMetaInfo.addModel(model); TableMetaInfo.addModel(model);
return true; return true;
} }
@Override @Override
protected void createTable(final Client client, final Model model) throws StorageException { protected void createTable(final Model model) throws StorageException {
// Automatically create table // Automatically create table
} }
} }
...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; ...@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
...@@ -132,8 +133,8 @@ public class H2StorageProvider extends ModuleProvider { ...@@ -132,8 +133,8 @@ public class H2StorageProvider extends ModuleProvider {
try { try {
h2Client.connect(); h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(getManager()); H2TableInstaller installer = new H2TableInstaller(h2Client, getManager());
installer.install(h2Client); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) { } catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -44,18 +44,18 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; ...@@ -44,18 +44,18 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
public class H2TableInstaller extends ModelInstaller { public class H2TableInstaller extends ModelInstaller {
public static final String ID_COLUMN = "id"; public static final String ID_COLUMN = "id";
public H2TableInstaller(ModuleManager moduleManager) { public H2TableInstaller(Client client, ModuleManager moduleManager) {
super(moduleManager); super(client, moduleManager);
} }
@Override @Override
protected boolean isExists(Client client, Model model) throws StorageException { protected boolean isExists(Model model) throws StorageException {
TableMetaInfo.addModel(model); TableMetaInfo.addModel(model);
return false; return false;
} }
@Override @Override
protected void createTable(Client client, Model model) throws StorageException { protected void createTable(Model model) throws StorageException {
JDBCHikariCPClient jdbcHikariCPClient = (JDBCHikariCPClient) client; JDBCHikariCPClient jdbcHikariCPClient = (JDBCHikariCPClient) client;
try (Connection connection = jdbcHikariCPClient.getConnection()) { try (Connection connection = jdbcHikariCPClient.getConnection()) {
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " ("); SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " (");
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
...@@ -122,8 +123,8 @@ public class MySQLStorageProvider extends ModuleProvider { ...@@ -122,8 +123,8 @@ public class MySQLStorageProvider extends ModuleProvider {
try { try {
mysqlClient.connect(); mysqlClient.connect();
MySQLTableInstaller installer = new MySQLTableInstaller(getManager()); MySQLTableInstaller installer = new MySQLTableInstaller(mysqlClient, getManager());
installer.install(mysqlClient); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) { } catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -40,8 +40,8 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal ...@@ -40,8 +40,8 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal
*/ */
@Slf4j @Slf4j
public class MySQLTableInstaller extends H2TableInstaller { public class MySQLTableInstaller extends H2TableInstaller {
public MySQLTableInstaller(ModuleManager moduleManager) { public MySQLTableInstaller(Client client, ModuleManager moduleManager) {
super(moduleManager); super(client, moduleManager);
/* /*
* Override column because the default column names in core have syntax conflict with MySQL. * Override column because the default column names in core have syntax conflict with MySQL.
*/ */
...@@ -50,7 +50,7 @@ public class MySQLTableInstaller extends H2TableInstaller { ...@@ -50,7 +50,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
} }
@Override @Override
protected boolean isExists(Client client, Model model) throws StorageException { protected boolean isExists(Model model) throws StorageException {
TableMetaInfo.addModel(model); TableMetaInfo.addModel(model);
JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client; JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client;
try (Connection conn = h2Client.getConnection()) { try (Connection conn = h2Client.getConnection()) {
......
...@@ -49,9 +49,10 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; ...@@ -49,9 +49,10 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager; import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.INewModel; import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels; import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
...@@ -108,8 +109,7 @@ public class MockCoreModuleProvider extends CoreModuleProvider { ...@@ -108,8 +109,7 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
MeterSystem meterSystem = MeterSystem.meterSystem(getManager()); this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(MeterSystem.class, meterSystem);
CoreModuleConfig moduleConfig = new CoreModuleConfig(); CoreModuleConfig moduleConfig = new CoreModuleConfig();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig)); this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
...@@ -129,9 +129,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider { ...@@ -129,9 +129,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(INewModel.class, storageModels); this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels); this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels); this.registerServiceImplementation(ModelManipulator.class, storageModels);
this.registerServiceImplementation( this.registerServiceImplementation(
NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig)); NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));
...@@ -160,11 +160,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider { ...@@ -160,11 +160,9 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
@Override @Override
public void start() throws ModuleStartException { public void start() throws ModuleStartException {
MeterSystem.closeMeterCreationChannel();
try { try {
annotationScan.scan(); annotationScan.scan();
} catch (IOException e) { } catch (IOException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
} }
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamingProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/** /**
...@@ -44,7 +45,7 @@ public class MockStreamAnnotationListener implements AnnotationListener { ...@@ -44,7 +45,7 @@ public class MockStreamAnnotationListener implements AnnotationListener {
} }
@Override @Override
public void notify(Class aClass) { public void notify(Class aClass) throws StorageException {
if (aClass.isAnnotationPresent(Stream.class)) { if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class); Stream stream = (Stream) aClass.getAnnotation(Stream.class);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册