未验证 提交 b9432655 编写于 作者: J Jared Tan 提交者: GitHub

Merge branch 'master' into len

......@@ -22,27 +22,60 @@ import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
......@@ -59,6 +92,7 @@ public class CoreModuleProvider extends ModuleProvider {
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
private final StreamDataAnnotationContainer streamDataAnnotationContainer;
private final SourceReceiverImpl receiver;
public CoreModuleProvider() {
super();
......@@ -67,6 +101,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
receiver = new SourceReceiverImpl();
}
@Override public String name() {
......@@ -101,7 +136,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
this.registerServiceImplementation(SourceReceiver.class, receiver);
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
......@@ -143,10 +178,12 @@ public class CoreModuleProvider extends ModuleProvider {
remoteClientManager.start();
try {
receiver.scan();
annotationScan.scan(() -> {
streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
});
} catch (IOException e) {
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
......
......@@ -18,57 +18,91 @@
package org.apache.skywalking.oap.server.core.analysis;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.generated.all.AllDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.endpointrelation.EndpointRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.service.ServiceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstance.ServiceInstanceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmcpu.ServiceInstanceJVMCPUDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmgc.ServiceInstanceJVMGCDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemory.ServiceInstanceJVMMemoryDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.*;
import org.slf4j.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* @author peng-yongsheng, wusheng
*/
public class DispatcherManager {
private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class);
private Map<Scope, SourceDispatcher[]> dispatcherMap;
private Map<Scope, List<SourceDispatcher>> dispatcherMap;
public DispatcherManager() {
this.dispatcherMap = new HashMap<>();
}
this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
public void forward(Source source) {
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
}
}
this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
/**
* Scan all classes under `org.apache.skywalking` package,
*
* If it implement {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}, then, it will be added
* into this DispatcherManager based on the Source definition.
*
* @throws IOException
* @throws IllegalAccessException
* @throws InstantiationException
*/
public void scan() throws IOException, IllegalAccessException, InstantiationException {
ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) {
Class<?> aClass = classInfo.load();
this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) {
Type[] genericInterfaces = aClass.getGenericInterfaces();
for (Type genericInterface : genericInterfaces) {
ParameterizedType anInterface = (ParameterizedType)genericInterface;
if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
Type[] arguments = anInterface.getActualTypeArguments();
this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
if (arguments.length != 1) {
throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
}
Type argument = arguments[0];
this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()});
}
Object source = ((Class)argument).newInstance();
public void forward(Source source) {
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
if (!Source.class.isAssignableFrom(source.getClass())) {
throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
}
Source dispatcherSource = (Source)source;
SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance();
Scope scope = dispatcherSource.scope();
List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scope);
if (dispatchers == null) {
dispatchers = new ArrayList<>();
this.dispatcherMap.put(scope, dispatchers);
}
dispatchers.add(dispatcher);
logger.info("Dispatcher {} is added into Scope {}.", dispatcher.getClass().getName(), scope);
}
}
}
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.source;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
/**
......@@ -34,4 +35,8 @@ public class SourceReceiverImpl implements SourceReceiver {
@Override public void receive(Source source) {
dispatcherManager.forward(source);
}
public void scan() throws IOException, InstantiationException, IllegalAccessException {
dispatcherManager.scan();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册