提交 1f7125a0 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Remove storage annotation. (#2813)

上级 e1195ab1
...@@ -40,7 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*; ...@@ -40,7 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
* *
* @author Observability Analysis Language code generator * @author Observability Analysis Language code generator
*/ */
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, storage = @Storage(builder = ${metricsName}Metrics.Builder.class), processor = MetricsStreamProcessor.class) @Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata { public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata {
<#list fieldsFromSource as sourceField> <#list fieldsFromSource as sourceField>
......
...@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*; ...@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*;
* *
* @author Observability Analysis Language code generator * @author Observability Analysis Language code generator
*/ */
@Stream(name = "service_avg", scopeId = 1, storage = @Storage(builder = ServiceAvgMetrics.Builder.class), processor = MetricsStreamProcessor.class) @Stream(name = "service_avg", scopeId = 1, builder = ServiceAvgMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata { public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata {
@Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId; @Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId;
......
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
...@@ -36,7 +36,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL ...@@ -36,7 +36,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL
@Getter @Getter
@Setter @Setter
@ScopeDeclaration(id = ALARM, name = "Alarm") @ScopeDeclaration(id = ALARM, name = "Alarm")
@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, storage = @Storage(builder = AlarmRecord.Builder.class), processor = RecordStreamProcessor.class) @Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class)
public class AlarmRecord extends Record { public class AlarmRecord extends Record {
public static final String INDEX_NAME = "alarm_record"; public static final String INDEX_NAME = "alarm_record";
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis; package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.*; import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -32,7 +32,7 @@ public @interface Stream { ...@@ -32,7 +32,7 @@ public @interface Stream {
int scopeId(); int scopeId();
Storage storage(); Class<? extends StorageBuilder> builder();
Class<? extends StreamProcessor> processor(); Class<? extends StreamProcessor> processor();
} }
...@@ -25,14 +25,13 @@ import org.apache.skywalking.oap.server.core.analysis.topn.TopN; ...@@ -25,14 +25,13 @@ import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
/** /**
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands. * Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
* *
* @author wusheng * @author wusheng
*/ */
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, storage = @Storage(builder = TopNDatabaseStatement.Builder.class), processor = TopNStreamProcessor.class) @Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
public class TopNDatabaseStatement extends TopN { public class TopNDatabaseStatement extends TopN {
public static final String INDEX_NAME = "top_n_database_statement"; public static final String INDEX_NAME = "top_n_database_statement";
......
...@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; ...@@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
@Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, storage = @Storage(builder = EndpointRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) @Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, builder = EndpointRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class EndpointRelationServerSideMetrics extends Metrics { public class EndpointRelationServerSideMetrics extends Metrics {
public static final String INDEX_NAME = "endpoint_relation_server_side"; public static final String INDEX_NAME = "endpoint_relation_server_side";
......
...@@ -22,13 +22,12 @@ import java.util.Map; ...@@ -22,13 +22,12 @@ import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME; import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME;
@Stream(name = INDEX_NAME, scopeId = DefaultScopeDefine.HTTP_ACCESS_LOG, storage = @Storage(builder = HTTPAccessLogRecord.Builder.class), processor = RecordStreamProcessor.class) @Stream(name = INDEX_NAME, scopeId = DefaultScopeDefine.HTTP_ACCESS_LOG, builder = HTTPAccessLogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class HTTPAccessLogRecord extends AbstractLogRecord { public class HTTPAccessLogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "http_access_log"; public static final String INDEX_NAME = "http_access_log";
public static class Builder extends AbstractLogRecord.Builder<HTTPAccessLogRecord> { public static class Builder extends AbstractLogRecord.Builder<HTTPAccessLogRecord> {
......
...@@ -27,13 +27,13 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; ...@@ -27,13 +27,13 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.CollectionUtils;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, storage = @Storage(builder = SegmentRecord.Builder.class), processor = RecordStreamProcessor.class) @Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record { public class SegmentRecord extends Record {
public static final String INDEX_NAME = "segment"; public static final String INDEX_NAME = "segment";
......
...@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; ...@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*; import java.util.*;
import lombok.*; import lombok.*;
import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; ...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationClientSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) @Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceRelationClientSideMetrics extends Metrics { public class ServiceRelationClientSideMetrics extends Metrics {
public static final String INDEX_NAME = "service_relation_client_side"; public static final String INDEX_NAME = "service_relation_client_side";
......
...@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; ...@@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation;
import java.util.*; import java.util.*;
import lombok.*; import lombok.*;
import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; ...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
@Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) @Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceRelationServerSideMetrics extends Metrics { public class ServiceRelationServerSideMetrics extends Metrics {
public static final String INDEX_NAME = "service_relation_server_side"; public static final String INDEX_NAME = "service_relation_server_side";
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -57,9 +58,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -57,9 +58,9 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO; IMetricsDAO metricsDAO;
try { try {
metricsDAO = storageDAO.newMetricsDao(stream.storage().builder().newInstance()); metricsDAO = storageDAO.newMetricsDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e); throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " metrics DAO failure.", e);
} }
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
...@@ -70,19 +71,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -70,19 +71,19 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null; MetricsPersistentWorker monthPersistentWorker = null;
if (configService.shouldToHour()) { if (configService.shouldToHour()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour); Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour));
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
} }
if (configService.shouldToDay()) { if (configService.shouldToDay()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day); Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day));
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
} }
if (configService.shouldToMonth()) { if (configService.shouldToMonth()) {
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month); Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month));
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
} }
Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model); MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model);
MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
......
...@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*; ...@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -58,13 +59,13 @@ public class RecordStreamProcessor implements StreamProcessor<Record> { ...@@ -58,13 +59,13 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO; IRecordDAO recordDAO;
try { try {
recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance()); recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e); throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
} }
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
......
...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*; ...@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -54,13 +55,13 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { ...@@ -54,13 +55,13 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO; IRecordDAO recordDAO;
try { try {
recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance()); recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e); throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
} }
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute));
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO); TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
......
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY;
...@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN ...@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@ScopeDeclaration(id = ENDPOINT_INVENTORY, name = "EndpointInventory") @ScopeDeclaration(id = ENDPOINT_INVENTORY, name = "EndpointInventory")
@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, storage = @Storage(builder = EndpointInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) @Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, builder = EndpointInventory.Builder.class, processor = InventoryStreamProcessor.class)
public class EndpointInventory extends RegisterSource { public class EndpointInventory extends RegisterSource {
public static final String INDEX_NAME = "endpoint_inventory"; public static final String INDEX_NAME = "endpoint_inventory";
......
...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc ...@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
...@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NE ...@@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NE
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@ScopeDeclaration(id = NETWORK_ADDRESS, name = "NetworkAddress") @ScopeDeclaration(id = NETWORK_ADDRESS, name = "NetworkAddress")
@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, storage = @Storage(builder = NetworkAddressInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) @Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, builder = NetworkAddressInventory.Builder.class, processor = InventoryStreamProcessor.class)
public class NetworkAddressInventory extends RegisterSource { public class NetworkAddressInventory extends RegisterSource {
public static final String INDEX_NAME = "network_address_inventory"; public static final String INDEX_NAME = "network_address_inventory";
......
...@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc ...@@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
...@@ -38,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE ...@@ -38,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@ScopeDeclaration(id = SERVICE_INSTANCE_INVENTORY, name = "ServiceInstanceInventory") @ScopeDeclaration(id = SERVICE_INSTANCE_INVENTORY, name = "ServiceInstanceInventory")
@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, storage = @Storage(builder = ServiceInstanceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) @Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, builder = ServiceInstanceInventory.Builder.class, processor = InventoryStreamProcessor.class)
public class ServiceInstanceInventory extends RegisterSource { public class ServiceInstanceInventory extends RegisterSource {
public static final String INDEX_NAME = "service_instance_inventory"; public static final String INDEX_NAME = "service_instance_inventory";
......
...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc ...@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
...@@ -37,7 +37,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE ...@@ -37,7 +37,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@ScopeDeclaration(id = SERVICE_INVENTORY, name = "ServiceInventory") @ScopeDeclaration(id = SERVICE_INVENTORY, name = "ServiceInventory")
@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @Storage(builder = ServiceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) @Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, builder = ServiceInventory.Builder.class, processor = InventoryStreamProcessor.class)
public class ServiceInventory extends RegisterSource { public class ServiceInventory extends RegisterSource {
public static final String INDEX_NAME = "service_inventory"; public static final String INDEX_NAME = "service_inventory";
......
...@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.*; ...@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
...@@ -48,13 +49,13 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource> ...@@ -48,13 +49,13 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRegisterDAO registerDAO; IRegisterDAO registerDAO;
try { try {
registerDAO = storageDAO.newRegisterDao(stream.storage().builder().newInstance()); registerDAO = storageDAO.newRegisterDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) { } catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e); throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e);
} }
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage()); Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None));
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId()); RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId());
......
...@@ -13,24 +13,28 @@ ...@@ -13,24 +13,28 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*
*/ */
package org.apache.skywalking.oap.server.core.storage.annotation; package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.*; import lombok.Getter;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.analysis.Downsampling;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
@Target(ElementType.TYPE) @Getter
@Retention(RetentionPolicy.RUNTIME) public class Storage {
public @interface Storage {
Class<? extends StorageBuilder> builder();
boolean deleteHistory() default true; private final String modelName;
private final boolean capableOfTimeSeries;
private final boolean deleteHistory;
private final Downsampling downsampling;
boolean capableOfTimeSeries() default true; public Storage(String modelName, boolean capableOfTimeSeries, boolean deleteHistory, Downsampling downsampling) {
this.modelName = modelName;
this.capableOfTimeSeries = capableOfTimeSeries;
this.deleteHistory = deleteHistory;
this.downsampling = downsampling;
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.annotation;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
/**
* @author peng-yongsheng
*/
public class StorageEntityAnnotationUtils {
public static boolean getDeleteHistory(Class aClass) {
if (aClass.isAnnotationPresent(Storage.class)) {
Storage annotation = (Storage)aClass.getAnnotation(Storage.class);
return annotation.deleteHistory();
} else {
throw new UnexpectedException("Fail to get delete history tag from class " + aClass.getSimpleName());
}
}
public static Class<? extends StorageBuilder> getBuilder(Class aClass) {
if (aClass.isAnnotationPresent(Storage.class)) {
Storage annotation = (Storage)aClass.getAnnotation(Storage.class);
return annotation.builder();
} else {
throw new UnexpectedException("Fail to get entity builder from class " + aClass.getSimpleName());
}
}
}
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.storage.model; package org.apache.skywalking.oap.server.core.storage.model;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.library.module.Service; import org.apache.skywalking.oap.server.library.module.Service;
...@@ -27,7 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service; ...@@ -27,7 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/ */
public interface IModelSetter extends Service { public interface IModelSetter extends Service {
Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage); Model putIfAbsent(Class aClass, int scopeId, Storage storage);
Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling);
} }
...@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage.model; ...@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.*; import java.util.*;
import lombok.Getter; import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.slf4j.*; import org.slf4j.*;
...@@ -38,24 +37,20 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride ...@@ -38,24 +37,20 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
this.models = new LinkedList<>(); this.models = new LinkedList<>();
} }
@Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) { @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) {
return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.None);
}
@Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) {
// Check this scope id is valid. // Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId); DefaultScopeDefine.nameOf(scopeId);
for (Model model : models) { for (Model model : models) {
if (model.getName().equals(modelName)) { if (model.getName().equals(storage.getModelName())) {
return model; return model;
} }
} }
List<ModelColumn> modelColumns = new LinkedList<>(); List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, modelName, modelColumns); retrieval(aClass, storage.getModelName(), modelColumns);
Model model = new Model(modelName, modelColumns, storage.capableOfTimeSeries(), storage.deleteHistory(), scopeId, downsampling); Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling());
models.add(model); models.add(model);
return model; return model;
......
...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; ...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, storage = @Storage(builder = JaegerSpanRecord.Builder.class), processor = RecordStreamProcessor.class) @Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
public class JaegerSpanRecord extends Record { public class JaegerSpanRecord extends Record {
public static final String INDEX_NAME = "jaeger_span"; public static final String INDEX_NAME = "jaeger_span";
public static final String TRACE_ID = "trace_id"; public static final String TRACE_ID = "trace_id";
......
...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; ...@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, storage = @Storage(builder = ZipkinSpanRecord.Builder.class), processor = RecordStreamProcessor.class) @Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
public class ZipkinSpanRecord extends Record { public class ZipkinSpanRecord extends Record {
public static final String INDEX_NAME = "zipkin_span"; public static final String INDEX_NAME = "zipkin_span";
public static final String TRACE_ID = "trace_id"; public static final String TRACE_ID = "trace_id";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册