提交 63fb961c 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

New OAP annotations, they are friendly for streaming loader. (#2670)

* New stream annotation.
- Inventory annotation example:
@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @storage(builder = ServiceInventory.Builder.class, deleteHistory = false), kind = StreamKind.Inventory)

- Metrics annotation example:
@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @storage(builder = ServiceRelationClientSideMetrics.Builder.class), kind = StreamKind.Metrics)

- Record annotation example:
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, storage = @storage(builder = SegmentRecord.Builder.class), kind = StreamKind.Record)

- TopN annotation example:
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, storage = @storage(builder = TopNDatabaseStatement.Builder.class), kind = StreamKind.TopN)

* no message

* Stream annotation finish.

* Make stream processors to be the singleton class.

* Fixed the compile errors.

* Modify the OAL module for this annotation refactor.

* Add apache license header.

* Duplicate import.

* Fixed check style error.
上级 1b6bd5f8
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.generated.${packageName};
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
<#if (metrics?size>0)>
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
<#list metrics as metrics>
<#if metrics.filterExpressions??>
import org.apache.skywalking.oap.server.core.analysis.metrics.expression.*;
......@@ -66,7 +66,8 @@ public class ${source}Dispatcher implements SourceDispatcher<${source}> {
metrics.${field.fieldSetter}(source.${field.fieldGetter}());
</#list>
metrics.${metrics.entryMethod.methodName}(<#list metrics.entryMethod.argsExpressions as arg>${arg}<#if arg_has_next>, </#if></#list>);
MetricsProcess.INSTANCE.in(metrics);
MetricsStreamProcessor.getInstance().in(metrics);
}
</#list>
}
......@@ -28,21 +28,19 @@ import org.apache.skywalking.oap.server.core.Const;
<#break>
</#if>
</#list>
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@MetricsType
@StreamData
@StorageEntity(name = "${tableName}", builder = ${metricsName}Metrics.Builder.class, sourceScopeId = ${sourceScopeId})
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, storage = @Storage(builder = ${metricsName}Metrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata {
<#list fieldsFromSource as sourceField>
......
......@@ -21,21 +21,19 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service.service
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@MetricsType
@StreamData
@StorageEntity(name = "service_avg", builder = ServiceAvgMetrics.Builder.class, sourceScopeId = 1)
@Stream(name = "service_avg", scopeId = 1, storage = @Storage(builder = ServiceAvgMetrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata {
@Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId;
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.generated.service;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.expression.*;
import org.apache.skywalking.oap.server.core.source.*;
......@@ -47,6 +47,7 @@ public class ServiceDispatcher implements SourceDispatcher<Service> {
metrics.setTimeBucket(source.getTimeBucket());
metrics.setEntityId(source.getEntityId());
metrics.combine(source.getLatency(), 1);
MetricsProcess.INSTANCE.in(metrics);
MetricsStreamProcessor.getInstance().in(metrics);
}
}
......@@ -72,5 +72,4 @@ instance_jvm_old_gc_time = from(ServiceInstanceJVMGC.time).filter(phrase == GCPh
instance_jvm_young_gc_count = from(ServiceInstanceJVMGC.count).filter(phrase == GCPhrase.NEW).sum();
instance_jvm_old_gc_count = from(ServiceInstanceJVMGC.count).filter(phrase == GCPhrase.OLD).sum();
// endpoint_Avg_for_prod_serv = from(Endpoint.latency).filter(name == "/product/service").longAvg();
\ No newline at end of file
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
public class AlarmModuleProvider extends ModuleProvider {
private NotifyHandler notifyHandler;
@Override public String name() {
......
......@@ -24,8 +24,8 @@ import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.*;
......@@ -79,9 +79,11 @@ public class CoreModule extends ModuleDefine {
}
private void addInsideService(List<Class> classes) {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(IModelOverride.class);
classes.add(StreamDataClassGetter.class);
classes.add(StreamDataMappingGetter.class);
classes.add(StreamDataMappingSetter.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
......
......@@ -19,25 +19,21 @@
package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.define.*;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageModels;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.*;
......@@ -46,32 +42,27 @@ import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class CoreModuleProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(CoreModuleProvider.class);
private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
private final StreamDataAnnotationContainer streamDataAnnotationContainer;
private final StorageModels storageModels;
private final StreamDataMapping streamDataMapping;
private final SourceReceiverImpl receiver;
public CoreModuleProvider() {
super();
this.moduleConfig = new CoreModuleConfig();
this.annotationScan = new AnnotationScan();
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
this.streamDataMapping = new StreamDataMapping();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}
......@@ -120,15 +111,17 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(SourceReceiver.class, receiver);
this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);
this.registerServiceImplementation(StreamDataMappingGetter.class, streamDataMapping);
this.registerServiceImplementation(StreamDataMappingSetter.class, streamDataMapping);
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelGetter.class, storageAnnotationListener);
this.registerServiceImplementation(IModelOverride.class, storageAnnotationListener);
this.registerServiceImplementation(IModelSetter.class, storageModels);
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
......@@ -151,12 +144,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new MetricsTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(new RecordTypeListener(getManager()));
annotationScan.registerListener(new TopNTypeListener(getManager()));
annotationScan.registerListener(new StreamAnnotationListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
......@@ -171,7 +159,6 @@ public class CoreModuleProvider extends ModuleProvider {
receiver.scan();
annotationScan.scan(() -> {
streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
});
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
......
......@@ -21,8 +21,9 @@ package org.apache.skywalking.oap.server.core.alarm;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
......@@ -34,9 +35,8 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
*/
@Getter
@Setter
@RecordType
@ScopeDeclaration(id = ALARM, name = "Alarm")
@StorageEntity(name = AlarmRecord.INDEX_NAME, builder = AlarmRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ALARM)
@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, storage = @Storage(builder = AlarmRecord.Builder.class), processor = RecordStreamProcessor.class)
public class AlarmRecord extends Record {
public static final String INDEX_NAME = "alarm_record";
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.alarm;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.slf4j.*;
......@@ -47,7 +47,7 @@ public class AlarmStandardPersistence implements AlarmCallback {
record.setStartTime(message.getStartTime());
record.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(message.getStartTime()));
RecordProcess.INSTANCE.in(record);
RecordStreamProcessor.getInstance().in(record);
});
}
}
......@@ -23,6 +23,6 @@ import org.apache.skywalking.oap.server.core.source.Source;
/**
* @author peng-yongsheng
*/
public interface SourceDispatcher<S extends Source> {
void dispatch(S source);
public interface SourceDispatcher<SOURCE extends Source> {
void dispatch(SOURCE source);
}
......@@ -16,14 +16,23 @@
*
*/
package org.apache.skywalking.oap.server.core.remote.annotation;
package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamData {
public @interface Stream {
String name();
int scopeId();
Storage storage();
Class<? extends StreamProcessor> processor();
}
......@@ -16,34 +16,53 @@
*
*/
package org.apache.skywalking.oap.server.core.remote.annotation;
package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.Annotation;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.slf4j.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public class StreamAnnotationListener implements AnnotationListener {
private static final Logger logger = LoggerFactory.getLogger(StreamAnnotationListener.class);
private final ModuleDefineHolder moduleDefineHolder;
@Getter private final List<Class> streamClasses;
public StreamAnnotationListener() {
this.streamClasses = new LinkedList<>();
public StreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
@Override public Class<? extends Annotation> annotation() {
return StreamData.class;
return Stream.class;
}
@SuppressWarnings("unchecked")
@Override public void notify(Class aClass) {
logger.info("The owner class of stream data annotation, class name: {}", aClass.getName());
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream)aClass.getAnnotation(Stream.class);
streamClasses.add(aClass);
if (stream.processor().equals(InventoryStreamProcessor.class)) {
InventoryStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, true, stream.name(), stream.scopeId(), stream.storage());
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class).putIfAbsent(aClass, false, stream.name(), stream.scopeId(), stream.storage());
} else {
throw new UnexpectedException("Unknown stream processor.");
}
} else {
throw new UnexpectedException("Stream annotation listener could only parse the class present stream annotation.");
}
}
}
......@@ -16,14 +16,16 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics.annotation;
package org.apache.skywalking.oap.server.core.analysis;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public class MetricsAnnotationUtils {
public interface StreamProcessor<STREAM> {
void in(STREAM stream);
public static boolean isMetrics(Class aClass) {
return aClass.isAnnotationPresent(MetricsType.class);
}
void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends STREAM> streamClass);
}
......@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.analysis.manual.database;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
/**
* @author wusheng
*/
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
@Override public void dispatch(DatabaseSlowStatement source) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setId(source.getId());
......@@ -35,6 +36,6 @@ public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlo
statement.setTimeBucket(source.getTimeBucket());
statement.setTraceId(source.getTraceId());
TopNProcess.INSTANCE.in(statement);
TopNStreamProcessor.getInstance().in(statement);
}
}
......@@ -19,23 +19,23 @@
package org.apache.skywalking.oap.server.core.analysis.manual.database;
import java.util.*;
import lombok.*;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
/**
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
*
* @author wusheng
*/
@TopNType
@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, sourceScopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT)
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, storage = @Storage(builder = TopNDatabaseStatement.Builder.class), processor = TopNStreamProcessor.class)
public class TopNDatabaseStatement extends TopN {
public static final String INDEX_NAME = "top_n_database_statement";
public static final String INDEX_NAME = "top_n_database_statement";
@Setter private String id;
......
......@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
/**
* @author wusheng, peng-yongsheng
*/
public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
@Override
public void dispatch(EndpointRelation source) {
switch (source.getDetectPoint()) {
......@@ -42,6 +43,7 @@ public class EndpointCallRelationDispatcher implements SourceDispatcher<Endpoint
metrics.setDestEndpointId(source.getChildEndpointId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsProcess.INSTANCE.in(metrics);
MetricsStreamProcessor.getInstance().in(metrics);
}
}
......@@ -21,17 +21,15 @@ package org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@MetricsType
@StreamData
@StorageEntity(name = EndpointRelationServerSideMetrics.INDEX_NAME, builder = EndpointRelationServerSideMetrics.Builder.class, sourceScopeId = DefaultScopeDefine.ENDPOINT_RELATION)
@Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, storage = @Storage(builder = EndpointRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class)
public class EndpointRelationServerSideMetrics extends Metrics {
public static final String INDEX_NAME = "endpoint_relation_server_side";
......@@ -46,9 +44,9 @@ public class EndpointRelationServerSideMetrics extends Metrics {
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(destEndpointId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
splitJointId += Const.ID_SPLIT + sourceEndpointId;
splitJointId += Const.ID_SPLIT + destEndpointId;
splitJointId += Const.ID_SPLIT + componentId;
return splitJointId;
}
......
......@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.HTTPAccessLog;
/**
* @author wusheng
*/
public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {
@Override public void dispatch(HTTPAccessLog source) {
HTTPAccessLogRecord record = new HTTPAccessLogRecord();
record.setTimestamp(source.getTimestamp());
......@@ -39,6 +40,6 @@ public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog>
record.setContentType(source.getContentType().value());
record.setContent(source.getContent());
RecordProcess.INSTANCE.in(record);
RecordStreamProcessor.getInstance().in(record);
}
}
......@@ -19,13 +19,16 @@
package org.apache.skywalking.oap.server.core.analysis.manual.log;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME;
@StorageEntity(name = INDEX_NAME, builder = HTTPAccessLogRecord.Builder.class, sourceScopeId = DefaultScopeDefine.HTTP_ACCESS_LOG)
@Stream(name = INDEX_NAME, scopeId = DefaultScopeDefine.HTTP_ACCESS_LOG, storage = @Storage(builder = HTTPAccessLogRecord.Builder.class), processor = RecordStreamProcessor.class)
public class HTTPAccessLogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "http_access_log";
public static class Builder extends AbstractLogRecord.Builder<HTTPAccessLogRecord> {
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.Segment;
/**
......@@ -43,6 +43,6 @@ public class SegmentDispatcher implements SourceDispatcher<Segment> {
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
RecordProcess.INSTANCE.in(segment);
RecordStreamProcessor.getInstance().in(segment);
}
}
......@@ -22,8 +22,9 @@ import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
......@@ -32,8 +33,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
/**
* @author peng-yongsheng
*/
@RecordType
@StorageEntity(name = SegmentRecord.INDEX_NAME, builder = SegmentRecord.Builder.class, sourceScopeId = DefaultScopeDefine.SEGMENT)
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, storage = @Storage(builder = SegmentRecord.Builder.class), processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record {
public static final String INDEX_NAME = "segment";
......
......@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.ServiceRelation;
/**
* @author wusheng
*/
public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRelation> {
@Override
public void dispatch(ServiceRelation source) {
switch (source.getDetectPoint()) {
......@@ -45,7 +46,8 @@ public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRe
metrics.setDestServiceId(source.getDestServiceId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsProcess.INSTANCE.in(metrics);
MetricsStreamProcessor.getInstance().in(metrics);
}
private void clientSide(ServiceRelation source) {
......@@ -55,6 +57,7 @@ public class ServiceCallRelationDispatcher implements SourceDispatcher<ServiceRe
metrics.setDestServiceId(source.getDestServiceId());
metrics.setComponentId(source.getComponentId());
metrics.setEntityId(source.getEntityId());
MetricsProcess.INSTANCE.in(metrics);
MetricsStreamProcessor.getInstance().in(metrics);
}
}
......@@ -21,17 +21,15 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@MetricsType
@StreamData
@StorageEntity(name = ServiceRelationClientSideMetrics.INDEX_NAME, builder = ServiceRelationClientSideMetrics.Builder.class, sourceScopeId = DefaultScopeDefine.SERVICE_RELATION)
@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationClientSideMetrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ServiceRelationClientSideMetrics extends Metrics {
public static final String INDEX_NAME = "service_relation_client_side";
......@@ -46,9 +44,9 @@ public class ServiceRelationClientSideMetrics extends Metrics {
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
splitJointId += Const.ID_SPLIT + sourceServiceId;
splitJointId += Const.ID_SPLIT + destServiceId;
splitJointId += Const.ID_SPLIT + componentId;
return splitJointId;
}
......
......@@ -21,18 +21,15 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
@MetricsType
@StreamData
@StorageEntity(name = ServiceRelationServerSideMetrics.INDEX_NAME, builder = ServiceRelationServerSideMetrics.Builder.class,
sourceScopeId = DefaultScopeDefine.SERVICE_RELATION)
@Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class)
public class ServiceRelationServerSideMetrics extends Metrics {
public static final String INDEX_NAME = "service_relation_server_side";
......@@ -47,9 +44,9 @@ public class ServiceRelationServerSideMetrics extends Metrics {
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
splitJointId += Const.ID_SPLIT + sourceServiceId;
splitJointId += Const.ID_SPLIT + destServiceId;
splitJointId += Const.ID_SPLIT + componentId;
return splitJointId;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class MetricsTypeListener implements AnnotationListener {
private final ModuleManager moduleManager;
public MetricsTypeListener(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
return MetricsType.class;
}
@Override public void notify(Class aClass) {
MetricsProcess.INSTANCE.create(moduleManager, aClass);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.record.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RecordType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.record.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class RecordTypeListener implements AnnotationListener {
private final ModuleManager moduleManager;
public RecordTypeListener(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
return RecordType.class;
}
@Override public void notify(Class aClass) {
RecordProcess.INSTANCE.create(moduleManager, aClass);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TopNType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author wusheng
*/
public class TopNTypeListener implements AnnotationListener {
private final ModuleManager moduleManager;
public TopNTypeListener(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
return TopNType.class;
}
@Override public void notify(Class aClass) {
TopNProcess.INSTANCE.create(moduleManager, aClass);
}
}
......@@ -21,21 +21,25 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public enum MetricsProcess {
INSTANCE;
public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
private final static MetricsStreamProcessor PROCESSOR = new MetricsStreamProcessor();
private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
@Getter private List<MetricsPersistentWorker> persistentWorkers = new ArrayList<>();
public static MetricsStreamProcessor getInstance() {
return PROCESSOR;
}
public void in(Metrics metrics) {
MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
if (worker != null) {
......@@ -43,50 +47,44 @@ public enum MetricsProcess {
}
}
public void create(ModuleManager moduleManager, Class<? extends Metrics> metricsClass) {
String modelName = StorageEntityAnnotationUtils.getModelName(metricsClass);
if (DisableRegister.INSTANCE.include(modelName)) {
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(metricsClass);
StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
try {
metricsDAO = storageDAO.newMetricsDao(builderClass.newInstance());
metricsDAO = storageDAO.newMetricsDao(stream.storage().builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " metrics DAO failure.", e);
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e);
}
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleManager, metricsDAO, modelName);
MetricsPersistentWorker hourPersistentWorker = worker(moduleManager, metricsDAO, modelName + Const.ID_SPLIT + Downsampling.Hour.getName());
MetricsPersistentWorker dayPersistentWorker = worker(moduleManager, metricsDAO, modelName + Const.ID_SPLIT + Downsampling.Day.getName());
MetricsPersistentWorker monthPersistentWorker = worker(moduleManager, metricsDAO, modelName + Const.ID_SPLIT + Downsampling.Month.getName());
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, stream.name());
MetricsPersistentWorker hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Hour.getName());
MetricsPersistentWorker dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Day.getName());
MetricsPersistentWorker monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, stream.name() + Const.ID_SPLIT + Downsampling.Month.getName());
MetricsTransWorker transWorker = new MetricsTransWorker(moduleManager, modelName, minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleManager, transWorker, modelName);
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleManager, remoteWorker, modelName);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, transWorker, stream.name());
MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(moduleDefineHolder, remoteWorker, stream.name());
entryWorkers.put(metricsClass, aggregateWorker);
}
private MetricsPersistentWorker minutePersistentWorker(ModuleManager moduleManager,
IMetricsDAO metricsDAO, String modelName) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleManager);
ExportWorker exportWorker = new ExportWorker(moduleManager);
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, String modelName) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleManager, modelName,
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, modelName,
1000, metricsDAO, alarmNotifyWorker, exportWorker);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleManager moduleManager,
IMetricsDAO metricsDAO, String modelName) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleManager, modelName,
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, String modelName) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, modelName,
1000, metricsDAO, null, null);
persistentWorkers.add(persistentWorker);
......
......@@ -21,20 +21,24 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public enum RecordProcess {
INSTANCE;
public class RecordStreamProcessor implements StreamProcessor<Record> {
private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();
private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();
public static RecordStreamProcessor getInstance() {
return PROCESSOR;
}
public void in(Record record) {
RecordPersistentWorker worker = workers.get(record.getClass());
if (worker != null) {
......@@ -44,24 +48,20 @@ public enum RecordProcess {
@Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
public void create(ModuleManager moduleManager, Class<? extends Record> recordClass) {
String modelName = StorageEntityAnnotationUtils.getModelName(recordClass);
if (DisableRegister.INSTANCE.include(modelName)) {
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(recordClass);
StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " record DAO failure.", e);
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e);
}
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleManager, modelName, 1000, recordDAO);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, stream.name(), 1000, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
}
......
......@@ -21,52 +21,51 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* TopN is a special process, which hold a certain size of windows, and cache all top N records, save to the persistence
* in low frequence.
* in low frequency.
*
* @author wusheng
*/
public enum TopNProcess {
INSTANCE;
public class TopNStreamProcessor implements StreamProcessor<TopN> {
private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor();
@Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
public static TopNStreamProcessor getInstance() {
return PROCESSOR;
}
if (DisableRegister.INSTANCE.include(modelName)) {
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " top n record DAO failure.", e);
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e);
}
TopNWorker persistentWorker = new TopNWorker(moduleManager, modelName, 50, recordDAO);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, stream.name(), 50, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
public void in(TopNDatabaseStatement statement) {
TopNWorker worker = workers.get(statement.getClass());
public void in(TopN topN) {
TopNWorker worker = workers.get(topN.getClass());
if (worker != null) {
worker.in(statement);
worker.in(topN);
}
}
}
......@@ -52,16 +52,14 @@ public class AnnotationScan {
}
}
listeners.forEach(listener ->
listener.complete()
);
listeners.forEach(AnnotationListenerCache::complete);
if (callBack != null) {
callBack.run();
}
}
public class AnnotationListenerCache {
private class AnnotationListenerCache {
private AnnotationListener listener;
private List<Class<?>> matchedClass;
......
......@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.register;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -34,13 +34,11 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN
/**
* @author peng-yongsheng
*/
@InventoryType
@StreamData
@ScopeDeclaration(id = ENDPOINT_INVENTORY, name = "EndpointInventory")
@StorageEntity(name = EndpointInventory.MODEL_NAME, builder = EndpointInventory.Builder.class, deleteHistory = false, sourceScopeId = DefaultScopeDefine.ENDPOINT_INVENTORY)
@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, storage = @Storage(builder = EndpointInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
public class EndpointInventory extends RegisterSource {
public static final String MODEL_NAME = "endpoint_inventory";
public static final String INDEX_NAME = "endpoint_inventory";
public static final String SERVICE_ID = "service_id";
public static final String NAME = "name";
......
......@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.register;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -34,16 +34,14 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NE
/**
* @author peng-yongsheng
*/
@InventoryType
@StreamData
@ScopeDeclaration(id = NETWORK_ADDRESS, name = "NetworkAddress")
@StorageEntity(name = NetworkAddressInventory.MODEL_NAME, builder = NetworkAddressInventory.Builder.class, deleteHistory = false, sourceScopeId = DefaultScopeDefine.NETWORK_ADDRESS)
@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, storage = @Storage(builder = NetworkAddressInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
public class NetworkAddressInventory extends RegisterSource {
public static final String MODEL_NAME = "network_address_inventory";
public static final String INDEX_NAME = "network_address_inventory";
private static final String NAME = "name";
public static final String NODE_TYPE = "node_type";
private static final String NODE_TYPE = "node_type";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
@Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE) @Column(columnName = NODE_TYPE) private int nodeType;
......
......@@ -23,8 +23,8 @@ import com.google.gson.reflect.TypeToken;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -37,13 +37,11 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
/**
* @author peng-yongsheng
*/
@InventoryType
@StreamData
@ScopeDeclaration(id = SERVICE_INSTANCE_INVENTORY, name = "ServiceInstanceInventory")
@StorageEntity(name = ServiceInstanceInventory.MODEL_NAME, builder = ServiceInstanceInventory.Builder.class, deleteHistory = false, sourceScopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY)
@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, storage = @Storage(builder = ServiceInstanceInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
public class ServiceInstanceInventory extends RegisterSource {
public static final String MODEL_NAME = "service_instance_inventory";
public static final String INDEX_NAME = "service_instance_inventory";
public static final String NAME = "name";
public static final String INSTANCE_UUID = "instance_uuid";
......@@ -105,7 +103,6 @@ public class ServiceInstanceInventory extends RegisterSource {
return prop != null && prop.length() > 0;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
......
......@@ -22,8 +22,8 @@ import com.google.gson.*;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -36,13 +36,11 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
/**
* @author peng-yongsheng
*/
@InventoryType
@StreamData
@ScopeDeclaration(id = SERVICE_INVENTORY, name = "ServiceInventory")
@StorageEntity(name = ServiceInventory.MODEL_NAME, builder = ServiceInventory.Builder.class, deleteHistory = false, sourceScopeId = DefaultScopeDefine.SERVICE_INVENTORY)
@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @Storage(builder = ServiceInventory.Builder.class, deleteHistory = false), processor = InventoryStreamProcessor.class)
public class ServiceInventory extends RegisterSource {
public static final String MODEL_NAME = "service_inventory";
public static final String INDEX_NAME = "service_inventory";
public static final String NAME = "name";
public static final String IS_ADDRESS = "is_address";
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.register.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class InventoryTypeListener implements AnnotationListener {
private final ModuleManager moduleManager;
public InventoryTypeListener(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
return InventoryType.class;
}
@Override public void notify(Class aClass) {
InventoryProcess.INSTANCE.create(moduleManager, aClass);
}
}
......@@ -22,9 +22,9 @@ import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
import static java.util.Objects.isNull;
......@@ -36,16 +36,16 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryRegister.class);
private final ModuleManager moduleManager;
private final ModuleDefineHolder moduleDefineHolder;
private EndpointInventoryCache cacheService;
public EndpointInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public EndpointInventoryRegister(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
private EndpointInventoryCache getCacheService() {
if (isNull(cacheService)) {
cacheService = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
cacheService = moduleDefineHolder.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
}
return cacheService;
}
......@@ -63,7 +63,7 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
endpointInventory.setRegisterTime(now);
endpointInventory.setHeartbeatTime(now);
InventoryProcess.INSTANCE.in(endpointInventory);
InventoryStreamProcessor.getInstance().in(endpointInventory);
}
return endpointId;
}
......@@ -77,7 +77,7 @@ public class EndpointInventoryRegister implements IEndpointInventoryRegister {
if (Objects.nonNull(endpointInventory)) {
endpointInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(endpointInventory);
InventoryStreamProcessor.getInstance().in(endpointInventory);
} else {
logger.warn("Endpoint {} heartbeat, but not found in storage.", endpointId);
}
......
......@@ -21,10 +21,10 @@ package org.apache.skywalking.oap.server.core.register.service;
import com.google.gson.JsonObject;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
import static java.util.Objects.isNull;
......@@ -36,40 +36,32 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryRegister.class);
private final ModuleManager moduleManager;
private ServiceInventoryCache serviceInventoryCache;
private final ModuleDefineHolder moduleDefineHolder;
private NetworkAddressInventoryCache networkAddressInventoryCache;
private IServiceInventoryRegister serviceInventoryRegister;
private IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
public NetworkAddressInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (isNull(serviceInventoryCache)) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return this.serviceInventoryCache;
public NetworkAddressInventoryRegister(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
private NetworkAddressInventoryCache getNetworkAddressInventoryCache() {
if (isNull(networkAddressInventoryCache)) {
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
this.networkAddressInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
}
return this.networkAddressInventoryCache;
}
private IServiceInventoryRegister getServiceInventoryRegister() {
if (isNull(serviceInventoryRegister)) {
this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
this.serviceInventoryRegister = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
}
return this.serviceInventoryRegister;
}
private IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister() {
if (isNull(serviceInstanceInventoryRegister)) {
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
this.serviceInstanceInventoryRegister = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
}
return this.serviceInstanceInventoryRegister;
}
......@@ -95,7 +87,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
newNetworkAddress.setRegisterTime(now);
newNetworkAddress.setHeartbeatTime(now);
InventoryProcess.INSTANCE.in(newNetworkAddress);
InventoryStreamProcessor.getInstance().in(newNetworkAddress);
}
return Const.NONE;
......@@ -111,7 +103,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
networkAddress = networkAddress.getClone();
networkAddress.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(networkAddress);
InventoryStreamProcessor.getInstance().in(networkAddress);
} else {
logger.warn("Network getAddress {} heartbeat, but not found in storage.", addressId);
}
......@@ -125,7 +117,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
newNetworkAddress.setNetworkAddressNodeType(nodeType);
newNetworkAddress.setHeartbeatTime(System.currentTimeMillis());
InventoryProcess.INSTANCE.in(newNetworkAddress);
InventoryStreamProcessor.getInstance().in(newNetworkAddress);
}
}
......
......@@ -23,8 +23,8 @@ import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
......@@ -37,16 +37,16 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryRegister.class);
private final ModuleManager moduleManager;
private final ModuleDefineHolder moduleDefineHolder;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
public ServiceInstanceInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public ServiceInstanceInventoryRegister(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
private ServiceInstanceInventoryCache getServiceInstanceInventoryCache() {
if (isNull(serviceInstanceInventoryCache)) {
serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
serviceInstanceInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
return serviceInstanceInventoryCache;
}
......@@ -72,7 +72,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
serviceInstanceInventory.setProperties(properties);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
}
return serviceInstanceId;
}
......@@ -94,7 +94,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
serviceInstanceInventory.setRegisterTime(registerTime);
serviceInstanceInventory.setHeartbeatTime(registerTime);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
}
return serviceInstanceId;
}
......@@ -103,7 +103,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
ServiceInstanceInventory serviceInstanceInventory = getServiceInstanceInventoryCache().get(serviceInstanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInstanceInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(serviceInstanceInventory);
InventoryStreamProcessor.getInstance().in(serviceInstanceInventory);
} else {
logger.warn("Service instance {} heartbeat, but not found in storage.", serviceInstanceId);
}
......
......@@ -23,8 +23,8 @@ import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
......@@ -37,16 +37,16 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryRegister.class);
private final ModuleManager moduleManager;
private final ModuleDefineHolder moduleDefineHolder;
private ServiceInventoryCache serviceInventoryCache;
public ServiceInventoryRegister(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public ServiceInventoryRegister(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
private ServiceInventoryCache getServiceInventoryCache() {
if (isNull(serviceInventoryCache)) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.serviceInventoryCache = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
}
return serviceInventoryCache;
}
......@@ -67,7 +67,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory.setMappingLastUpdateTime(now);
serviceInventory.setProperties(properties);
InventoryProcess.INSTANCE.in(serviceInventory);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
return serviceId;
}
......@@ -86,7 +86,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingLastUpdateTime(now);
InventoryProcess.INSTANCE.in(serviceInventory);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
return serviceId;
}
......@@ -100,7 +100,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory.setProperties(properties);
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
InventoryProcess.INSTANCE.in(serviceInventory);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
} else {
logger.warn("Service {} nodeType/properties update, but not found in storage.", serviceId);
......@@ -113,7 +113,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory = serviceInventory.getClone();
serviceInventory.setHeartbeatTime(heartBeatTime);
InventoryProcess.INSTANCE.in(serviceInventory);
InventoryStreamProcessor.getInstance().in(serviceInventory);
} else {
logger.warn("Service {} heartbeat, but not found in storage.", serviceId);
}
......@@ -126,7 +126,7 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
serviceInventory.setMappingServiceId(mappingServiceId);
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
InventoryProcess.INSTANCE.in(serviceInventory);
InventoryStreamProcessor.getInstance().in(serviceInventory);
} else {
logger.warn("Service {} mapping update, but not found in storage.", serviceId);
}
......
......@@ -20,42 +20,42 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
* @author peng-yongsheng
*/
public enum InventoryProcess {
INSTANCE;
public class InventoryStreamProcessor implements StreamProcessor<RegisterSource> {
private static final InventoryStreamProcessor PROCESSOR = new InventoryStreamProcessor();
private Map<Class<? extends RegisterSource>, RegisterDistinctWorker> entryWorkers = new HashMap<>();
public static InventoryStreamProcessor getInstance() {
return PROCESSOR;
}
public void in(RegisterSource registerSource) {
entryWorkers.get(registerSource.getClass()).in(registerSource);
}
public void create(ModuleManager moduleManager, Class<? extends RegisterSource> inventoryClass) {
String modelName = StorageEntityAnnotationUtils.getModelName(inventoryClass);
int scopeId = StorageEntityAnnotationUtils.getSourceScope(inventoryClass);
Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(inventoryClass);
StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends RegisterSource> inventoryClass) {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO;
try {
registerDAO = storageDAO.newRegisterDao(builderClass.newInstance());
registerDAO = storageDAO.newRegisterDao(stream.storage().builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + builderClass.getSimpleName() + " register DAO failure.", e);
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e);
}
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleManager, modelName, registerDAO, scopeId);
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, stream.name(), registerDAO, stream.scopeId());
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleManager, persistentWorker);
RegisterRemoteWorker remoteWorker = new RegisterRemoteWorker(moduleDefineHolder, persistentWorker);
RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleManager, remoteWorker);
RegisterDistinctWorker distinctWorker = new RegisterDistinctWorker(moduleDefineHolder, remoteWorker);
entryWorkers.put(inventoryClass, distinctWorker);
}
......
......@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.remote;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -43,7 +43,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
private StreamDataMappingGetter streamDataMappingGetter;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetrics remoteInCounter;
private CounterMetrics remoteInErrorCounter;
......@@ -64,10 +64,10 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(streamDataClassGetter)) {
if (Objects.isNull(streamDataMappingGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(streamDataClassGetter)) {
streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
if (Objects.isNull(streamDataMappingGetter)) {
streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
}
}
}
......@@ -89,7 +89,7 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
int nextWorkerId = message.getNextWorkerId();
RemoteData remoteData = message.getRemoteData();
Class<StreamData> streamDataClass = streamDataClassGetter.findClassById(streamDataId);
Class<? extends StreamData> streamDataClass = streamDataMappingGetter.findClassById(streamDataId);
try {
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
......
......@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
......@@ -47,7 +47,7 @@ public class GRPCRemoteClient implements RemoteClient {
private final int channelSize;
private final int bufferSize;
private final Address address;
private final StreamDataClassGetter streamDataClassGetter;
private final StreamDataMappingGetter streamDataMappingGetter;
private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
......@@ -56,9 +56,9 @@ public class GRPCRemoteClient implements RemoteClient {
private CounterMetrics remoteOutErrorCounter;
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, StreamDataMappingGetter streamDataMappingGetter, Address address, int channelSize,
int bufferSize) {
this.streamDataClassGetter = streamDataClassGetter;
this.streamDataMappingGetter = streamDataMappingGetter;
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
......@@ -122,7 +122,7 @@ public class GRPCRemoteClient implements RemoteClient {
* @param streamData the entity contains the values.
*/
@Override public void push(int nextWorkerId, StreamData streamData) {
int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
int streamDataId = streamDataMappingGetter.findIdByClass(streamData.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setNextWorkerId(nextWorkerId);
builder.setStreamDataId(streamDataId);
......
......@@ -22,7 +22,7 @@ import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
......@@ -39,7 +39,7 @@ public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
private StreamDataMappingGetter streamDataMappingGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
......@@ -76,10 +76,10 @@ public class RemoteClientManager implements Service {
}
}
if (Objects.isNull(streamDataClassGetter)) {
if (Objects.isNull(streamDataMappingGetter)) {
synchronized (RemoteClientManager.class) {
if (Objects.isNull(streamDataClassGetter)) {
this.streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
if (Objects.isNull(streamDataMappingGetter)) {
this.streamDataMappingGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataMappingGetter.class);
}
}
}
......@@ -199,7 +199,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, streamDataClassGetter, address, 1, 3000);
RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, streamDataMappingGetter, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
......
......@@ -16,44 +16,40 @@
*
*/
package org.apache.skywalking.oap.server.core.remote.annotation;
package org.apache.skywalking.oap.server.core.remote.define;
import java.util.*;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class StreamDataAnnotationContainer implements StreamDataClassGetter {
private static final Logger logger = LoggerFactory.getLogger(StreamDataAnnotationContainer.class);
public class StreamDataMapping implements StreamDataMappingGetter, StreamDataMappingSetter {
private int id = 0;
private final Map<Class<StreamData>, Integer> classMap;
private final Map<Integer, Class<StreamData>> idMap;
private final Map<Class<? extends StreamData>, Integer> classMap;
private final Map<Integer, Class<? extends StreamData>> idMap;
public StreamDataAnnotationContainer() {
public StreamDataMapping() {
this.classMap = new HashMap<>();
this.idMap = new HashMap<>();
}
@SuppressWarnings(value = "unchecked")
public synchronized void generate(List<Class> streamDataClasses) {
streamDataClasses.sort(Comparator.comparing(Class::getName));
for (Class streamDataClass : streamDataClasses) {
id++;
classMap.put(streamDataClass, id);
idMap.put(id, streamDataClass);
@Override public synchronized void putIfAbsent(Class<? extends StreamData> streamDataClass) {
if (classMap.containsKey(streamDataClass)) {
return;
}
id++;
classMap.put(streamDataClass, id);
idMap.put(id, streamDataClass);
}
@Override public int findIdByClass(Class streamDataClass) {
@Override public int findIdByClass(Class<? extends StreamData> streamDataClass) {
return classMap.get(streamDataClass);
}
@Override public Class<StreamData> findClassById(int id) {
@Override public Class<? extends StreamData> findClassById(int id) {
return idMap.get(id);
}
}
......@@ -16,17 +16,17 @@
*
*/
package org.apache.skywalking.oap.server.core.register.annotation;
package org.apache.skywalking.oap.server.core.remote.define;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface InventoryType {
public interface StreamDataMappingGetter extends Service {
int findIdByClass(Class<? extends StreamData> streamDataClass);
Class<? extends StreamData> findClassById(int id);
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.core.remote.annotation;
package org.apache.skywalking.oap.server.core.remote.define;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -24,9 +24,6 @@ import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface StreamDataClassGetter extends Service {
int findIdByClass(Class streamDataClass);
Class<StreamData> findClassById(int id);
public interface StreamDataMappingSetter extends Service {
void putIfAbsent(Class<? extends StreamData> streamDataClass);
}
......@@ -82,9 +82,9 @@ public enum PersistenceTimer {
List batchAllCollection = new LinkedList();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(MetricsProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
......
......@@ -26,15 +26,9 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StorageEntity {
String name();
public @interface Storage {
Class<? extends StorageBuilder> builder();
/**
* @return scope id.
*/
int sourceScopeId();
boolean deleteHistory() default true;
}
......@@ -26,18 +26,9 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
*/
public class StorageEntityAnnotationUtils {
public static String getModelName(Class aClass) {
if (aClass.isAnnotationPresent(StorageEntity.class)) {
StorageEntity annotation = (StorageEntity)aClass.getAnnotation(StorageEntity.class);
return annotation.name();
} else {
throw new UnexpectedException("Fail to get model name from class " + aClass.getSimpleName());
}
}
public static boolean getDeleteHistory(Class aClass) {
if (aClass.isAnnotationPresent(StorageEntity.class)) {
StorageEntity annotation = (StorageEntity)aClass.getAnnotation(StorageEntity.class);
if (aClass.isAnnotationPresent(Storage.class)) {
Storage annotation = (Storage)aClass.getAnnotation(Storage.class);
return annotation.deleteHistory();
} else {
throw new UnexpectedException("Fail to get delete history tag from class " + aClass.getSimpleName());
......@@ -45,20 +36,11 @@ public class StorageEntityAnnotationUtils {
}
public static Class<? extends StorageBuilder> getBuilder(Class aClass) {
if (aClass.isAnnotationPresent(StorageEntity.class)) {
StorageEntity annotation = (StorageEntity)aClass.getAnnotation(StorageEntity.class);
if (aClass.isAnnotationPresent(Storage.class)) {
Storage annotation = (Storage)aClass.getAnnotation(Storage.class);
return annotation.builder();
} else {
throw new UnexpectedException("Fail to get entity builder from class " + aClass.getSimpleName());
}
}
public static int getSourceScope(Class aClass) {
if (aClass.isAnnotationPresent(StorageEntity.class)) {
StorageEntity annotation = (StorageEntity)aClass.getAnnotation(StorageEntity.class);
return annotation.sourceScopeId();
} else {
throw new UnexpectedException("Fail to get source scope from class " + aClass.getSimpleName());
}
}
}
......@@ -18,12 +18,9 @@
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.MetricsAnnotationUtils;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.slf4j.*;
......@@ -31,33 +28,23 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class StorageAnnotationListener implements AnnotationListener, IModelGetter, IModelOverride {
public class StorageModels implements IModelGetter, IModelSetter, IModelOverride {
private static final Logger logger = LoggerFactory.getLogger(StorageAnnotationListener.class);
private static final Logger logger = LoggerFactory.getLogger(StorageModels.class);
@Getter private final List<Model> models;
public StorageAnnotationListener() {
public StorageModels() {
this.models = new LinkedList<>();
}
@Override public Class<? extends Annotation> annotation() {
return StorageEntity.class;
}
@Override public void notify(Class aClass) {
logger.info("The owner class of storage annotation, class name: {}", aClass.getName());
String modelName = StorageEntityAnnotationUtils.getModelName(aClass);
boolean deleteHistory = StorageEntityAnnotationUtils.getDeleteHistory(aClass);
int sourceScopeId = StorageEntityAnnotationUtils.getSourceScope(aClass);
@Override public void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage) {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(sourceScopeId);
DefaultScopeDefine.nameOf(scopeId);
List<ModelColumn> modelColumns = new LinkedList<>();
boolean isMetrics = MetricsAnnotationUtils.isMetrics(aClass);
retrieval(aClass, modelName, modelColumns);
models.add(new Model(modelName, modelColumns, isMetrics, deleteHistory, sourceScopeId));
models.add(new Model(modelName, modelColumns, isMetrics, storage.deleteHistory(), scopeId));
}
private void retrieval(Class clazz, String modelName, List<ModelColumn> modelColumns) {
......
......@@ -16,14 +16,14 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics.annotation;
package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MetricsType {
public interface IModelSetter extends Service {
void putIfAbsent(Class aClass, boolean isMetrics, String modelName, int scopeId, Storage storage);
}
......@@ -23,8 +23,8 @@ import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.*;
......@@ -52,11 +52,11 @@ public class RemoteServiceHandlerTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
Class<?> dataClass = TestRemoteData.class;
when(classGetter.findClassById(streamDataClassId)).thenReturn((Class<StreamData>)dataClass);
StreamDataMappingGetter classGetter = mock(StreamDataMappingGetter.class);
Class dataClass = TestRemoteData.class;
when(classGetter.findClassById(streamDataClassId)).thenReturn(dataClass);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter);
String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
......@@ -53,7 +53,7 @@ public class GRPCRemoteClientRealClient {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, new TestClassGetter(), address, 1, 10));
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(moduleManager, new TestMappingGetter(), address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
......@@ -64,7 +64,7 @@ public class GRPCRemoteClientRealClient {
TimeUnit.MINUTES.sleep(10);
}
public static class TestClassGetter implements StreamDataClassGetter {
public static class TestMappingGetter implements StreamDataMappingGetter {
@Override public int findIdByClass(Class streamDataClass) {
return 1;
......
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.testing.module.*;
......@@ -36,7 +36,7 @@ public class GRPCRemoteClientRealServer {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, new GRPCRemoteClientRealClient.TestClassGetter());
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, new GRPCRemoteClientRealClient.TestMappingGetter());
GRPCServer server = new GRPCServer("localhost", 10000);
server.initialize();
......
......@@ -22,7 +22,7 @@ import io.grpc.testing.GrpcServerRule;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*;
......@@ -41,7 +41,7 @@ public class GRPCRemoteClientTestCase {
private final int nextWorkerId = 1;
private ModuleManagerTesting moduleManager;
private StreamDataClassGetter classGetter;
private StreamDataMappingGetter classGetter;
@Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Before
......@@ -50,8 +50,8 @@ public class GRPCRemoteClientTestCase {
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
classGetter = mock(StreamDataClassGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
classGetter = mock(StreamDataMappingGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, classGetter);
WorkerInstancesService workerInstancesService = new WorkerInstancesService();
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
......@@ -86,8 +86,8 @@ public class GRPCRemoteClientTestCase {
when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
Class<?> dataClass = TestStreamData.class;
when(classGetter.findClassById(1)).thenReturn((Class<StreamData>)dataClass);
Class dataClass = TestStreamData.class;
when(classGetter.findClassById(1)).thenReturn(dataClass);
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
......
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMappingGetter;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
......@@ -46,8 +46,8 @@ public class RemoteClientManagerTestCase {
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
StreamDataClassGetter streamDataClassGetter = mock(StreamDataClassGetter.class);
coreModuleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, streamDataClassGetter);
StreamDataMappingGetter streamDataMappingGetter = mock(StreamDataMappingGetter.class);
coreModuleDefine.provider().registerServiceImplementation(StreamDataMappingGetter.class, streamDataMappingGetter);
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createGauge(any(), any(), any(), any())).thenReturn(new GaugeMetrics() {
......
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.storage;
import java.util.LinkedList;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.define.StreamDataMapping;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.*;
......@@ -35,7 +35,7 @@ public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException, ServiceNotProvidedException {
StreamDataAnnotationContainer streamDataAnnotationContainer = new StreamDataAnnotationContainer();
StreamDataMapping streamDataMapping = new StreamDataMapping();
CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
......@@ -44,9 +44,9 @@ public class StorageInstallerTestCase {
moduleProviders.add(moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
Mockito.when(moduleProvider.getService(StreamDataAnnotationContainer.class)).thenReturn(streamDataAnnotationContainer);
Mockito.when(moduleProvider.getService(StreamDataMapping.class)).thenReturn(streamDataMapping);
// streamDataAnnotationContainer.generate();
// streamDataMapping.generate();
// TestStorageInstaller installer = new TestStorageInstaller(moduleManager);
// installer.install(null);
......
......@@ -46,7 +46,7 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
@Override public int getEndpointId(int serviceId, String endpointName, int detectPoint) {
try {
String id = EndpointInventory.buildId(serviceId, endpointName, detectPoint);
GetResponse response = getClient().get(EndpointInventory.MODEL_NAME, id);
GetResponse response = getClient().get(EndpointInventory.INDEX_NAME, id);
if (response.isExists()) {
return (int)response.getSource().getOrDefault(RegisterSource.SEQUENCE, 0);
} else {
......@@ -64,7 +64,7 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
searchSourceBuilder.query(QueryBuilders.termQuery(EndpointInventory.SEQUENCE, endpointId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(EndpointInventory.MODEL_NAME, searchSourceBuilder);
SearchResponse response = getClient().search(EndpointInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
......
......@@ -46,7 +46,7 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork
@Override public int getAddressId(String networkAddress) {
try {
String id = NetworkAddressInventory.buildId(networkAddress);
GetResponse response = getClient().get(NetworkAddressInventory.MODEL_NAME, id);
GetResponse response = getClient().get(NetworkAddressInventory.INDEX_NAME, id);
if (response.isExists()) {
return (int)response.getSource().getOrDefault(NetworkAddressInventory.SEQUENCE, 0);
} else {
......@@ -64,7 +64,7 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork
searchSourceBuilder.query(QueryBuilders.termQuery(NetworkAddressInventory.SEQUENCE, addressId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(NetworkAddressInventory.MODEL_NAME, searchSourceBuilder);
SearchResponse response = getClient().search(NetworkAddressInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
......
......@@ -49,7 +49,7 @@ public class ServiceInstanceInventoryCacheDAO extends EsDAO implements IServiceI
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInstanceInventory.SEQUENCE, serviceInstanceId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceInstanceInventory.MODEL_NAME, searchSourceBuilder);
SearchResponse response = getClient().search(ServiceInstanceInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
......@@ -74,7 +74,7 @@ public class ServiceInstanceInventoryCacheDAO extends EsDAO implements IServiceI
private int get(String id) {
try {
GetResponse response = getClient().get(ServiceInstanceInventory.MODEL_NAME, id);
GetResponse response = getClient().get(ServiceInstanceInventory.INDEX_NAME, id);
if (response.isExists()) {
return (int)response.getSource().getOrDefault(RegisterSource.SEQUENCE, 0);
} else {
......
......@@ -57,7 +57,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
private int get(String id) {
try {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, id);
GetResponse response = getClient().get(ServiceInventory.INDEX_NAME, id);
if (response.isExists()) {
return (int)response.getSource().getOrDefault(RegisterSource.SEQUENCE, 0);
} else {
......@@ -75,7 +75,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.SEQUENCE, serviceId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, searchSourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().totalHits == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
......@@ -101,7 +101,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.size(50);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, searchSourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder);
for (SearchHit searchHit : response.getHits().getHits()) {
serviceInventories.add(this.builder.map2Data(searchHit.getSourceAsMap()));
......
......@@ -20,9 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.xcontent.*;
......@@ -54,9 +54,9 @@ public class RegisterLockInstaller {
createIndex();
}
for (Class registerSource : InventoryProcess.INSTANCE.getAllRegisterSources()) {
int sourceScopeId = StorageEntityAnnotationUtils.getSourceScope(registerSource);
putIfAbsent(sourceScopeId);
for (Class registerSource : InventoryStreamProcessor.getInstance().getAllRegisterSources()) {
int scopeId = ((Stream)registerSource.getAnnotation(Stream.class)).scopeId();
putIfAbsent(scopeId);
}
} catch (IOException e) {
throw new StorageException(e.getMessage());
......
......@@ -80,7 +80,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(0);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return (int)response.getHits().getTotalHits();
}
......@@ -94,7 +94,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(0);
SearchResponse response = getClient().search(EndpointInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(EndpointInventory.INDEX_NAME, sourceBuilder);
return (int)response.getHits().getTotalHits();
}
......@@ -105,7 +105,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.NODE_TYPE, nodeTypeValue));
sourceBuilder.size(0);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return (int)response.getHits().getTotalHits();
}
......@@ -122,7 +122,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return buildServices(response);
}
......@@ -137,7 +137,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
List<Database> databases = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
......@@ -175,13 +175,13 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return buildServices(response);
}
@Override
public Service searchService(String serviceCode) throws IOException {
GetResponse response = getClient().get(ServiceInventory.MODEL_NAME, ServiceInventory.buildId(serviceCode));
GetResponse response = getClient().get(ServiceInventory.INDEX_NAME, ServiceInventory.buildId(serviceCode));
if (response.isExists()) {
Service service = new Service();
service.setId(((Number)response.getSource().get(ServiceInventory.SEQUENCE)).intValue());
......@@ -209,7 +209,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(limit);
SearchResponse response = getClient().search(EndpointInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(EndpointInventory.INDEX_NAME, sourceBuilder);
List<Endpoint> endpoints = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
......@@ -236,7 +236,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(queryMaxSize);
SearchResponse response = getClient().search(ServiceInstanceInventory.MODEL_NAME, sourceBuilder);
SearchResponse response = getClient().search(ServiceInstanceInventory.INDEX_NAME, sourceBuilder);
List<ServiceInstance> serviceInstances = new ArrayList<>();
for (SearchHit searchHit : response.getHits()) {
......
......@@ -22,15 +22,15 @@ import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RecordType
@StorageEntity(name = JaegerSpanRecord.INDEX_NAME, builder = JaegerSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.JAEGER_SPAN)
@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, storage = @Storage(builder = JaegerSpanRecord.Builder.class), processor = RecordStreamProcessor.class)
public class JaegerSpanRecord extends Record {
public static final String INDEX_NAME = "jaeger_span";
public static final String TRACE_ID = "trace_id";
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jaeger;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
/**
* Dispatch for Zipkin native mode spans.
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
* @author wusheng
*/
public class JaegerSpanRecordDispatcher implements SourceDispatcher<JaegerSpan> {
@Override public void dispatch(JaegerSpan source) {
JaegerSpanRecord segment = new JaegerSpanRecord();
segment.setTraceId(source.getTraceId());
......@@ -43,6 +44,6 @@ public class JaegerSpanRecordDispatcher implements SourceDispatcher<JaegerSpan>
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
RecordProcess.INSTANCE.in(segment);
RecordStreamProcessor.getInstance().in(segment);
}
}
......@@ -37,12 +37,12 @@ public class H2EndpointInventoryCacheDAO extends H2SQLExecutor implements IEndpo
@Override public int getEndpointId(int serviceId, String endpointName, int detectPoint) {
String id = EndpointInventory.buildId(serviceId, endpointName, detectPoint);
return getEntityIDByID(h2Client, EndpointInventory.SEQUENCE, EndpointInventory.MODEL_NAME, id);
return getEntityIDByID(h2Client, EndpointInventory.SEQUENCE, EndpointInventory.INDEX_NAME, id);
}
@Override public EndpointInventory get(int endpointId) {
try {
return (EndpointInventory)getByColumn(h2Client, EndpointInventory.MODEL_NAME, EndpointInventory.SEQUENCE, endpointId, new EndpointInventory.Builder());
return (EndpointInventory)getByColumn(h2Client, EndpointInventory.INDEX_NAME, EndpointInventory.SEQUENCE, endpointId, new EndpointInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
......
......@@ -68,7 +68,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
public int numOfService(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select count(*) num from ").append(ServiceInventory.INDEX_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=0");
......@@ -88,7 +88,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
public int numOfEndpoint(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(EndpointInventory.MODEL_NAME).append(" where ");
sql.append("select count(*) num from ").append(EndpointInventory.INDEX_NAME).append(" where ");
sql.append(EndpointInventory.DETECT_POINT).append("=").append(DetectPoint.SERVER.ordinal());
try (Connection connection = h2Client.getConnection()) {
......@@ -109,7 +109,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
int nodeTypeValue) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select count(*) num from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select count(*) num from ").append(ServiceInventory.INDEX_NAME).append(" where ");
sql.append(ServiceInventory.NODE_TYPE).append("=?");
condition.add(nodeTypeValue);
......@@ -129,7 +129,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
public List<Service> getAllServices(long startTimestamp, long endTimestamp) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(ServiceInventory.INDEX_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=? limit ").append(metadataQueryMaxSize);
condition.add(BooleanUtils.FALSE);
......@@ -147,7 +147,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
public List<Database> getAllDatabases() throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(1);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(ServiceInventory.INDEX_NAME).append(" where ");
sql.append(ServiceInventory.NODE_TYPE).append("=? limit ").append(metadataQueryMaxSize);
condition.add(NodeType.Database.value());
......@@ -181,7 +181,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
String keyword) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(ServiceInventory.INDEX_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=?");
condition.add(BooleanUtils.FALSE);
......@@ -203,7 +203,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
public Service searchService(String serviceCode) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(ServiceInventory.INDEX_NAME).append(" where ");
sql.append(ServiceInventory.IS_ADDRESS).append("=?");
condition.add(BooleanUtils.FALSE);
sql.append(" and ").append(ServiceInventory.NAME).append(" = ?");
......@@ -231,7 +231,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
int limit) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(EndpointInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(EndpointInventory.INDEX_NAME).append(" where ");
sql.append(EndpointInventory.SERVICE_ID).append("=?");
condition.add(serviceId);
if (!Strings.isNullOrEmpty(keyword)) {
......@@ -263,7 +263,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
String serviceId) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(5);
sql.append("select * from ").append(ServiceInstanceInventory.MODEL_NAME).append(" where ");
sql.append("select * from ").append(ServiceInstanceInventory.INDEX_NAME).append(" where ");
setTimeRangeCondition(sql, condition, startTimestamp, endTimestamp);
sql.append(" and ").append(ServiceInstanceInventory.SERVICE_ID).append("=?");
condition.add(serviceId);
......
......@@ -38,12 +38,12 @@ public class H2NetworkAddressInventoryCacheDAO extends H2SQLExecutor implements
@Override public int getAddressId(String networkAddress) {
String id = NetworkAddressInventory.buildId(networkAddress);
return getEntityIDByID(h2Client, NetworkAddressInventory.SEQUENCE, NetworkAddressInventory.MODEL_NAME, id);
return getEntityIDByID(h2Client, NetworkAddressInventory.SEQUENCE, NetworkAddressInventory.INDEX_NAME, id);
}
@Override public NetworkAddressInventory get(int addressId) {
try {
return (NetworkAddressInventory)getByColumn(h2Client, NetworkAddressInventory.MODEL_NAME, NetworkAddressInventory.SEQUENCE, addressId, new NetworkAddressInventory.Builder());
return (NetworkAddressInventory)getByColumn(h2Client, NetworkAddressInventory.INDEX_NAME, NetworkAddressInventory.SEQUENCE, addressId, new NetworkAddressInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
......
......@@ -19,10 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
......@@ -59,9 +59,9 @@ public class H2RegisterLockInstaller {
try (Connection connection = h2Client.getConnection()) {
h2Client.execute(connection, tableCreateSQL.toString());
for (Class registerSource : InventoryProcess.INSTANCE.getAllRegisterSources()) {
int scopeId = StorageEntityAnnotationUtils.getSourceScope(registerSource);
putIfAbsent(h2Client, connection, scopeId, DefaultScopeDefine.nameOf(scopeId));
for (Class registerSource : InventoryStreamProcessor.getInstance().getAllRegisterSources()) {
int scopeId = ((Stream)registerSource.getAnnotation(Stream.class)).scopeId();
putIfAbsent(h2Client, connection, scopeId, DefaultScopeDefine.nameOf(1));
}
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
......
......@@ -38,7 +38,7 @@ public class H2ServiceInstanceInventoryCacheDAO extends H2SQLExecutor implements
@Override public ServiceInstanceInventory get(int serviceInstanceId) {
try {
return (ServiceInstanceInventory)getByColumn(h2Client, ServiceInstanceInventory.MODEL_NAME, ServiceInstanceInventory.SEQUENCE, serviceInstanceId, new ServiceInstanceInventory.Builder());
return (ServiceInstanceInventory)getByColumn(h2Client, ServiceInstanceInventory.INDEX_NAME, ServiceInstanceInventory.SEQUENCE, serviceInstanceId, new ServiceInstanceInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
......@@ -56,6 +56,6 @@ public class H2ServiceInstanceInventoryCacheDAO extends H2SQLExecutor implements
}
private int getByID(String id) {
return getEntityIDByID(h2Client, ServiceInstanceInventory.SEQUENCE, ServiceInstanceInventory.MODEL_NAME, id);
return getEntityIDByID(h2Client, ServiceInstanceInventory.SEQUENCE, ServiceInstanceInventory.INDEX_NAME, id);
}
}
......@@ -19,17 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author wusheng
......@@ -44,17 +40,17 @@ public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServic
@Override public int getServiceId(String serviceName) {
String id = ServiceInventory.buildId(serviceName);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.MODEL_NAME, id);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.INDEX_NAME, id);
}
@Override public int getServiceId(int addressId) {
String id = ServiceInventory.buildId(addressId);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.MODEL_NAME, id);
return getEntityIDByID(h2Client, ServiceInventory.SEQUENCE, ServiceInventory.INDEX_NAME, id);
}
@Override public ServiceInventory get(int serviceId) {
try {
return (ServiceInventory)getByColumn(h2Client, ServiceInventory.MODEL_NAME, ServiceInventory.SEQUENCE, serviceId, new ServiceInventory.Builder());
return (ServiceInventory)getByColumn(h2Client, ServiceInventory.INDEX_NAME, ServiceInventory.SEQUENCE, serviceId, new ServiceInventory.Builder());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return null;
......@@ -66,7 +62,7 @@ public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServic
try {
StringBuilder sql = new StringBuilder("select * from ");
sql.append(ServiceInventory.MODEL_NAME);
sql.append(ServiceInventory.INDEX_NAME);
sql.append(" where ").append(ServiceInventory.IS_ADDRESS).append("=? ");
sql.append(" and ").append(ServiceInventory.MAPPING_LAST_UPDATE_TIME).append(">?");
......@@ -74,7 +70,7 @@ public class H2ServiceInventoryCacheDAO extends H2SQLExecutor implements IServic
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), BooleanUtils.TRUE, System.currentTimeMillis() - 30 * 60 * 1000)) {
ServiceInventory serviceInventory;
do {
serviceInventory = (ServiceInventory)toStorageData(resultSet, ServiceInventory.MODEL_NAME, new ServiceInventory.Builder());
serviceInventory = (ServiceInventory)toStorageData(resultSet, ServiceInventory.INDEX_NAME, new ServiceInventory.Builder());
if (serviceInventory != null) {
serviceInventories.add(serviceInventory);
}
......
......@@ -22,15 +22,15 @@ import java.util.*;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RecordType
@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN)
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, storage = @Storage(builder = ZipkinSpanRecord.Builder.class), processor = RecordStreamProcessor.class)
public class ZipkinSpanRecord extends Record {
public static final String INDEX_NAME = "zipkin_span";
public static final String TRACE_ID = "trace_id";
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.zipkin;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
/**
* Dispatch for Zipkin native mode spans.
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
* @author wusheng
*/
public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan> {
@Override public void dispatch(ZipkinSpan source) {
ZipkinSpanRecord segment = new ZipkinSpanRecord();
segment.setTraceId(source.getTraceId());
......@@ -43,6 +44,6 @@ public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan>
segment.setTimeBucket(source.getTimeBucket());
segment.setEncode(source.getEncode());
RecordProcess.INSTANCE.in(segment);
RecordStreamProcessor.getInstance().in(segment);
}
}
Subproject commit 26322717f1503bc84ea052e31696fba8a445c438
Subproject commit ef3570fb126c534d0f15c3a241944fa46ee594b7
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册