diff --git a/oap-server/generate-tool/src/main/resources/code-templates/MetricsImplementor.ftl b/oap-server/generate-tool/src/main/resources/code-templates/MetricsImplementor.ftl index c295f95466776c34993af1da2fae397ab4479e35..bd945282d13e2112ba0200b2909f189b4730f9e2 100644 --- a/oap-server/generate-tool/src/main/resources/code-templates/MetricsImplementor.ftl +++ b/oap-server/generate-tool/src/main/resources/code-templates/MetricsImplementor.ftl @@ -40,7 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*; * * @author Observability Analysis Language code generator */ -@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, storage = @Storage(builder = ${metricsName}Metrics.Builder.class), processor = MetricsStreamProcessor.class) +@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class) public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata { <#list fieldsFromSource as sourceField> diff --git a/oap-server/generate-tool/src/test/resources/expectedFiles/MetricsImplementorExpected.java b/oap-server/generate-tool/src/test/resources/expectedFiles/MetricsImplementorExpected.java index 0248137e17e76dee95adfa0a627e30fe42ff6779..e133fd5ac35e7a9d05cabf9ae14cb2fc97b61d7f 100644 --- a/oap-server/generate-tool/src/test/resources/expectedFiles/MetricsImplementorExpected.java +++ b/oap-server/generate-tool/src/test/resources/expectedFiles/MetricsImplementorExpected.java @@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.*; * * @author Observability Analysis Language code generator */ -@Stream(name = "service_avg", scopeId = 1, storage = @Storage(builder = ServiceAvgMetrics.Builder.class), processor = MetricsStreamProcessor.class) +@Stream(name = "service_avg", scopeId = 1, builder = ServiceAvgMetrics.Builder.class, processor = MetricsStreamProcessor.class) public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata { @Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java index 0363c712730d7cd0152949fb3abfb60d389db7de..82cbd6afeca61c62e4f4412f067f344a0f152640 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM; @@ -36,7 +36,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.AL @Getter @Setter @ScopeDeclaration(id = ALARM, name = "Alarm") -@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, storage = @Storage(builder = AlarmRecord.Builder.class), processor = RecordStreamProcessor.class) +@Stream(name = AlarmRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ALARM, builder = AlarmRecord.Builder.class, processor = RecordStreamProcessor.class) public class AlarmRecord extends Record { public static final String INDEX_NAME = "alarm_record"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java index 014c0cc0753702fa9d36c8a1fdf0e01addaae49a..b12f7fe0d1dda320f569a132b25f6f69251ccc43 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Stream.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis; import java.lang.annotation.*; -import org.apache.skywalking.oap.server.core.storage.annotation.Storage; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; /** * @author peng-yongsheng @@ -32,7 +32,7 @@ public @interface Stream { int scopeId(); - Storage storage(); + Class builder(); Class processor(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java index 5d295d9625628cf35b8fc925f70bcee8c9fbc316..8a385d0d3413b58fb3239217d2becfbef1dac850 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java @@ -25,14 +25,13 @@ import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.Storage; /** * Database TopN statement, including Database SQL statement, mongoDB and Redis commands. * * @author wusheng */ -@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, storage = @Storage(builder = TopNDatabaseStatement.Builder.class), processor = TopNStreamProcessor.class) +@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class) public class TopNDatabaseStatement extends TopN { public static final String INDEX_NAME = "top_n_database_statement"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideMetrics.java index d36660dc35bff7f8585180557a310bef35651d8e..08a346c35b94116f363bfac401b99dacb04925d0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideMetrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/endpointrelation/EndpointRelationServerSideMetrics.java @@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.annotation.*; -@Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, storage = @Storage(builder = EndpointRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) +@Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, builder = EndpointRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class) public class EndpointRelationServerSideMetrics extends Metrics { public static final String INDEX_NAME = "endpoint_relation_server_side"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogRecord.java index 089468228c5933d7bc67944f37706fa6428fe2d9..9242810886f325010c8d7943b7bd2e4b128e6ce0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogRecord.java @@ -22,13 +22,12 @@ import java.util.Map; import org.apache.skywalking.oap.server.core.analysis.Stream; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; -import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import static org.apache.skywalking.oap.server.core.analysis.manual.log.HTTPAccessLogRecord.INDEX_NAME; -@Stream(name = INDEX_NAME, scopeId = DefaultScopeDefine.HTTP_ACCESS_LOG, storage = @Storage(builder = HTTPAccessLogRecord.Builder.class), processor = RecordStreamProcessor.class) +@Stream(name = INDEX_NAME, scopeId = DefaultScopeDefine.HTTP_ACCESS_LOG, builder = HTTPAccessLogRecord.Builder.class, processor = RecordStreamProcessor.class) public class HTTPAccessLogRecord extends AbstractLogRecord { - + public static final String INDEX_NAME = "http_access_log"; public static class Builder extends AbstractLogRecord.Builder { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java index c081059b3398d97ee6cd0c7e5f1d6d5c11e999f6..2b0b090dd51ded121457ddb60fb068bb9199f4c7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java @@ -27,13 +27,13 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.library.util.CollectionUtils; /** * @author peng-yongsheng */ -@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, storage = @Storage(builder = SegmentRecord.Builder.class), processor = RecordStreamProcessor.class) +@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class) public class SegmentRecord extends Record { public static final String INDEX_NAME = "segment"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideMetrics.java index fd7dbe4f2773b493a966ccb45e910e9823d183c9..6f34e42fc61db134dfd4f9d22536ff86a8e51f9e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideMetrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationClientSideMetrics.java @@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; import java.util.*; import lombok.*; import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil; import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; @@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.annotation.*; -@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationClientSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) +@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class) public class ServiceRelationClientSideMetrics extends Metrics { public static final String INDEX_NAME = "service_relation_client_side"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideMetrics.java index 5249993c27196f7850841fa9527816c17d3916ac..12de7873e41ce69224411f644114e4df5fe2afca 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideMetrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/servicerelation/ServiceRelationServerSideMetrics.java @@ -21,8 +21,8 @@ package org.apache.skywalking.oap.server.core.analysis.manual.servicerelation; import java.util.*; import lombok.*; import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil; import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; @@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.annotation.*; -@Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, storage = @Storage(builder = ServiceRelationServerSideMetrics.Builder.class), processor = MetricsStreamProcessor.class) +@Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class) public class ServiceRelationServerSideMetrics extends Metrics { public static final String INDEX_NAME = "service_relation_server_side"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java index 9f9499d355f35ae238e94f6992f8f4069796c963..e2505068e7746ac20fc57bab77921f1050819dee 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -57,9 +58,9 @@ public class MetricsStreamProcessor implements StreamProcessor { StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IMetricsDAO metricsDAO; try { - metricsDAO = storageDAO.newMetricsDao(stream.storage().builder().newInstance()); + metricsDAO = storageDAO.newMetricsDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { - throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " metrics DAO failure.", e); + throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " metrics DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); @@ -70,19 +71,19 @@ public class MetricsStreamProcessor implements StreamProcessor { MetricsPersistentWorker monthPersistentWorker = null; if (configService.shouldToHour()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Hour); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour)); hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } if (configService.shouldToDay()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Day); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day)); dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } if (configService.shouldToMonth()) { - Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Month); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month)); monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model); } - Model model = modelSetter.putIfAbsent(metricsClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); + Model model = modelSetter.putIfAbsent(metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute)); MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(moduleDefineHolder, metricsDAO, model); MetricsTransWorker transWorker = new MetricsTransWorker(moduleDefineHolder, stream.name(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java index e460b5a6c900f43d1ae8bc7d51cdbbea89e6c0ad..04e772a5c12c83c52f0ba6cdf1067e0ab87752a5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java @@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -58,13 +59,13 @@ public class RecordStreamProcessor implements StreamProcessor { StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRecordDAO recordDAO; try { - recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance()); + recordDAO = storageDAO.newRecordDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { - throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " record DAO failure.", e); + throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(recordClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); + Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute)); RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 1000, recordDAO); persistentWorkers.add(persistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java index e3c648c6692b8b77d339a3f3276a1185c4222f88..92a88a495150a169d77cec96c74a6490ad421ede 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.topn.TopN; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -54,13 +55,13 @@ public class TopNStreamProcessor implements StreamProcessor { StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRecordDAO recordDAO; try { - recordDAO = storageDAO.newRecordDao(stream.storage().builder().newInstance()); + recordDAO = storageDAO.newRecordDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { - throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " top n record DAO failure.", e); + throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(topNClass, stream.name(), stream.scopeId(), stream.storage(), Downsampling.Minute); + Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute)); TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO); persistentWorkers.add(persistentWorker); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java index eeeeb686a4431880bb4e701cc0b33b139cdbcbac..8e906bcd6bfae7185644ee7b0c84c4477439344b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.elasticsearch.common.Strings; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_INVENTORY; @@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN * @author peng-yongsheng */ @ScopeDeclaration(id = ENDPOINT_INVENTORY, name = "EndpointInventory") -@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, storage = @Storage(builder = EndpointInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) +@Stream(name = EndpointInventory.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_INVENTORY, builder = EndpointInventory.Builder.class, processor = InventoryStreamProcessor.class) public class EndpointInventory extends RegisterSource { public static final String INDEX_NAME = "endpoint_inventory"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java index 0eb5bc4a1257dbb52def7ce57c382dbdc5a049fd..d0f09faf3fee60d94736edd55a0567e80a7c92ec 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java @@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.elasticsearch.common.Strings; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS; @@ -35,7 +35,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NE * @author peng-yongsheng */ @ScopeDeclaration(id = NETWORK_ADDRESS, name = "NetworkAddress") -@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, storage = @Storage(builder = NetworkAddressInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) +@Stream(name = NetworkAddressInventory.INDEX_NAME, scopeId = DefaultScopeDefine.NETWORK_ADDRESS, builder = NetworkAddressInventory.Builder.class, processor = InventoryStreamProcessor.class) public class NetworkAddressInventory extends RegisterSource { public static final String INDEX_NAME = "network_address_inventory"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java index 895e2a813f5c14132a041d98c584b1697c5a9403..28cd596a7814be4e02c45ce22eb9380d2aab3f79 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java @@ -28,7 +28,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.elasticsearch.common.Strings; @@ -38,7 +38,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE * @author peng-yongsheng */ @ScopeDeclaration(id = SERVICE_INSTANCE_INVENTORY, name = "ServiceInstanceInventory") -@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, storage = @Storage(builder = ServiceInstanceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) +@Stream(name = ServiceInstanceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY, builder = ServiceInstanceInventory.Builder.class, processor = InventoryStreamProcessor.class) public class ServiceInstanceInventory extends RegisterSource { public static final String INDEX_NAME = "service_instance_inventory"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java index cec189d0d57425783b0666cdd402e9ddd4eb67f2..380d479c9c5bd7e7584d85ea5965513859940504 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java @@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProc import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; -import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.elasticsearch.common.Strings; @@ -37,7 +37,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SE * @author peng-yongsheng */ @ScopeDeclaration(id = SERVICE_INVENTORY, name = "ServiceInventory") -@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, storage = @Storage(builder = ServiceInventory.Builder.class, deleteHistory = false, capableOfTimeSeries = false), processor = InventoryStreamProcessor.class) +@Stream(name = ServiceInventory.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INVENTORY, builder = ServiceInventory.Builder.class, processor = InventoryStreamProcessor.class) public class ServiceInventory extends RegisterSource { public static final String INDEX_NAME = "service_inventory"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java index 2cc8b7e0d68e216661c4ceb643bff8fbe5bfd6a1..0e4b3dd768935ec2a5a6ff39e126e74840c4e286 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/InventoryStreamProcessor.java @@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -48,13 +49,13 @@ public class InventoryStreamProcessor implements StreamProcessor StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class); IRegisterDAO registerDAO; try { - registerDAO = storageDAO.newRegisterDao(stream.storage().builder().newInstance()); + registerDAO = storageDAO.newRegisterDao(stream.builder().newInstance()); } catch (InstantiationException | IllegalAccessException e) { - throw new UnexpectedException("Create " + stream.storage().builder().getSimpleName() + " register DAO failure.", e); + throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e); } IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); - Model model = modelSetter.putIfAbsent(inventoryClass, stream.name(), stream.scopeId(), stream.storage()); + Model model = modelSetter.putIfAbsent(inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None)); RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(moduleDefineHolder, model.getName(), registerDAO, stream.scopeId()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java index 6a8073ca67b9c071b4566546771339cb1fc55bb4..9fc8773457b1cfcaed8a133c8b77067938403df0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Storage.java @@ -13,24 +13,28 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package org.apache.skywalking.oap.server.core.storage.annotation; -import java.lang.annotation.*; -import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import lombok.Getter; +import org.apache.skywalking.oap.server.core.analysis.Downsampling; /** * @author peng-yongsheng */ -@Target(ElementType.TYPE) -@Retention(RetentionPolicy.RUNTIME) -public @interface Storage { - - Class builder(); +@Getter +public class Storage { - boolean deleteHistory() default true; + private final String modelName; + private final boolean capableOfTimeSeries; + private final boolean deleteHistory; + private final Downsampling downsampling; - boolean capableOfTimeSeries() default true; + public Storage(String modelName, boolean capableOfTimeSeries, boolean deleteHistory, Downsampling downsampling) { + this.modelName = modelName; + this.capableOfTimeSeries = capableOfTimeSeries; + this.deleteHistory = deleteHistory; + this.downsampling = downsampling; + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java deleted file mode 100644 index d408a2ea498301acdaec21356cec1ec7dbd5b057..0000000000000000000000000000000000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/StorageEntityAnnotationUtils.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.storage.annotation; - -import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.storage.StorageBuilder; - -/** - * @author peng-yongsheng - */ -public class StorageEntityAnnotationUtils { - - public static boolean getDeleteHistory(Class aClass) { - if (aClass.isAnnotationPresent(Storage.class)) { - Storage annotation = (Storage)aClass.getAnnotation(Storage.class); - return annotation.deleteHistory(); - } else { - throw new UnexpectedException("Fail to get delete history tag from class " + aClass.getSimpleName()); - } - } - - public static Class getBuilder(Class aClass) { - if (aClass.isAnnotationPresent(Storage.class)) { - Storage annotation = (Storage)aClass.getAnnotation(Storage.class); - return annotation.builder(); - } else { - throw new UnexpectedException("Fail to get entity builder from class " + aClass.getSimpleName()); - } - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java index 5e55d73c106846ef505eb83d82467253bca4b42a..a4712124ec10729a6b628ecbc928d0feaeb57f59 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/IModelSetter.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.core.storage.model; -import org.apache.skywalking.oap.server.core.analysis.Downsampling; import org.apache.skywalking.oap.server.core.storage.annotation.Storage; import org.apache.skywalking.oap.server.library.module.Service; @@ -27,7 +26,5 @@ import org.apache.skywalking.oap.server.library.module.Service; */ public interface IModelSetter extends Service { - Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage); - - Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling); + Model putIfAbsent(Class aClass, int scopeId, Storage storage); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index bfba7d9ad22a981dc51bf370331627000ff3dc92..6e9f572c6f414f9f02acad1f40b04c5f1649025a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage.model; import java.lang.reflect.Field; import java.util.*; import lombok.Getter; -import org.apache.skywalking.oap.server.core.analysis.Downsampling; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.slf4j.*; @@ -38,24 +37,20 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride this.models = new LinkedList<>(); } - @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage) { - return putIfAbsent(aClass, modelName, scopeId, storage, Downsampling.None); - } - - @Override public Model putIfAbsent(Class aClass, String modelName, int scopeId, Storage storage, Downsampling downsampling) { + @Override public Model putIfAbsent(Class aClass, int scopeId, Storage storage) { // Check this scope id is valid. DefaultScopeDefine.nameOf(scopeId); for (Model model : models) { - if (model.getName().equals(modelName)) { + if (model.getName().equals(storage.getModelName())) { return model; } } List modelColumns = new LinkedList<>(); - retrieval(aClass, modelName, modelColumns); + retrieval(aClass, storage.getModelName(), modelColumns); - Model model = new Model(modelName, modelColumns, storage.capableOfTimeSeries(), storage.deleteHistory(), scopeId, downsampling); + Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage.getDownsampling()); models.add(model); return model; diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java index 5d191790103f7b7e1f72587c8328ca7a1d326780..0544227bd02ae443a528889cfa8b501112559356 100644 --- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java +++ b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/JaegerSpanRecord.java @@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.library.util.CollectionUtils; -@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, storage = @Storage(builder = JaegerSpanRecord.Builder.class), processor = RecordStreamProcessor.class) +@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class) public class JaegerSpanRecord extends Record { public static final String INDEX_NAME = "jaeger_span"; public static final String TRACE_ID = "trace_id"; diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java index e18341829677e2a36646bc4a278c12a49da9f2af..b1333af390c85d44eb279eed505819765f72ea03 100644 --- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java @@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.annotation.*; import org.apache.skywalking.oap.server.library.util.CollectionUtils; -@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, storage = @Storage(builder = ZipkinSpanRecord.Builder.class), processor = RecordStreamProcessor.class) +@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class) public class ZipkinSpanRecord extends Record { public static final String INDEX_NAME = "zipkin_span"; public static final String TRACE_ID = "trace_id";