diff --git a/oap-server/server-bootstrap/src/main/resources/official_analysis.oal b/oap-server/server-bootstrap/src/main/resources/official_analysis.oal index ad66bac78abb2168dbbf6fe943c9835b304794c6..afee68bb90f7e165ff7cfc388caffae21de7edc3 100755 --- a/oap-server/server-bootstrap/src/main/resources/official_analysis.oal +++ b/oap-server/server-bootstrap/src/main/resources/official_analysis.oal @@ -95,7 +95,7 @@ envoy_heap_memory_max_used = from(EnvoyInstanceMetric.value).filter(metricName = envoy_total_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.total_connections").maxDouble(); envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.parent_connections").maxDouble(); -// Disable unnecessary hard core sources +// Disable unnecessary hard core stream, targeting @Stream#name ///////// // disable(segment); // disable(endpoint_relation_server_side); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java index 4236a1ffa6120cec89548dc6699681a7cd58609c..907da941f42785394c4c707d4fb8dc6efc7924e3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java @@ -20,6 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.data; import java.util.concurrent.atomic.AtomicInteger; +/** + * Data cache window. Window holds two data collections(A and B). They are switchable based on outside command. In any + * time, one collection is accepting the input data, and the other is immutable. + * + * This window makes sure there is not concurrency read-write situation. + * + * @param type in the Window and internal collection. + */ public abstract class Window { private AtomicInteger windowSwitch = new AtomicInteger(0); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java index 065a98c05b37a570eb08b0a4e7ea738f685317cc..12dbdb31e451c24fbbaa63dcdd1a87e4aa7ccac6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/Metrics.java @@ -26,31 +26,71 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +/** + * Metrics represents the statistic data, which analysis by OAL script or hard code. It has the lifecycle controlled by + * TTL(time to live). + */ public abstract class Metrics extends StreamData implements StorageData { public static final String TIME_BUCKET = "time_bucket"; public static final String ENTITY_ID = "entity_id"; + /** + * Time attribute + */ @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket; + + /** + * Time in the cache, only work when MetricsPersistentWorker#enableDatabaseSession == true. + */ @Getter - @Setter private long survivalTime = 0L; - public abstract String id(); - + /** + * Merge the given metrics instance, these two must be the same metrics type. + * + * @param metrics to be merged + */ public abstract void combine(Metrics metrics); + /** + * Calculate the metrics final value when required. + */ public abstract void calculate(); + /** + * Downsampling the metrics to hour precision. + * + * @return the metrics in hour precision in the clone mode. + */ public abstract Metrics toHour(); + /** + * Downsampling the metrics to day precision. + * + * @return the metrics in day precision in the clone mode. + */ public abstract Metrics toDay(); + /** + * Downsampling the metrics to month precision. + * + * @return the metrics in month precision in the clone mode. + */ public abstract Metrics toMonth(); + /** + * Extend the {@link #survivalTime} + * + * @param value to extend + */ + public void extendSurvivalTime(long value) { + survivalTime += value; + } + public long toTimeBucketInHour() { if (isMinuteBucket()) { return timeBucket / 100; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java index b75e2aca7723095f4418ff836858a7e6e2205ff9..4cf97d20742c23fc45a371b4e967aa01c6c0ae55 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java @@ -20,13 +20,21 @@ package org.apache.skywalking.oap.server.core.analysis.record; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.annotation.Column; +/** + * Record storage represents the entity have fully and manually entity definition by hard codes. Most of then are + * original log data or task records. These data needs to persistent without further analysis. + */ public abstract class Record implements StorageData { public static final String TIME_BUCKET = "time_bucket"; + /** + * Time attribute, all storage data is time sensitive, as same as {@link Metrics} + */ @Getter @Setter @Column(columnName = TIME_BUCKET) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 259695e39813dc83337de0ea05ad63ac668840fc..263cac47c9ca466fe51fa522b189f1d58188c889 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -57,8 +57,8 @@ public class MetricsPersistentWorker extends PersistenceWorker nextAlarmWorker, AbstractWorker nextExportWorker, - MetricsTransWorker transWorker, boolean enableDatabaseSession) { + AbstractWorker nextAlarmWorker, AbstractWorker nextExportWorker, + MetricsTransWorker transWorker, boolean enableDatabaseSession) { super(moduleDefineHolder); this.model = model; this.databaseSession = new HashMap<>(100); @@ -152,7 +152,10 @@ public class MetricsPersistentWorker extends PersistenceWorker 0) { - logger.debug("prepare batch requests for model {}, took time: {}", model.getName(), System.currentTimeMillis() - start); + logger.debug( + "prepare batch requests for model {}, took time: {}", model.getName(), + System.currentTimeMillis() - start + ); } } @@ -207,7 +210,8 @@ public class MetricsPersistentWorker extends PersistenceWorker iterator = databaseSession.values().iterator(); while (iterator.hasNext()) { Metrics metrics = iterator.next(); - metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime()); + metrics.extendSurvivalTime(tookTime); + // 70,000ms means more than one minute. if (metrics.getSurvivalTime() > 70000) { iterator.remove(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java index c46a85789322e0ba1d616d2ecee72a8c9842d327..156618da2620600d8cbe1d9be3a025654cd0a926 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java @@ -29,6 +29,13 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * PersistenceWorker take the responsibility to pushing data to the final storage. The target storage is based on the + * activate storage implementation. This worker controls the persistence flow. + * + * @param The type of worker input. All inputs will be merged and saved. + * @param Cache type to hold all input. + */ public abstract class PersistenceWorker> extends AbstractWorker { private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class); @@ -37,16 +44,34 @@ public abstract class PersistenceWorker lastCollection, List prepareRequests); public final void buildBatchRequests(List prepareRequests) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java index 7b31d119655b3b67a0fb973f93d183b3e9938339..ea3947e090cb2d1d7104f9f6490a38ef4ca8dd73 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java @@ -24,6 +24,9 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.annotation.Column; +/** + * RegisterSource represents the metadata entity. + */ public abstract class RegisterSource extends StreamData implements StorageData { public static final String SEQUENCE = "sequence"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 214f6603fbce4ffca58f11acd182b07c3e6ac71f..cab4cc6dbbe39297b7369b3c27054f6f488c9ab5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -78,6 +78,9 @@ public class DefaultScopeDefine { private static final Map SERVICE_INSTANCE_CATALOG = new HashMap<>(); private static final Map ENDPOINT_CATALOG = new HashMap<>(); + /** + * Annotation scan listener + */ public static class Listener implements AnnotationListener { @Override public Class annotation() { @@ -93,16 +96,24 @@ public class DefaultScopeDefine { } } - public static final void addNewScope(ScopeDeclaration declaration, Class originalClass) { + /** + * Add a new scope based on the scan result + * + * @param declaration includes the definition. + * @param originalClass represents the class having the {@link ScopeDeclaration} annotation + */ + private static final void addNewScope(ScopeDeclaration declaration, Class originalClass) { int id = declaration.id(); if (ID_2_NAME.containsKey(id)) { - throw new UnexpectedException("ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME - .get(id)); + throw new UnexpectedException( + "ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME + .get(id)); } String name = declaration.name(); if (NAME_2_ID.containsKey(name)) { - throw new UnexpectedException("ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID - .get(name)); + throw new UnexpectedException( + "ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID + .get(name)); } ID_2_NAME.put(id, name); NAME_2_ID.put(name, id); @@ -112,16 +123,21 @@ public class DefaultScopeDefine { ScopeDefaultColumn.VirtualColumnDefinition virtualColumn = (ScopeDefaultColumn.VirtualColumnDefinition) originalClass .getAnnotation(ScopeDefaultColumn.VirtualColumnDefinition.class); if (virtualColumn != null) { - scopeDefaultColumns.add(new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn - .type(), virtualColumn.isID())); + scopeDefaultColumns.add( + new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn + .type(), virtualColumn.isID())); } Field[] scopeClassField = originalClass.getDeclaredFields(); if (scopeClassField != null) { for (Field field : scopeClassField) { - ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation(ScopeDefaultColumn.DefinedByField.class); + ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation( + ScopeDefaultColumn.DefinedByField.class); if (definedByField != null) { - scopeDefaultColumns.add(new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(), definedByField - .isID())); + scopeDefaultColumns.add( + new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(), + definedByField + .isID() + )); } } } @@ -142,6 +158,12 @@ public class DefaultScopeDefine { } } + /** + * Fetch the name from given id + * + * @param id represents an existing scope id. + * @return scope name. + */ public static String nameOf(int id) { String name = ID_2_NAME.get(id); if (name == null) { @@ -150,6 +172,12 @@ public class DefaultScopeDefine { return name; } + /** + * Fetch the id of given name + * + * @param name represents an existing scope name + * @return scope id + */ public static int valueOf(String name) { Integer id = NAME_2_ID.get(name); if (id == null) { @@ -158,20 +186,41 @@ public class DefaultScopeDefine { return id; } + /** + * Reset all existing scope definitions. For test only. + */ public static void reset() { NAME_2_ID.clear(); ID_2_NAME.clear(); SCOPE_COLUMNS.clear(); } + /** + * Check whether current service belongs service catalog + * + * @param scopeId represents an existing scope id. + * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME} + */ public static boolean inServiceCatalog(int scopeId) { return SERVICE_CATALOG.containsKey(scopeId); } + /** + * Check whether current service belongs service instance catalog + * + * @param scopeId represents an existing scope id. + * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME} + */ public static boolean inServiceInstanceCatalog(int scopeId) { return SERVICE_INSTANCE_CATALOG.containsKey(scopeId); } + /** + * Check whether current service belongs endpoint catalog + * + * @param scopeId represents an existing scope id. + * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME} + */ public static boolean inEndpointCatalog(int scopeId) { return ENDPOINT_CATALOG.containsKey(scopeId); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java index c0ba4fe702ca0468154d0cb127b6a9b34dd6eee5..177b1de5d937883d2d996bf2438b0fe0831559ad 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDeclaration.java @@ -22,9 +22,20 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; /** - * DefaultScopeDefine id declaration. + * ScopeDeclaration includes + * + * 1.Source entity used in OAL script, such as Service as a Scope could be used like this in the OAL script. + * + * service_resp_time = from(Service.latency).longAvg(); + * + * 2. Manual source such as {@link Segment} + * + * 3. None stream entity like {@link ProfileTaskRecord}. + * + * NOTICE, in OAL script, `disable` is for stream, rather than source, it doesn't require this annotation. */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java index ad5d746276e320fa78ddb46f0b42322b10859ba4..f8224835d1bf124f2c143c13287887ae43bfb5c4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageData.java @@ -18,6 +18,12 @@ package org.apache.skywalking.oap.server.core.storage; +/** + * Any persistent entity should be an implementation of this interface. + */ public interface StorageData { + /** + * @return the unique id used in any storage option. + */ String id(); }