Unverified commit 0300048d, authored by wu-sheng, committed by GitHub

Fill some comments for core. (#4356)

* Fill some comments for core.

* Fix a comment issue.
Co-authored-by: kezhenxu94 <kezhenxu94@163.com>
Parent c761f30e
......@@ -95,7 +95,7 @@ envoy_heap_memory_max_used = from(EnvoyInstanceMetric.value).filter(metricName =
envoy_total_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.total_connections").maxDouble();
envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.parent_connections").maxDouble();
// Disable unnecessary hard core sources
// Disable unnecessary hard core stream, targeting @Stream#name
/////////
// disable(segment);
// disable(endpoint_relation_server_side);
......
......@@ -20,6 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Data cache window. The window holds two data collections (A and B). They are switchable based on an outside
 * command. At any time, one collection is accepting the input data, and the other is immutable.
 *
 * This window makes sure there is no concurrent read-write situation.
 *
 * @param <DATA> the data type held in the window and its internal collections.
 */
public abstract class Window<DATA> {
private AtomicInteger windowSwitch = new AtomicInteger(0);
......
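The comment above only describes the A/B switching idea. As a rough, hypothetical sketch of how such a two-collection window can avoid concurrent read-write (the class and method names below are invented for illustration and are not the actual SkyWalking implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical two-collection window: writers always hit the "current" list,
// while the persistence round reads the frozen "last" list.
class SimpleWindow<DATA> {
    private final AtomicInteger windowSwitch = new AtomicInteger(0);
    private final List<DATA> collectionA = new ArrayList<>();
    private final List<DATA> collectionB = new ArrayList<>();

    void add(DATA data) {
        current().add(data);
    }

    // Flip the switch so the collection written so far becomes the immutable "last" one.
    List<DATA> switchAndGetLast() {
        windowSwitch.incrementAndGet();
        return last();
    }

    private List<DATA> current() {
        return windowSwitch.get() % 2 == 0 ? collectionA : collectionB;
    }

    private List<DATA> last() {
        return windowSwitch.get() % 2 == 0 ? collectionB : collectionA;
    }
}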
......@@ -26,31 +26,71 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
 * Metrics represents the statistic data, which is analyzed by OAL scripts or hard code. Its lifecycle is controlled
 * by TTL (time to live).
 */
public abstract class Metrics extends StreamData implements StorageData {
public static final String TIME_BUCKET = "time_bucket";
public static final String ENTITY_ID = "entity_id";
/**
* Time attribute
*/
@Getter
@Setter
@Column(columnName = TIME_BUCKET)
private long timeBucket;
/**
 * Time in the cache; only works when MetricsPersistentWorker#enableDatabaseSession == true.
 */
@Getter
@Setter
private long survivalTime = 0L;
public abstract String id();
/**
 * Merge the given metrics instance; the two must be of the same metrics type.
 *
 * @param metrics the instance to be merged
 */
public abstract void combine(Metrics metrics);
/**
* Calculate the metrics final value when required.
*/
public abstract void calculate();
/**
 * Downsample the metrics to hour precision.
 *
 * @return a cloned metrics instance in hour precision.
 */
public abstract Metrics toHour();
/**
 * Downsample the metrics to day precision.
 *
 * @return a cloned metrics instance in day precision.
 */
public abstract Metrics toDay();
/**
 * Downsample the metrics to month precision.
 *
 * @return a cloned metrics instance in month precision.
 */
public abstract Metrics toMonth();
/**
 * Extend the {@link #survivalTime}.
 *
 * @param value the time to extend by
 */
public void extendSurvivalTime(long value) {
survivalTime += value;
}
public long toTimeBucketInHour() {
if (isMinuteBucket()) {
return timeBucket / 100;
......
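The truncated toTimeBucketInHour() above divides the minute bucket by 100. This works because SkyWalking time buckets are plain decimal timestamps (assumed here to follow the yyyyMMddHHmm layout at minute precision), so dropping the last two digits moves up one precision level. A tiny illustration with made-up values:

public class TimeBucketDemo {
    public static void main(String[] args) {
        long minuteBucket = 202002071530L;      // yyyyMMddHHmm (minute precision), example value
        long hourBucket = minuteBucket / 100;   // 2020020715 -> yyyyMMddHH (hour precision)
        long dayBucket = hourBucket / 100;      // 20200207   -> yyyyMMdd (day precision)
        System.out.println(hourBucket + ", " + dayBucket);
    }
}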
......@@ -20,13 +20,21 @@ package org.apache.skywalking.oap.server.core.analysis.record;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
 * Record represents an entity that is fully and manually defined by hard codes. Most of them are original log data
 * or task records. These data need to be persisted without further analysis.
 */
public abstract class Record implements StorageData {
public static final String TIME_BUCKET = "time_bucket";
/**
 * Time attribute. All storage data is time sensitive, the same as {@link Metrics}.
 */
@Getter
@Setter
@Column(columnName = TIME_BUCKET)
......
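To make the Record comment concrete, a subclass usually just declares its own columns by hand and provides an id. A minimal hypothetical example (the class name, column, and id composition are invented; getTimeBucket() is assumed to be the Lombok-generated getter of the time bucket field shown above):

import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;

// Hypothetical record entity: hand-defined columns, persisted as-is, never aggregated further.
public class AccessLogRecord extends Record {
    public static final String SERVICE_ID = "service_id";

    @Getter
    @Setter
    @Column(columnName = SERVICE_ID)
    private int serviceId;

    @Override
    public String id() {
        // A unique id per log entry; the composition is up to the concrete record type.
        return getTimeBucket() + "_" + serviceId;
    }
}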
......@@ -57,8 +57,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private final boolean enableDatabaseSession;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession) {
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession) {
super(moduleDefineHolder);
this.model = model;
this.databaseSession = new HashMap<>(100);
......@@ -152,7 +152,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
if (prepareRequests.size() > 0) {
logger.debug("prepare batch requests for model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
logger.debug(
"prepare batch requests for model {}, took time: {}", model.getName(),
System.currentTimeMillis() - start
);
}
}
......@@ -207,7 +210,8 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
Iterator<Metrics> iterator = databaseSession.values().iterator();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
metrics.setSurvivalTime(tookTime + metrics.getSurvivalTime());
metrics.extendSurvivalTime(tookTime);
// 70,000ms means more than one minute.
if (metrics.getSurvivalTime() > 70000) {
iterator.remove();
}
......
......@@ -29,6 +29,13 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * PersistenceWorker takes the responsibility of pushing data to the final storage. The target storage is based on
 * the activated storage implementation. This worker controls the persistence flow.
 *
 * @param <INPUT> The type of the worker input. All inputs will be merged and saved.
 * @param <CACHE> The cache type that holds all inputs.
 */
public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
......@@ -37,16 +44,34 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
super(moduleDefineHolder);
}
/**
* Accept the input, and push the data into the cache.
*/
void onWork(INPUT input) {
cacheData(input);
}
/**
* Cache data based on different strategies. See the implementations for more details.
*/
public abstract void cacheData(INPUT input);
public abstract CACHE getCache();
/**
 * The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
 * This is a notification method for the worker when every round finishes.
 *
 * @param tookTime The time cost of this round.
 */
public abstract void endOfRound(long tookTime);
/**
 * For every cache implementation (see {@link Window}), there are two datasets. Switch them when a persistence round
 * begins, in order to make the cached data immutable.
 *
 * @return true if switched successfully.
 */
public boolean flushAndSwitch() {
boolean isSwitch;
try {
......@@ -59,6 +84,13 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
return isSwitch;
}
/**
 * Prepare the batch persistence by transferring all prepared data to the executable data format based on the
 * storage implementation.
 *
 * @param lastCollection the source of the transformation, in in-memory object format.
 * @param prepareRequests data in the format of the final persistence operations.
 */
public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
......
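Taken together, the comments above describe one persistence round: switch the window, build batch requests from the frozen dataset, and report the elapsed time. A rough, assumed sketch of how a timer could drive a worker; only flushAndSwitch, buildBatchRequests, and endOfRound come from this diff, everything else is invented for illustration:

import java.util.ArrayList;
import java.util.List;

class PersistenceRoundSketch {
    // Illustrative driver for one round; error handling and the storage batch executor are omitted.
    void runOneRound(PersistenceWorker<Metrics, ?> worker) {
        long start = System.currentTimeMillis();
        if (worker.flushAndSwitch()) {
            // The frozen (now immutable) dataset is turned into storage-specific requests.
            List<PrepareRequest> prepareRequests = new ArrayList<>();
            worker.buildBatchRequests(prepareRequests);
            // ... hand prepareRequests over to the storage batch executor ...
        }
        // Tell the worker how long this round took, e.g. so it can extend metrics survival time.
        worker.endOfRound(System.currentTimeMillis() - start);
    }
}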
......@@ -24,6 +24,9 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* RegisterSource represents the metadata entity.
*/
public abstract class RegisterSource extends StreamData implements StorageData {
public static final String SEQUENCE = "sequence";
......
......@@ -78,6 +78,9 @@ public class DefaultScopeDefine {
private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> ENDPOINT_CATALOG = new HashMap<>();
/**
* Annotation scan listener
*/
public static class Listener implements AnnotationListener {
@Override
public Class<? extends Annotation> annotation() {
......@@ -93,16 +96,24 @@ public class DefaultScopeDefine {
}
}
public static final void addNewScope(ScopeDeclaration declaration, Class originalClass) {
/**
* Add a new scope based on the scan result
*
* @param declaration includes the definition.
* @param originalClass represents the class having the {@link ScopeDeclaration} annotation
*/
private static final void addNewScope(ScopeDeclaration declaration, Class originalClass) {
int id = declaration.id();
if (ID_2_NAME.containsKey(id)) {
throw new UnexpectedException("ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME
.get(id));
throw new UnexpectedException(
"ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME
.get(id));
}
String name = declaration.name();
if (NAME_2_ID.containsKey(name)) {
throw new UnexpectedException("ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID
.get(name));
throw new UnexpectedException(
"ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID
.get(name));
}
ID_2_NAME.put(id, name);
NAME_2_ID.put(name, id);
......@@ -112,16 +123,21 @@ public class DefaultScopeDefine {
ScopeDefaultColumn.VirtualColumnDefinition virtualColumn = (ScopeDefaultColumn.VirtualColumnDefinition) originalClass
.getAnnotation(ScopeDefaultColumn.VirtualColumnDefinition.class);
if (virtualColumn != null) {
scopeDefaultColumns.add(new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn
.type(), virtualColumn.isID()));
scopeDefaultColumns.add(
new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn
.type(), virtualColumn.isID()));
}
Field[] scopeClassField = originalClass.getDeclaredFields();
if (scopeClassField != null) {
for (Field field : scopeClassField) {
ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation(ScopeDefaultColumn.DefinedByField.class);
ScopeDefaultColumn.DefinedByField definedByField = field.getAnnotation(
ScopeDefaultColumn.DefinedByField.class);
if (definedByField != null) {
scopeDefaultColumns.add(new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(), definedByField
.isID()));
scopeDefaultColumns.add(
new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(),
definedByField
.isID()
));
}
}
}
......@@ -142,6 +158,12 @@ public class DefaultScopeDefine {
}
}
/**
 * Fetch the name of the given id.
 *
 * @param id represents an existing scope id.
 * @return scope name.
 */
public static String nameOf(int id) {
String name = ID_2_NAME.get(id);
if (name == null) {
......@@ -150,6 +172,12 @@ public class DefaultScopeDefine {
return name;
}
/**
 * Fetch the id of the given name.
 *
 * @param name represents an existing scope name.
 * @return scope id.
 */
public static int valueOf(String name) {
Integer id = NAME_2_ID.get(name);
if (id == null) {
......@@ -158,20 +186,41 @@ public class DefaultScopeDefine {
return id;
}
/**
* Reset all existing scope definitions. For test only.
*/
public static void reset() {
NAME_2_ID.clear();
ID_2_NAME.clear();
SCOPE_COLUMNS.clear();
}
/**
 * Check whether the given scope belongs to the service catalog.
 *
 * @param scopeId represents an existing scope id.
 * @return true if the scope declares {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME}
 */
public static boolean inServiceCatalog(int scopeId) {
return SERVICE_CATALOG.containsKey(scopeId);
}
/**
 * Check whether the given scope belongs to the service instance catalog.
 *
 * @param scopeId represents an existing scope id.
 * @return true if the scope declares {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME}
 */
public static boolean inServiceInstanceCatalog(int scopeId) {
return SERVICE_INSTANCE_CATALOG.containsKey(scopeId);
}
/**
 * Check whether the given scope belongs to the endpoint catalog.
 *
 * @param scopeId represents an existing scope id.
 * @return true if the scope declares {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME}
 */
public static boolean inEndpointCatalog(int scopeId) {
return ENDPOINT_CATALOG.containsKey(scopeId);
}
......
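A short usage illustration of the lookup and catalog-check helpers documented above ("Service" is only an example scope name; the scope is assumed to have been registered through @ScopeDeclaration scanning first):

// Hypothetical lookups against the registered scope definitions.
int scopeId = DefaultScopeDefine.valueOf("Service");            // name -> id
String scopeName = DefaultScopeDefine.nameOf(scopeId);          // id -> name
boolean serviceLevel = DefaultScopeDefine.inServiceCatalog(scopeId);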
......@@ -22,9 +22,20 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
/**
 * DefaultScopeDefine id declaration.
 * ScopeDeclaration includes
 *
 * 1. Source entities used in OAL scripts, such as Service. As a Scope, it could be used in an OAL script like this:
 *
 * service_resp_time = from(Service.latency).longAvg();
 *
 * 2. Manual sources such as {@link Segment}.
 *
 * 3. Non-stream entities like {@link ProfileTaskRecord}.
 *
 * NOTICE: in OAL scripts, `disable` targets a stream rather than a source, so it doesn't require this annotation.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
......
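As an illustration of the annotation usage described above, a source class might be declared roughly as follows (the id, name, and field values are examples, not taken from this patch; a static import of DefaultScopeDefine.SERVICE_CATALOG_NAME is assumed, and real sources also extend the core Source base class):

// Hypothetical source entity consumed by OAL scripts.
@ScopeDeclaration(id = 1, name = "Service", catalog = SERVICE_CATALOG_NAME)
public class Service {
    @Getter
    @Setter
    @ScopeDefaultColumn.DefinedByField(columnName = "latency", isID = false)
    private long latency;   // referenced in OAL as: service_resp_time = from(Service.latency).longAvg();
}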
......@@ -18,6 +18,12 @@
package org.apache.skywalking.oap.server.core.storage;
/**
* Any persistent entity should be an implementation of this interface.
*/
public interface StorageData {
/**
* @return the unique id used in any storage option.
*/
String id();
}