Unverified commit e55073e5, authored by wu-sheng, committed by GitHub

OAP Core polish, especially storage level (#4587)

- Add length definition with a reasonable default value.
- The #content attribute of @Column has been renamed to storageOnly. I added this to many fields, as they are never queried.
- Merge the H2 and MySQL column-type mappings back to a consistent one.
- Remove @IDColumn.
- Support @QueryUnifiedIndex.
- Refactor the MySQL and H2 installers to use the @Column and @QueryUnifiedIndex definitions to create indices automatically. Note that the index naming rule has changed to entityseqIDX, where seq is an auto-increment integer per entity.
- Support @MetricsExtension and insertOnly in MetricsPersistentWorker.
- Optimize MetricsStreamProcessor.
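As a rough sketch under assumed imports, the reworked annotations combine on a storage entity like this (the entity and column names below are illustrative only, not part of this change):

import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;

public class SampleRecord extends Record {
    public static final String TRACE_ID = "trace_id";
    public static final String SERVICE_ID = "service_id";
    public static final String DATA_BINARY = "data_binary";

    @Setter
    @Getter
    @Column(columnName = TRACE_ID, length = 150) // queried String column with an explicit length
    @QueryUnifiedIndex(withColumns = {SERVICE_ID}) // the H2/MySQL installers create this composite index automatically
    private String traceId;

    @Setter
    @Getter
    @Column(columnName = SERVICE_ID)
    private int serviceId;

    @Setter
    @Getter
    @Column(columnName = DATA_BINARY, storageOnly = true) // persisted, but never used in a query
    private byte[] dataBinary;

    @Override
    public String id() {
        return traceId;
    }
}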
Parent f56d98dc
......@@ -65,7 +65,6 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.slf4j.Logger;
......@@ -236,34 +235,31 @@ public class OALRuntime implements OALEngine {
*/
for (SourceColumn field : metricsStmt.getFieldsFromSource()) {
try {
CtField newField = CtField.make("private " + field.getType()
.getName() + " " + field.getFieldName() + ";", metricsClass);
CtField newField = CtField.make(
"private " + field.getType()
.getName() + " " + field.getFieldName() + ";", metricsClass);
metricsClass.addField(newField);
metricsClass.addMethod(CtNewMethod.getter(field.getFieldGetter(), newField));
metricsClass.addMethod(CtNewMethod.setter(field.getFieldSetter(), newField));
AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(
constPool, AnnotationsAttribute.visibleTag);
/**
* Add @Column(columnName = "${sourceField.columnName}")
*/
Annotation columnAnnotation = new Annotation(Column.class.getName(), constPool);
columnAnnotation.addMemberValue("columnName", new StringMemberValue(field.getColumnName(), constPool));
annotationsAttribute.addAnnotation(columnAnnotation);
if (field.isID()) {
/**
* Add @IDColumn
*/
Annotation idAnnotation = new Annotation(IDColumn.class.getName(), constPool);
annotationsAttribute.addAnnotation(idAnnotation);
if (field.getType().equals(String.class)) {
columnAnnotation.addMemberValue("length", new IntegerMemberValue(constPool, field.getLength()));
}
annotationsAttribute.addAnnotation(columnAnnotation);
newField.getFieldInfo().addAttribute(annotationsAttribute);
} catch (CannotCompileException e) {
logger.error("Can't add field(including set/get) " + field.getFieldName() + " in " + className + ".", e);
logger.error(
"Can't add field(including set/get) " + field.getFieldName() + " in " + className + ".", e);
throw new OALCompileException(e.getMessage(), e);
}
}
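After this change, when the source field is a String, the generated metrics field also carries the length member; conceptually the emitted member looks like the sketch below (the field name and length value are illustrative):

@Column(columnName = "entity_id", length = 512)
private java.lang.String entityId;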
......@@ -287,11 +283,13 @@ public class OALRuntime implements OALEngine {
*
* at Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class)
*/
AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(constPool, AnnotationsAttribute.visibleTag);
AnnotationsAttribute annotationsAttribute = new AnnotationsAttribute(
constPool, AnnotationsAttribute.visibleTag);
Annotation streamAnnotation = new Annotation(Stream.class.getName(), constPool);
streamAnnotation.addMemberValue("name", new StringMemberValue(metricsStmt.getTableName(), constPool));
streamAnnotation.addMemberValue("scopeId", new IntegerMemberValue(constPool, metricsStmt.getSourceScopeId()));
streamAnnotation.addMemberValue("builder", new ClassMemberValue(metricsBuilderClassName(metricsStmt, true), constPool));
streamAnnotation.addMemberValue(
"builder", new ClassMemberValue(metricsBuilderClassName(metricsStmt, true), constPool));
streamAnnotation.addMemberValue("processor", new ClassMemberValue(METRICS_STREAM_PROCESSOR, constPool));
annotationsAttribute.addAnnotation(streamAnnotation);
......@@ -328,7 +326,8 @@ public class OALRuntime implements OALEngine {
* Create empty construct
*/
try {
CtConstructor defaultConstructor = CtNewConstructor.make("public " + className + "() {}", metricsBuilderClass);
CtConstructor defaultConstructor = CtNewConstructor.make(
"public " + className + "() {}", metricsBuilderClass);
metricsBuilderClass.addConstructor(defaultConstructor);
} catch (CannotCompileException e) {
logger.error("Can't add empty constructor in " + className + ".", e);
......@@ -363,7 +362,7 @@ public class OALRuntime implements OALEngine {
* Generate SourceDispatcher class and inject it to classloader
*/
private Class generateDispatcherClass(String scopeName,
DispatcherContext dispatcherContext) throws OALCompileException {
DispatcherContext dispatcherContext) throws OALCompileException {
String className = dispatcherClassName(scopeName, false);
CtClass dispatcherClass = classPool.makeClass(dispatcherClassName(scopeName, true));
......@@ -376,11 +375,22 @@ public class OALRuntime implements OALEngine {
* Set generic signature
*/
String sourceClassName = SOURCE_PACKAGE + dispatcherContext.getSource();
SignatureAttribute.ClassSignature dispatcherSignature = new SignatureAttribute.ClassSignature(null, null,
// Set interface and its generic params
new SignatureAttribute.ClassType[] {
new SignatureAttribute.ClassType(SourceDispatcher.class.getCanonicalName(), new SignatureAttribute.TypeArgument[] {new SignatureAttribute.TypeArgument(new SignatureAttribute.ClassType(sourceClassName))})
});
SignatureAttribute.ClassSignature dispatcherSignature =
new SignatureAttribute.ClassSignature(
null, null,
// Set interface and its generic params
new SignatureAttribute.ClassType[] {
new SignatureAttribute.ClassType(
SourceDispatcher.class
.getCanonicalName(),
new SignatureAttribute.TypeArgument[] {
new SignatureAttribute.TypeArgument(
new SignatureAttribute.ClassType(
sourceClassName))
}
)
}
);
dispatcherClass.setGenericSignature(dispatcherSignature.encode());
} catch (NotFoundException e) {
......@@ -397,7 +407,10 @@ public class OALRuntime implements OALEngine {
configuration.getTemplate("dispatcher/doMetrics.ftl").process(dispatcherContextMetric, methodEntity);
dispatcherClass.addMethod(CtNewMethod.make(methodEntity.toString(), dispatcherClass));
} catch (Exception e) {
logger.error("Can't generate method do" + dispatcherContextMetric.getMetricsName() + " for " + className + ".", e);
logger.error(
"Can't generate method do" + dispatcherContextMetric.getMetricsName() + " for " + className + ".",
e
);
logger.error("Method body as following" + System.lineSeparator() + "{}", methodEntity);
throw new OALCompileException(e.getMessage(), e);
}
......
......@@ -32,15 +32,17 @@ public class SourceColumn {
private Class<?> type;
private String typeName;
private boolean isID;
private int length;
private String fieldSetter;
private String fieldGetter;
public SourceColumn(String fieldName, String columnName, Class<?> type, boolean isID) {
public SourceColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length) {
this.fieldName = fieldName;
this.columnName = columnName;
this.type = type;
this.typeName = type.getName();
this.isID = isID;
this.length = length;
this.fieldGetter = ClassMethodUtil.toGetMethod(fieldName);
this.fieldSetter = ClassMethodUtil.toSetMethod(fieldName);
......
......@@ -29,8 +29,9 @@ public class SourceColumnsFactory {
List<ScopeDefaultColumn> columns = DefaultScopeDefine.getDefaultColumns(source);
for (ScopeDefaultColumn defaultColumn : columns) {
sourceColumns.add(new SourceColumn(defaultColumn.getFieldName(), defaultColumn.getColumnName(), defaultColumn
.getType(), defaultColumn.isID()));
sourceColumns.add(
new SourceColumn(defaultColumn.getFieldName(), defaultColumn.getColumnName(), defaultColumn
.getType(), defaultColumn.isID(), defaultColumn.getLength()));
}
return sourceColumns;
}
......
......@@ -32,7 +32,6 @@ import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProces
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
/**
* This class is auto-generated. Please don't change this class manually.
......@@ -43,7 +42,6 @@ public class ServiceAvgMetrics extends LongAvgMetrics implements WithMetadata {
@Setter
@Getter
@Column(columnName = "entity_id")
@IDColumn
private java.lang.String entityId;
@Override
......
......@@ -46,9 +46,9 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -110,8 +110,8 @@ public class CoreModule extends ModuleDefine {
}
private void addInsideService(List<Class> classes) {
classes.add(IModelSetter.class);
classes.add(IModelGetter.class);
classes.add(INewModel.class);
classes.add(IModelManager.class);
classes.add(IModelOverride.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
......
......@@ -73,9 +73,9 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
......@@ -207,8 +207,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelSetter.class, storageModels);
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(INewModel.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
this.registerServiceImplementation(
......
......@@ -54,11 +54,11 @@ public class AlarmRecord extends Record {
@Column(columnName = SCOPE)
private int scope;
@Column(columnName = NAME)
@Column(columnName = NAME, storageOnly = true)
private String name;
@Column(columnName = ID0)
@Column(columnName = ID0, storageOnly = true)
private String id0;
@Column(columnName = ID1)
@Column(columnName = ID1, storageOnly = true)
private String id1;
@Column(columnName = START_TIME)
private long startTime;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
/**
* MetricsExtension defines extension attributes of a {@link Stream} that is processed by {@link MetricsStreamProcessor}.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MetricsExtension {
/**
* @return true if this metrics stream supports down sampling.
*/
boolean supportDownSampling();
/**
* @return true if this metrics data can be updated.
*/
boolean supportUpdate();
}
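Usage matches the EndpointTraffic change later in this diff: the flags move off @Stream and onto this annotation.

@Stream(name = EndpointTraffic.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_TRAFFIC,
    builder = EndpointTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false)
public class EndpointTraffic extends Metrics {
    // ...
}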
......@@ -59,9 +59,4 @@ public @interface Stream {
* InventoryStreamProcessor}, {@link TopNStreamProcessor} and {@link NoneStreamingProcessor} for more details.
*/
Class<? extends StreamProcessor> processor();
/**
* @return true if this metrics stream support down sampling.
*/
boolean supportDownSampling() default true;
}
......@@ -23,11 +23,13 @@ import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
......@@ -42,8 +44,8 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.EN
@ScopeDeclaration(id = ENDPOINT_TRAFFIC, name = "EndpointTraffic")
@Stream(name = EndpointTraffic.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_TRAFFIC,
builder = EndpointTraffic.Builder.class, processor = MetricsStreamProcessor.class,
supportDownSampling = false)
builder = EndpointTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false)
public class EndpointTraffic extends Metrics {
public static final String INDEX_NAME = "endpoint_traffic";
......@@ -109,16 +111,12 @@ public class EndpointTraffic extends Metrics {
public String id() {
// Downgrade the time bucket to day level only.
// supportDownSampling == false for this entity.
String splitJointId = String.valueOf(getTimeBucket() / 10000);
splitJointId += Const.ID_SPLIT + buildId(this);
return splitJointId;
return buildId(this);
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + buildId(this).hashCode();
return result;
return Objects.hash(serviceId, name, detectPoint);
}
@Override
......
......@@ -30,7 +30,6 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
@Stream(name = EndpointRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.ENDPOINT_RELATION, builder = EndpointRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class EndpointRelationServerSideMetrics extends Metrics {
......@@ -43,22 +42,18 @@ public class EndpointRelationServerSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = SOURCE_ENDPOINT)
@IDColumn
private String sourceEndpoint;
@Setter
@Getter
@Column(columnName = DEST_ENDPOINT)
@IDColumn
private String destEndpoint;
@Setter
@Getter
@Column(columnName = COMPONENT_ID)
@IDColumn
@Column(columnName = COMPONENT_ID, storageOnly = true)
private int componentId;
@Setter
@Getter
@Column(columnName = ENTITY_ID)
@IDColumn
private String entityId;
@Override
......
......@@ -32,7 +32,6 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
@Stream(name = ServiceInstanceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, builder = ServiceInstanceRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceInstanceRelationClientSideMetrics extends Metrics {
......@@ -51,7 +50,6 @@ public class ServiceInstanceRelationClientSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = SOURCE_SERVICE_INSTANCE_ID)
@IDColumn
private int sourceServiceInstanceId;
@Setter
@Getter
......@@ -60,23 +58,21 @@ public class ServiceInstanceRelationClientSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = DEST_SERVICE_INSTANCE_ID)
@IDColumn
private int destServiceInstanceId;
@Setter
@Getter
@Column(columnName = COMPONENT_ID)
@IDColumn
@Column(columnName = COMPONENT_ID, storageOnly = true)
private int componentId;
@Setter(AccessLevel.PRIVATE)
@Getter
@Column(columnName = ENTITY_ID)
@IDColumn
private String entityId;
@Override
public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(new RelationDefineUtil.RelationDefine(sourceServiceInstanceId, destServiceInstanceId, componentId));
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(
new RelationDefineUtil.RelationDefine(sourceServiceInstanceId, destServiceInstanceId, componentId));
return splitJointId;
}
......
......@@ -32,7 +32,6 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
@Stream(name = ServiceInstanceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, builder = ServiceInstanceRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceInstanceRelationServerSideMetrics extends Metrics {
......@@ -51,7 +50,6 @@ public class ServiceInstanceRelationServerSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = SOURCE_SERVICE_INSTANCE_ID)
@IDColumn
private int sourceServiceInstanceId;
@Setter
@Getter
......@@ -60,23 +58,21 @@ public class ServiceInstanceRelationServerSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = DEST_SERVICE_INSTANCE_ID)
@IDColumn
private int destServiceInstanceId;
@Setter
@Getter
@Column(columnName = COMPONENT_ID)
@IDColumn
@Column(columnName = COMPONENT_ID, storageOnly = true)
private int componentId;
@Setter(AccessLevel.PRIVATE)
@Getter
@Column(columnName = ENTITY_ID)
@IDColumn
private String entityId;
@Override
public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(new RelationDefineUtil.RelationDefine(sourceServiceInstanceId, destServiceInstanceId, componentId));
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(
new RelationDefineUtil.RelationDefine(sourceServiceInstanceId, destServiceInstanceId, componentId));
return splitJointId;
}
......
......@@ -32,7 +32,6 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
@Stream(name = ServiceRelationClientSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationClientSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceRelationClientSideMetrics extends Metrics {
......@@ -45,28 +44,25 @@ public class ServiceRelationClientSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = SOURCE_SERVICE_ID)
@IDColumn
private int sourceServiceId;
@Setter
@Getter
@Column(columnName = DEST_SERVICE_ID)
@IDColumn
private int destServiceId;
@Setter
@Getter
@Column(columnName = COMPONENT_ID)
@IDColumn
@Column(columnName = COMPONENT_ID, storageOnly = true)
private int componentId;
@Setter(AccessLevel.PRIVATE)
@Getter
@Column(columnName = ENTITY_ID)
@IDColumn
private String entityId;
@Override
public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(new RelationDefineUtil.RelationDefine(sourceServiceId, destServiceId, componentId));
splitJointId += Const.ID_SPLIT + RelationDefineUtil.buildEntityId(
new RelationDefineUtil.RelationDefine(sourceServiceId, destServiceId, componentId));
return splitJointId;
}
......
......@@ -32,7 +32,6 @@ import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
@Stream(name = ServiceRelationServerSideMetrics.INDEX_NAME, scopeId = DefaultScopeDefine.SERVICE_RELATION, builder = ServiceRelationServerSideMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class ServiceRelationServerSideMetrics extends Metrics {
......@@ -45,22 +44,18 @@ public class ServiceRelationServerSideMetrics extends Metrics {
@Setter
@Getter
@Column(columnName = SOURCE_SERVICE_ID)
@IDColumn
private int sourceServiceId;
@Setter
@Getter
@Column(columnName = DEST_SERVICE_ID)
@IDColumn
private int destServiceId;
@Setter
@Getter
@Column(columnName = COMPONENT_ID)
@IDColumn
@Column(columnName = COMPONENT_ID, storageOnly = true)
private int componentId;
@Setter(AccessLevel.PRIVATE)
@Getter
@Column(columnName = ENTITY_ID)
@IDColumn
private String entityId;
@Override
......
......@@ -52,11 +52,11 @@ public class SegmentRecord extends Record {
@Setter
@Getter
@Column(columnName = SEGMENT_ID)
@Column(columnName = SEGMENT_ID, length = 150)
private String segmentId;
@Setter
@Getter
@Column(columnName = TRACE_ID)
@Column(columnName = TRACE_ID, length = 150)
private String traceId;
@Setter
@Getter
......@@ -92,11 +92,11 @@ public class SegmentRecord extends Record {
private int isError;
@Setter
@Getter
@Column(columnName = DATA_BINARY)
@Column(columnName = DATA_BINARY, storageOnly = true)
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = VERSION)
@Column(columnName = VERSION, storageOnly = true)
private int version;
@Override
......
......@@ -48,15 +48,15 @@ public abstract class ApdexMetrics extends Metrics implements IntValueHolder {
@Getter
@Setter
@Column(columnName = TOTAL_NUM)
@Column(columnName = TOTAL_NUM, storageOnly = true)
private int totalNum;
@Getter
@Setter
@Column(columnName = S_NUM)
@Column(columnName = S_NUM, storageOnly = true)
private int sNum;
@Getter
@Setter
@Column(columnName = T_NUM)
@Column(columnName = T_NUM, storageOnly = true)
private int tNum;
@Getter
@Setter
......
......@@ -38,7 +38,7 @@ public abstract class CPMMetrics extends Metrics implements LongValueHolder {
private long value;
@Getter
@Setter
@Column(columnName = TOTAL)
@Column(columnName = TOTAL, storageOnly = true)
private long total;
@Entrance
......
......@@ -36,11 +36,11 @@ public abstract class DoubleAvgMetrics extends Metrics implements DoubleValueHol
@Getter
@Setter
@Column(columnName = SUMMATION)
@Column(columnName = SUMMATION, storageOnly = true)
private double summation;
@Getter
@Setter
@Column(columnName = COUNT)
@Column(columnName = COUNT, storageOnly = true)
private long count;
@Getter
@Setter
......
......@@ -23,14 +23,14 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.IntKeyLongValuePair;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
/**
* IntKeyLongValue is a common bean, with key in Int and value in Long
*/
@Setter
@Getter
public class IntKeyLongValue implements Comparable<IntKeyLongValue>, StorageDataType {
public class IntKeyLongValue implements Comparable<IntKeyLongValue>, StorageDataComplexObject {
private int key;
private long value;
......@@ -83,8 +83,8 @@ public class IntKeyLongValue implements Comparable<IntKeyLongValue>, StorageData
@Override
public void toObject(String data) {
String[] keyValue = data.split(Const.KEY_VALUE_SPLIT);
this.key = Integer.valueOf(keyValue[0]);
this.value = Long.valueOf(keyValue[1]);
this.key = Integer.parseInt(keyValue[0]);
this.value = Long.parseLong(keyValue[1]);
}
@Override
......
......@@ -22,9 +22,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
public class IntKeyLongValueHashMap extends HashMap<Integer, IntKeyLongValue> implements StorageDataType {
public class IntKeyLongValueHashMap extends HashMap<Integer, IntKeyLongValue> implements StorageDataComplexObject {
public IntKeyLongValueHashMap() {
super();
......
......@@ -36,11 +36,11 @@ public abstract class LongAvgMetrics extends Metrics implements LongValueHolder
@Getter
@Setter
@Column(columnName = SUMMATION)
@Column(columnName = SUMMATION, storageOnly = true)
private long summation;
@Getter
@Setter
@Column(columnName = COUNT)
@Column(columnName = COUNT, storageOnly = true)
private long count;
@Getter
@Setter
......
......@@ -45,17 +45,21 @@ public abstract class PercentileMetrics extends GroupMetrics implements MultiInt
99
};
/**
* The special case where the column is isValue = true but also storageOnly = true: because it is of {@link
* IntKeyLongValueHashMap} type, this column can't be queried through aggregation.
*/
@Getter
@Setter
@Column(columnName = VALUE, isValue = true)
@Column(columnName = VALUE, isValue = true, storageOnly = true)
private IntKeyLongValueHashMap percentileValues;
@Getter
@Setter
@Column(columnName = PRECISION)
@Column(columnName = PRECISION, storageOnly = true)
private int precision;
@Getter
@Setter
@Column(columnName = DATASET)
@Column(columnName = DATASET, storageOnly = true)
private IntKeyLongValueHashMap dataset;
private boolean isCalculated;
......
......@@ -46,11 +46,11 @@ public abstract class PxxMetrics extends GroupMetrics implements IntValueHolder
private int value;
@Getter
@Setter
@Column(columnName = PRECISION)
@Column(columnName = PRECISION, storageOnly = true)
private int precision;
@Getter
@Setter
@Column(columnName = DETAIL_GROUP)
@Column(columnName = DETAIL_GROUP, storageOnly = true)
private IntKeyLongValueHashMap detailGroup;
private final int percentileRank;
......
......@@ -43,15 +43,19 @@ public abstract class ThermodynamicMetrics extends GroupMetrics {
@Getter
@Setter
@Column(columnName = STEP)
@Column(columnName = STEP, storageOnly = true)
private int step = 0;
@Getter
@Setter
@Column(columnName = NUM_OF_STEPS)
@Column(columnName = NUM_OF_STEPS, storageOnly = true)
private int numOfSteps = 0;
/**
* The special case where the column is isValue = true but also storageOnly = true: because it is of {@link
* IntKeyLongValueHashMap} type, this column can't be queried through aggregation.
*/
@Getter
@Setter
@Column(columnName = DETAIL_GROUP, isValue = true)
@Column(columnName = DETAIL_GROUP, isValue = true, storageOnly = true)
private IntKeyLongValueHashMap detailGroup = new IntKeyLongValueHashMap(30);
/**
......
......@@ -35,7 +35,7 @@ public abstract class TopN extends Record implements ComparableStorageData {
@Getter
@Setter
@Column(columnName = STATEMENT, content = true)
@Column(columnName = STATEMENT, storageOnly = true)
private String statement;
@Getter
@Setter
......
......@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
......@@ -36,7 +36,6 @@ import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
......@@ -48,27 +47,29 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
@Slf4j
public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
private final Model model;
private final Map<Metrics, Metrics> databaseSession;
private final Map<Metrics, Metrics> context;
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final AbstractWorker<Metrics> nextAlarmWorker;
private final AbstractWorker<ExportEvent> nextExportWorker;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
private final MetricsTransWorker transWorker;
private final Optional<MetricsTransWorker> transWorker;
private final boolean enableDatabaseSession;
private final boolean supportUpdate;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession) {
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate) {
super(moduleDefineHolder);
this.model = model;
this.databaseSession = new HashMap<>(100);
this.context = new HashMap<>(100);
this.enableDatabaseSession = enableDatabaseSession;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = nextAlarmWorker;
this.nextExportWorker = nextExportWorker;
this.transWorker = transWorker;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);
this.supportUpdate = supportUpdate;
String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
......@@ -86,6 +87,17 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
}
/**
* Create the leaf MetricsPersistentWorker, which has no alarm, export, or transfer workers following it.
*/
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
boolean enableDatabaseSession, boolean supportUpdate) {
this(moduleDefineHolder, model, metricsDAO,
null, null, null,
enableDatabaseSession, supportUpdate
);
}
@Override
void onWork(Metrics metrics) {
cacheData(metrics);
......@@ -107,52 +119,28 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
@Override
public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
if (lastCollection.size() == 0) {
return;
}
int i = 0;
int batchGetSize = 2000;
Metrics[] metrics = null;
/*
* Hard coded the max size. This is only the batch size of one metrics query; an overly large number is meaningless.
*/
int maxBatchGetSize = 2000;
final int batchSize = Math.min(maxBatchGetSize, lastCollection.size());
List<Metrics> metricsList = new ArrayList<>();
for (Metrics data : lastCollection) {
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(data, ExportEvent.EventType.INCREMENT);
nextExportWorker.in(event);
}
if (Objects.nonNull(transWorker)) {
transWorker.in(data);
}
transWorker.ifPresent(metricsTransWorker -> metricsTransWorker.in(data));
int mod = i % batchGetSize;
if (mod == 0) {
int residual = lastCollection.size() - i;
if (residual >= batchGetSize) {
metrics = new Metrics[batchGetSize];
} else {
metrics = new Metrics[residual];
}
}
metrics[mod] = data;
if (mod == metrics.length - 1) {
try {
syncStorageToCache(metrics);
for (Metrics metric : metrics) {
Metrics cacheMetric = databaseSession.get(metric);
if (cacheMetric != null) {
cacheMetric.combine(metric);
cacheMetric.calculate();
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cacheMetric));
nextWorker(cacheMetric);
} else {
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
nextWorker(metric);
}
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
metricsList.add(data);
if (metricsList.size() == batchSize) {
flushDataToStorage(metricsList, prepareRequests);
}
}
i++;
if (metricsList.size() > 0) {
flushDataToStorage(metricsList, prepareRequests);
}
if (prepareRequests.size() > 0) {
......@@ -163,16 +151,52 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
}
private void nextWorker(Metrics metric) {
if (Objects.nonNull(nextAlarmWorker)) {
nextAlarmWorker.in(metric);
}
if (Objects.nonNull(nextExportWorker)) {
ExportEvent event = new ExportEvent(metric, ExportEvent.EventType.TOTAL);
nextExportWorker.in(event);
private void flushDataToStorage(List<Metrics> metricsList,
List<PrepareRequest> prepareRequests) {
try {
loadFromStorage(metricsList);
for (Metrics metrics : metricsList) {
Metrics cachedMetrics = context.get(metrics);
if (cachedMetrics != null) {
/*
* If the metrics doesn't support update, as defined through MetricsExtension#supportUpdate,
* then no merge or further processing happens.
*/
if (!supportUpdate) {
continue;
}
/*
* Merge metrics into cachedMetrics, change only happens inside cachedMetrics.
*/
cachedMetrics.combine(metrics);
cachedMetrics.calculate();
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
nextWorker(cachedMetrics);
/*
* The `data` should not be changed in any case. The exporter is an async process.
*/
nextExportWorker.ifPresent(exportEventWorker -> exportEventWorker.in(
new ExportEvent(metrics, ExportEvent.EventType.INCREMENT)));
} else {
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
nextWorker(metrics);
}
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
} finally {
metricsList.clear();
}
}
private void nextWorker(Metrics metrics) {
nextAlarmWorker.ifPresent(nextAlarmWorker -> nextAlarmWorker.in(metrics));
nextExportWorker.ifPresent(
nextExportWorker -> nextExportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
}
@Override
public void cacheData(Metrics input) {
mergeDataCache.writing();
......@@ -189,16 +213,16 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
/**
* Sync data to the cache if the {@link #enableDatabaseSession} == true.
* Load data from the storage; when {@link #enableDatabaseSession} == true, only load the data whose id is not in the session cache yet.
*/
private void syncStorageToCache(Metrics[] metrics) throws IOException {
private void loadFromStorage(List<Metrics> metrics) throws IOException {
if (!enableDatabaseSession) {
databaseSession.clear();
context.clear();
}
List<String> notInCacheIds = new ArrayList<>();
for (Metrics metric : metrics) {
if (!databaseSession.containsKey(metric)) {
if (!context.containsKey(metric)) {
notInCacheIds.add(metric.id());
}
}
......@@ -206,7 +230,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
if (notInCacheIds.size() > 0) {
List<Metrics> metricsList = metricsDAO.multiGet(model, notInCacheIds);
for (Metrics metric : metricsList) {
databaseSession.put(metric, metric);
context.put(metric, metric);
}
}
}
......@@ -214,7 +238,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
@Override
public void endOfRound(long tookTime) {
if (enableDatabaseSession) {
Iterator<Metrics> iterator = databaseSession.values().iterator();
Iterator<Metrics> iterator = context.values().iterator();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
metrics.extendSurvivalTime(tookTime);
......@@ -229,7 +253,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
/**
* Metrics queue processor, which merges the received metrics when one with the same ID(s) and time bucket already exists.
*
* ID is declared through {@link IDColumn}
* ID is declared through {@link Object#hashCode()} and {@link Object#equals(Object)} as usual.
*/
private class PersistentConsumer implements IConsumer<Metrics> {
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
......@@ -36,7 +37,7 @@ import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -104,7 +105,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " metrics DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
DownsamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(DownsamplingConfigService.class);
......@@ -114,31 +115,42 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker monthPersistentWorker = null;
MetricsTransWorker transWorker = null;
if (stream.supportDownSampling()) {
final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
/**
* All metrics default to `supportDownSampling` and `insertAndUpdate`, unless explicitly defined otherwise.
*/
boolean supportDownSampling = true;
boolean supportUpdate = true;
if (metricsExtension != null) {
supportDownSampling = metricsExtension.supportDownSampling();
supportUpdate = metricsExtension.supportUpdate();
}
if (supportDownSampling) {
if (configService.shouldToHour()) {
Model model = modelSetter.putIfAbsent(
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Hour), false);
hourPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToDay()) {
Model model = modelSetter.putIfAbsent(
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Day), false);
dayPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
if (configService.shouldToMonth()) {
Model model = modelSetter.putIfAbsent(
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Month), false);
monthPersistentWorker = worker(moduleDefineHolder, metricsDAO, model);
monthPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
transWorker = new MetricsTransWorker(
moduleDefineHolder, stream.name(), hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
}
Model model = modelSetter.putIfAbsent(
Model model = modelSetter.add(
metricsClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Minute), false);
MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
moduleDefineHolder, metricsDAO, model, transWorker);
moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
String remoteReceiverWorkerName = stream.name() + "_rec";
IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
......@@ -156,20 +168,26 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
private MetricsPersistentWorker minutePersistentWorker(ModuleDefineHolder moduleDefineHolder,
IMetricsDAO metricsDAO,
Model model,
MetricsTransWorker transWorker) {
MetricsTransWorker transWorker,
boolean supportUpdate) {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession);
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession,
supportUpdate
);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
private MetricsPersistentWorker downSamplingWorker(ModuleDefineHolder moduleDefineHolder,
IMetricsDAO metricsDAO,
Model model,
boolean supportUpdate) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, null, null, null, enableDatabaseSession);
moduleDefineHolder, model, metricsDAO, enableDatabaseSession, supportUpdate);
persistentWorkers.add(persistentWorker);
return persistentWorker;
......
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -72,8 +72,8 @@ public class NoneStreamingProcessor implements StreamProcessor<NoneStream> {
.getSimpleName() + " none stream record DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(streamClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
Model model = modelSetter.add(streamClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
final NoneStreamPersistentWorker persistentWorker = new NoneStreamPersistentWorker(moduleDefineHolder, model, noneStream);
workers.put(streamClass, persistentWorker);
......
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -66,8 +66,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
Model model = modelSetter.add(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
workers.put(recordClass, persistentWorker);
......
......@@ -36,7 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -77,8 +77,8 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
.getSimpleName() + " top n record DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
Model model = modelSetter.add(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
persistentWorkers.add(persistentWorker);
......
......@@ -47,11 +47,11 @@ public class ProfileTaskLogRecord extends Record {
public static final String OPERATION_TYPE = "operation_type";
public static final String OPERATION_TIME = "operation_time";
@Column(columnName = TASK_ID)
@Column(columnName = TASK_ID, storageOnly = true)
private String taskId;
@Column(columnName = INSTANCE_ID)
@Column(columnName = INSTANCE_ID, storageOnly = true)
private int instanceId;
@Column(columnName = OPERATION_TYPE)
@Column(columnName = OPERATION_TYPE, storageOnly = true)
private int operationType;
@Column(columnName = OPERATION_TIME)
private long operationTime;
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
......@@ -52,8 +53,11 @@ public class ProfileThreadSnapshotRecord extends Record {
public static final String STACK_BINARY = "stack_binary";
@Column(columnName = TASK_ID)
@QueryUnifiedIndex(withColumns = {SEGMENT_ID})
private String taskId;
@Column(columnName = SEGMENT_ID)
@QueryUnifiedIndex(withColumns = {SEQUENCE})
@QueryUnifiedIndex(withColumns = {DUMP_TIME})
private String segmentId;
@Column(columnName = DUMP_TIME)
private long dumpTime;
......
......@@ -36,6 +36,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import static java.util.Objects.nonNull;
......@@ -64,6 +65,10 @@ public class ServiceInstanceInventory extends RegisterSource {
@Setter
@Getter
@Column(columnName = NAME)
@QueryUnifiedIndex(withColumns = {
HEARTBEAT_TIME,
REGISTER_TIME
})
private String name = Const.EMPTY_STRING;
@Setter
@Getter
......
......@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
......@@ -55,6 +56,10 @@ public class ServiceInventory extends RegisterSource {
@Setter
@Getter
@Column(columnName = NAME, matchQuery = true)
@QueryUnifiedIndex(withColumns = {
HEARTBEAT_TIME,
REGISTER_TIME
})
private String name = Const.EMPTY_STRING;
@Setter
@Getter
......
......@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -82,8 +82,8 @@ public class InventoryStreamProcessor implements StreamProcessor<RegisterSource>
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " register DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(
INewModel modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(INewModel.class);
Model model = modelSetter.add(
inventoryClass, stream.scopeId(), new Storage(stream.name(), false, false, Downsampling.None), false);
RegisterPersistentWorker persistentWorker = new RegisterPersistentWorker(
......
......@@ -117,12 +117,17 @@ public class DefaultScopeDefine {
"ScopeDeclaration id=" + id + " at " + originalClass.getName() + " has conflict with another named " + ID_2_NAME
.get(id));
}
if (id < 0) {
throw new UnexpectedException(
"ScopeDeclaration id=" + id + " at " + originalClass.getName() + " is negative. ");
}
String name = declaration.name();
if (NAME_2_ID.containsKey(name)) {
throw new UnexpectedException(
"ScopeDeclaration fieldName=" + name + " at " + originalClass.getName() + " has conflict with another id= " + NAME_2_ID
.get(name));
}
ID_2_NAME.put(id, name);
NAME_2_ID.put(name, id);
......@@ -133,7 +138,7 @@ public class DefaultScopeDefine {
if (virtualColumn != null) {
scopeDefaultColumns.add(
new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn
.type(), virtualColumn.isID()));
.type(), virtualColumn.isID(), virtualColumn.length()));
}
Field[] scopeClassField = originalClass.getDeclaredFields();
if (scopeClassField != null) {
......@@ -143,9 +148,9 @@ public class DefaultScopeDefine {
if (definedByField != null) {
if (!definedByField.requireDynamicActive() || ACTIVE_EXTRA_MODEL_COLUMNS) {
scopeDefaultColumns.add(
new ScopeDefaultColumn(field.getName(), definedByField.columnName(), field.getType(),
definedByField
.isID()
new ScopeDefaultColumn(
field.getName(), definedByField.columnName(), field.getType(), false,
definedByField.length()
));
}
}
......
......@@ -24,25 +24,25 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
/**
* Define the default columns of the source scope. These columns pass down into the persistent entity (OAL metrics entity)
* automatically.
*/
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PUBLIC)
public class ScopeDefaultColumn {
private String fieldName;
private String columnName;
private Class<?> type;
private boolean isID;
private int length;
public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID) {
public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length) {
this.fieldName = fieldName;
this.columnName = columnName;
this.type = type;
this.isID = isID;
this.length = length;
}
@Target({ElementType.FIELD})
......@@ -50,14 +50,17 @@ public class ScopeDefaultColumn {
public @interface DefinedByField {
String columnName();
boolean isID() default false;
/**
* Dynamic active means this column is activated only when a core setting enables it explicitly.
*/
boolean requireDynamicActive() default false;
/**
* Define column length, only effective when the type is String.
*/
int length() default 256;
}
@Target({ElementType.TYPE})
......@@ -70,5 +73,10 @@ public class ScopeDefaultColumn {
Class type();
boolean isID() default false;
/**
* Define column length, only effective when the type is String.
*/
int length() default 512;
}
}
......@@ -55,5 +55,12 @@ public @interface Column {
/**
* The column is just saved, and never used in queries.
*/
boolean content() default false;
boolean storageOnly() default false;
/**
* @return the length of this column. This only applies to {@link String} columns; its usage depends on the
* storage implementation.
* @since 7.1.0
*/
int length() default 200;
}
......@@ -24,9 +24,10 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* IDColumn is the plus annotation for {@link Column}, declares this column is ID for the entity, besides time(bucket).
* Supports declaring multiple {@link QueryUnifiedIndex}s on one field.
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface IDColumn {
public @interface MultipleQueryUnifiedIndex {
QueryUnifiedIndex[] value();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* QueryUnifiedIndex declares that a unified index is required in the query stage. This works only when the storage
* supports this kind of index model, which is mostly the case for typical relational databases such as MySQL and TiDB.
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(MultipleQueryUnifiedIndex.class)
public @interface QueryUnifiedIndex {
/**
* @return the list of other columns that should be added into the unified index.
*/
String[] withColumns();
}
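Because the annotation is @Repeatable, several unified indices can be stacked on one field, and the compiler folds them into a @MultipleQueryUnifiedIndex container, as the test model at the end of this change also shows. A sketch with illustrative column names:
@Column(columnName = "service_id")
@QueryUnifiedIndex(withColumns = {"time_bucket"})
@QueryUnifiedIndex(withColumns = {"endpoint_id", "time_bucket"})
private String serviceId;
// Yields two ExtraQueryIndex definitions, each led by the annotated column:
//   (service_id, time_bucket)
//   (service_id, endpoint_id, time_bucket)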
......@@ -18,15 +18,22 @@
package org.apache.skywalking.oap.server.core.storage.model;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* Short column names are unsupported for now, as there is no definition in the @Column annotation. The storage
* implementation needs to match columns by name.
*/
@Slf4j
@ToString
public class ColumnName {
private final String modelName;
private String fullName;
private String storageName = null;
public ColumnName(String fullName) {
public ColumnName(String modelName, String fullName) {
this.modelName = modelName;
this.fullName = fullName;
}
......@@ -38,7 +45,13 @@ public class ColumnName {
return storageName != null ? storageName : fullName;
}
public void setStorageName(String storageName) {
this.storageName = storageName;
public void overrideName(String oldName, String storageName) {
if (fullName.equals(oldName)) {
log.debug(
"Model {} column {} has been override. The new column name is {}.",
modelName, oldName, storageName
);
this.storageName = storageName;
}
}
}
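The rename is conditional: only a column whose full name equals oldName picks up the storage-level alias. A minimal sketch, with illustrative names:
ColumnName name = new ColumnName("segment", "data_binary");
name.overrideName("trace_id", "trace_id_v2");    // no match; the storage name stays "data_binary"
name.overrideName("data_binary", "data_bin");    // match; getStorageName() now returns "data_bin"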
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.model;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
/**
* The extra query index, used when the storage supports this mode. Many NoSQL databases support single-column indices
* only; in that case, this could be ignored at the implementation level.
*/
@Getter
public class ExtraQueryIndex {
private String[] columns;
public ExtraQueryIndex(String mainColumn, final String[] withColumns) {
if (CollectionUtils.isNotEmpty(withColumns)) {
columns = new String[withColumns.length + 1];
columns[0] = mainColumn;
System.arraycopy(withColumns, 0, columns, 1, withColumns.length);
} else {
throw new IllegalArgumentException("ExtraQueryIndex required withColumns as a not empty list.");
}
}
/**
* Keep the same name replacement as {@link ColumnName#overrideName(String, String)}
*
* @param oldName to be replaced.
* @param newName to use in the storage level.
*/
public void overrideName(String oldName, String newName) {
for (int i = 0; i < columns.length; i++) {
if (columns[i].equals(oldName)) {
columns[i] = newName;
}
}
}
}
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
import org.apache.skywalking.oap.server.library.module.Service;
public interface IModelGetter extends Service {
List<Model> getModels();
/**
* The IModelManager implementation supports reading all existing models.
*/
public interface IModelManager extends Service {
List<Model> allModels();
}
......@@ -21,7 +21,14 @@ package org.apache.skywalking.oap.server.core.storage.model;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.library.module.Service;
public interface IModelSetter extends Service {
Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record);
/**
* The INewModel implementation supports creating a new model.
*/
public interface INewModel extends Service {
/**
* Add a new model.
*
* @return the created new model
*/
Model add(Class aClass, int scopeId, Storage storage, boolean record);
}
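Callers look the service up from the core module, in the same way the installer below resolves IModelManager; the entity class and scope id here are illustrative:
INewModel newModel = moduleManager.find(CoreModule.NAME).provider().getService(INewModel.class);
Model model = newModel.add(
    SampleRecord.class, scopeId,                                  // hypothetical entity and scope
    new Storage("sample_record", true, true, Downsampling.Hour),
    true);                                                        // record = true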
......@@ -22,6 +22,9 @@ import java.util.List;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
/**
* The model definition of a logical entity.
*/
@Getter
public class Model {
......@@ -30,12 +33,20 @@ public class Model {
private final Downsampling downsampling;
private final boolean deleteHistory;
private final List<ModelColumn> columns;
private final List<ExtraQueryIndex> extraQueryIndices;
private final int scopeId;
private final boolean record;
public Model(String name, List<ModelColumn> columns, boolean capableOfTimeSeries, boolean deleteHistory,
int scopeId, Downsampling downsampling, boolean record) {
public Model(String name,
List<ModelColumn> columns,
List<ExtraQueryIndex> extraQueryIndices,
boolean capableOfTimeSeries,
boolean deleteHistory,
int scopeId,
Downsampling downsampling,
boolean record) {
this.columns = columns;
this.extraQueryIndices = extraQueryIndices;
this.capableOfTimeSeries = capableOfTimeSeries;
this.downsampling = downsampling;
this.deleteHistory = deleteHistory;
......
......@@ -19,18 +19,45 @@
package org.apache.skywalking.oap.server.core.storage.model;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
@Getter
public class ModelColumn {
private final ColumnName columnName;
private final Class<?> type;
private final boolean matchQuery;
private final boolean content;
private final boolean storageOnly;
private final int length;
public ModelColumn(ColumnName columnName, Class<?> type, boolean matchQuery, boolean content) {
public ModelColumn(ColumnName columnName,
Class<?> type,
boolean matchQuery,
boolean storageOnly,
boolean isValue,
int length) {
this.columnName = columnName;
this.type = type;
this.matchQuery = matchQuery;
this.content = content;
/*
* Only accept the length setting on String-typed columns.
*/
if (!type.equals(String.class)) {
this.length = 0;
} else {
this.length = length;
}
/*
* byte[] and {@link IntKeyLongValueHashMap} columns could never be queried.
*/
if (type.equals(byte[].class) || type.equals(IntKeyLongValueHashMap.class)) {
this.storageOnly = true;
} else {
if (storageOnly && isValue) {
throw new IllegalArgumentException(
"The column " + columnName + " can't be defined as both isValue and storageOnly.");
}
this.storageOnly = storageOnly;
}
}
}
......@@ -41,9 +41,9 @@ public abstract class ModelInstaller {
* Entrance of the storage entity installation work.
*/
public final void install(Client client) throws StorageException {
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.getModels();
List<Model> models = modelGetter.allModels();
if (RunningMode.isNoInitMode()) {
for (Model model : models) {
......
......@@ -18,22 +18,24 @@
package org.apache.skywalking.oap.server.core.storage.model;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.MultipleQueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StorageModels implements IModelGetter, IModelSetter, IModelOverride {
private static final Logger logger = LoggerFactory.getLogger(StorageModels.class);
@Getter
/**
* StorageModels manages all models detected by the core.
*/
@Slf4j
public class StorageModels implements IModelManager, INewModel, IModelOverride {
private final List<Model> models;
public StorageModels() {
......@@ -41,7 +43,7 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
}
@Override
public Model putIfAbsent(Class aClass, int scopeId, Storage storage, boolean record) {
public Model add(Class aClass, int scopeId, Storage storage, boolean record) {
// Check this scope id is valid.
DefaultScopeDefine.nameOf(scopeId);
......@@ -51,47 +53,77 @@ public class StorageModels implements IModelGetter, IModelSetter, IModelOverride
}
}
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, storage.getModelName(), modelColumns);
List<ModelColumn> modelColumns = new ArrayList<>();
List<ExtraQueryIndex> extraQueryIndices = new ArrayList<>();
retrieval(aClass, storage.getModelName(), modelColumns, extraQueryIndices);
Model model = new Model(storage.getModelName(), modelColumns, storage.isCapableOfTimeSeries(), storage.isDeleteHistory(), scopeId, storage
.getDownsampling(), record);
Model model = new Model(
storage.getModelName(), modelColumns, extraQueryIndices, storage.isCapableOfTimeSeries(),
storage.isDeleteHistory(), scopeId,
storage.getDownsampling(), record
);
models.add(model);
return model;
}
private void retrieval(Class clazz, String modelName, List<ModelColumn> modelColumns) {
private void retrieval(Class clazz,
String modelName,
List<ModelColumn> modelColumns,
List<ExtraQueryIndex> extraQueryIndices) {
if (log.isDebugEnabled()) {
log.debug("Analysis {} to generate Model.", clazz.getName());
}
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(Column.class)) {
Column column = field.getAnnotation(Column.class);
modelColumns.add(new ModelColumn(new ColumnName(column.columnName()), field.getType(), column.matchQuery(), column
.content()));
if (logger.isDebugEnabled()) {
logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
modelColumns.add(
new ModelColumn(
new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(), column
.storageOnly(), column.isValue(), column.length()));
if (log.isDebugEnabled()) {
log.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
if (column.isValue()) {
ValueColumnMetadata.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
}
List<QueryUnifiedIndex> indexDefinitions = new ArrayList<>();
if (field.isAnnotationPresent(QueryUnifiedIndex.class)) {
indexDefinitions.add(field.getAnnotation(QueryUnifiedIndex.class));
}
if (field.isAnnotationPresent(MultipleQueryUnifiedIndex.class)) {
Collections.addAll(indexDefinitions, field.getAnnotation(MultipleQueryUnifiedIndex.class).value());
}
indexDefinitions.forEach(indexDefinition -> {
extraQueryIndices.add(new ExtraQueryIndex(
column.columnName(),
indexDefinition.withColumns()
));
});
}
}
if (Objects.nonNull(clazz.getSuperclass())) {
retrieval(clazz.getSuperclass(), modelName, modelColumns);
retrieval(clazz.getSuperclass(), modelName, modelColumns, extraQueryIndices);
}
}
@Override
public void overrideColumnName(String columnName, String newName) {
models.forEach(model -> model.getColumns().forEach(column -> {
ColumnName existColumnName = column.getColumnName();
String name = existColumnName.getName();
if (name.equals(columnName)) {
existColumnName.setStorageName(newName);
logger.debug("Model {} column {} has been override. The new column name is {}.", model.getName(), name, newName);
}
}));
models.forEach(model -> {
model.getColumns().forEach(column -> column.getColumnName().overrideName(columnName, newName));
model.getExtraQueryIndices().forEach(extraQueryIndex -> extraQueryIndex.overrideName(columnName, newName));
});
}
@Override
public List<Model> allModels() {
return models;
}
}
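A sketch of the override flow: one call now cascades the rename into every model column and every extra query index (the names are illustrative):
void renameForStorage(IModelOverride override) {
    // Every ColumnName whose full name is "entity_id" reports the new storage name afterwards,
    // and every ExtraQueryIndex referencing "entity_id" is rewritten to the new column.
    override.overrideColumnName("entity_id", "entity_id_text");
}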
......@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
......@@ -77,8 +77,8 @@ public enum DataTTLKeeperTimer {
}
log.info("Beginning to remove expired metrics from the storage.");
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
List<Model> models = modelGetter.getModels();
IModelManager modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelManager.class);
List<Model> models = modelGetter.allModels();
models.forEach(model -> {
if (model.isDeleteHistory()) {
execute(model);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.type;
/**
* StorageDataComplexObject implementation supports String-Object interconversion.
*/
public interface StorageDataComplexObject {
/**
* @return string representing this object.
*/
String toStorageData();
/**
* Initialize this object based on the given string data.
*/
void toObject(String data);
/**
* Initialize the object based on the given source.
*/
void copyFrom(Object source);
}
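A minimal sketch of an implementation, assuming a hypothetical counter type; the real complex metrics types in the core follow the same contract:
public class SimpleCounter implements StorageDataComplexObject {
    private long count;
    @Override
    public String toStorageData() {
        return String.valueOf(count);       // the serialized form written into the storage column
    }
    @Override
    public void toObject(String data) {
        this.count = Long.parseLong(data);  // rebuild the in-memory state from the stored string
    }
    @Override
    public void copyFrom(Object source) {
        this.count = ((SimpleCounter) source).count;
    }
}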
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core;
import org.junit.Assert;
import org.junit.Test;
public class CoreModuleTest {
@Test
public void testOpenServiceList() {
CoreModule coreModule = new CoreModule();
Assert.assertEquals(31, coreModule.services().length);
}
}
......@@ -16,13 +16,14 @@
*
*/
package org.apache.skywalking.oap.server.core.storage.type;
public interface StorageDataType {
String toStorageData();
void toObject(String data);
void copyFrom(Object source);
}
package org.apache.skywalking.oap.server.core;
import org.junit.Assert;
import org.junit.Test;
public class WorkPathTest {
@Test
public void testPath() {
Assert.assertTrue(WorkPath.getPath().exists());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.model;
import org.junit.Assert;
import org.junit.Test;
public class ExtraQueryIndexTest {
@Test
public void testIndexColumns() {
final ExtraQueryIndex extraQueryIndex = new ExtraQueryIndex("a1", new String[] {"a2"});
Assert.assertArrayEquals(new String[] {
"a1",
"a2"
}, extraQueryIndex.getColumns());
}
@Test(expected = IllegalArgumentException.class)
public void testIllegalIndexColumns() {
ExtraQueryIndex extraQueryIndex = new ExtraQueryIndex("a1", new String[0]);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.model;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.junit.Assert;
import org.junit.Test;
public class ModelColumnTest {
@Test
public void testColumnDefine() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, true,
false, true, 0
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
column = new ModelColumn(new ColumnName("", "abc"), IntKeyLongValueHashMap.class, true,
false, true, 200
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
Assert.assertEquals(0, column.getLength());
column = new ModelColumn(new ColumnName("", "abc"), String.class, true,
false, true, 200
);
Assert.assertEquals(false, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
}
@Test(expected = IllegalArgumentException.class)
public void testConflictDefinition() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class,
true, true, true, 200
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.model;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.QueryUnifiedIndex;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DefaultScopeDefine.class})
public class StorageModelsTest {
@BeforeClass
public static void setup() {
PowerMockito.mockStatic(DefaultScopeDefine.class);
PowerMockito.when(DefaultScopeDefine.nameOf(-1)).thenReturn("any");
}
@Test
public void testStorageModels() {
StorageModels models = new StorageModels();
models.add(TestModel.class, -1,
new Storage("StorageModelsTest", true, true, Downsampling.Hour),
false
);
final List<Model> allModules = models.allModels();
Assert.assertEquals(1, allModules.size());
final Model model = allModules.get(0);
Assert.assertEquals(4, model.getColumns().size());
Assert.assertEquals(false, model.getColumns().get(0).isStorageOnly());
Assert.assertEquals(false, model.getColumns().get(1).isStorageOnly());
Assert.assertEquals(false, model.getColumns().get(2).isStorageOnly());
Assert.assertEquals(true, model.getColumns().get(3).isStorageOnly());
final List<ExtraQueryIndex> extraQueryIndices = model.getExtraQueryIndices();
Assert.assertEquals(3, extraQueryIndices.size());
Assert.assertArrayEquals(new String[] {
"column2",
"column"
}, extraQueryIndices.get(2).getColumns());
}
@Stream(name = "StorageModelsTest", scopeId = -1, builder = TestModel.Builder.class, processor = MetricsStreamProcessor.class)
private static class TestModel {
@Column(columnName = "column")
private String column;
@Column(columnName = "column1")
@QueryUnifiedIndex(withColumns = {"column2"})
private String column1;
@Column(columnName = "column2")
@QueryUnifiedIndex(withColumns = {"column1"})
@QueryUnifiedIndex(withColumns = {"column"})
private String column2;
@Column(columnName = "column", storageOnly = true)
private String column4;
static class Builder implements StorageBuilder {
@Override
public StorageData map2Data(final Map dbMap) {
return null;
}
@Override
public Map<String, Object> data2Map(final StorageData storageData) {
return null;
}
}
}
}
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
......@@ -62,8 +62,8 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
Object value = objectMap.get(key);
if (value instanceof StorageDataType) {
builder.field(key, ((StorageDataType) value).toStorageData());
if (value instanceof StorageDataComplexObject) {
builder.field(key, ((StorageDataComplexObject) value).toStorageData());
} else {
builder.field(key, value);
}
......
......@@ -139,14 +139,12 @@ public class StorageEsInstaller extends ModelInstaller {
matchColumn.put("type", "text");
matchColumn.put("analyzer", "oap_analyzer");
properties.put(matchCName, matchColumn);
} else if (columnDefine.isContent()) {
Map<String, Object> column = new HashMap<>();
column.put("type", "text");
column.put("index", false);
properties.put(columnDefine.getColumnName().getName(), column);
} else {
Map<String, Object> column = new HashMap<>();
column.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
if (columnDefine.isStorageOnly()) {
column.put("index", false);
}
properties.put(columnDefine.getColumnName().getName(), column);
}
}
......
......@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.query.entity.Attribute;
import org.apache.skywalking.oap.server.core.query.entity.Database;
......@@ -233,12 +232,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
.add(QueryBuilders.termQuery(EndpointTraffic.DETECT_POINT, DetectPoint.SERVER.value()));
sourceBuilder.query(boolQueryBuilder);
/**
* Query the dataset by a larger limit condition and distinct in the memory,
* in order to avoid the storage level distinct.
* This is a match query only, don't need 100% accurate.
*/
sourceBuilder.size(limit * 7);
sourceBuilder.size(limit);
SearchResponse response = getClient().search(EndpointTraffic.INDEX_NAME, sourceBuilder);
......@@ -254,9 +248,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
endpoints.add(endpoint);
}
final List<Endpoint> endpointList = endpoints.stream().distinct().collect(Collectors.toList());
return endpointList.size() > limit ? endpointList.subList(0, limit) : endpointList;
return endpoints;
}
@Override
......
......@@ -149,8 +149,10 @@ public class InfluxStorageProvider extends ModuleProvider {
ModelInstaller installer;
if (config.getMetabaseType().equalsIgnoreCase("h2")) {
installer = new InfluxDBH2MetaDBInstaller(getManager());
} else {
} else if (config.getMetabaseType().equalsIgnoreCase("mysql")) {
installer = new InfluxDBMySQLMetaDBInstaller(getManager());
} else {
throw new IllegalArgumentException("Unavailable metabase type, " + config.getMetabaseType());
}
installer.install(client);
new H2RegisterLockInstaller().install(client, lockDAO);
......
......@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
......@@ -47,10 +47,10 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
for (ModelColumn column : model.getColumns()) {
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataType) {
if (value instanceof StorageDataComplexObject) {
fields.put(
column.getColumnName().getStorageName(),
((StorageDataType) value).toStorageData()
((StorageDataComplexObject) value).toStorageData()
);
} else {
fields.put(column.getColumnName().getStorageName(), value);
......
......@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
......@@ -81,8 +81,8 @@ public class MetricsDAO implements IMetricsDAO {
for (int i = 1; i < columns.size(); i++) {
Object value = values.get(i);
if (value instanceof StorageDataType) {
value = ((StorageDataType) value).toStorageData();
if (value instanceof StorageDataComplexObject) {
value = ((StorageDataComplexObject) value).toStorageData();
}
data.put(storageAndColumnNames.get(columns.get(i)), value);
......
......@@ -31,7 +31,6 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Stream(name = JaegerSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.JAEGER_SPAN, builder = JaegerSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
......@@ -53,62 +52,50 @@ public class JaegerSpanRecord extends Record {
@Setter
@Getter
@Column(columnName = TRACE_ID)
@IDColumn
private String traceId;
@Setter
@Getter
@Column(columnName = SPAN_ID)
@IDColumn
private String spanId;
@Setter
@Getter
@Column(columnName = SERVICE_ID)
@IDColumn
private int serviceId;
@Setter
@Getter
@Column(columnName = SERVICE_INSTANCE_ID)
@IDColumn
private int serviceInstanceId;
@Setter
@Getter
@Column(columnName = ENDPOINT_NAME, matchQuery = true)
@IDColumn
private String endpointName;
@Setter
@Getter
@Column(columnName = ENDPOINT_ID)
@IDColumn
private String endpointId;
@Setter
@Getter
@Column(columnName = START_TIME)
@IDColumn
private long startTime;
@Setter
@Getter
@Column(columnName = END_TIME)
@IDColumn
private long endTime;
@Setter
@Getter
@Column(columnName = LATENCY)
@IDColumn
private int latency;
@Setter
@Getter
@Column(columnName = IS_ERROR)
@IDColumn
private int isError;
@Setter
@Getter
@Column(columnName = DATA_BINARY)
@IDColumn
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = ENCODE)
@IDColumn
private int encode;
@Override
......
......@@ -252,12 +252,7 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
}
sql.append(" and ").append(EndpointTraffic.DETECT_POINT).append(" = ?");
condition.add(DetectPoint.SERVER.value());
/**
* Query the dataset by a larger limit condition and distinct in the memory,
* in order to avoid the storage level distinct.
* This is a match query only, don't need 100% accurate.
*/
sql.append(" limit ").append(limit * 7);
sql.append(" limit ").append(limit);
List<Endpoint> endpoints = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
......@@ -324,8 +319,9 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
.getAsString());
for (String ipv4 : ipv4s) {
serviceInstance.getAttributes()
.add(new Attribute(ServiceInstanceInventory.PropertyUtil.IPV4S,
ipv4
.add(new Attribute(
ServiceInstanceInventory.PropertyUtil.IPV4S,
ipv4
));
}
} else {
......
......@@ -50,10 +50,6 @@ public class H2RegisterLockInstaller {
tableCreateSQL.appendLine("name VARCHAR(100)");
tableCreateSQL.appendLine(")");
if (logger.isDebugEnabled()) {
logger.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
try (Connection connection = h2Client.getConnection()) {
h2Client.execute(connection, tableCreateSQL.toString());
......@@ -67,9 +63,10 @@ public class H2RegisterLockInstaller {
}
private void putIfAbsent(JDBCHikariCPClient h2Client, Connection connection, int scopeId,
String scopeName) throws StorageException {
String scopeName) throws StorageException {
boolean existed = false;
try (ResultSet resultSet = h2Client.executeQuery(connection, "select 1 from " + LOCK_TABLE_NAME + " where id = " + scopeId)) {
try (ResultSet resultSet = h2Client.executeQuery(
connection, "select 1 from " + LOCK_TABLE_NAME + " where id = " + scopeId)) {
if (resultSet.next()) {
existed = true;
}
......@@ -77,7 +74,8 @@ public class H2RegisterLockInstaller {
throw new StorageException(e.getMessage(), e);
}
if (!existed) {
try (PreparedStatement statement = connection.prepareStatement("insert into " + LOCK_TABLE_NAME + "(id, sequence, name) values (?, ?, ?)")) {
try (PreparedStatement statement = connection.prepareStatement(
"insert into " + LOCK_TABLE_NAME + "(id, sequence, name) values (?, ?, ?)")) {
statement.setInt(1, scopeId);
statement.setInt(2, 1);
statement.setString(3, scopeName);
......
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.ArrayParamBuilder;
......@@ -138,8 +138,8 @@ public class H2SQLExecutor {
}
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataType) {
param.add(((StorageDataType) value).toStorageData());
if (value instanceof StorageDataComplexObject) {
param.add(((StorageDataComplexObject) value).toStorageData());
} else {
param.add(value);
}
......@@ -164,8 +164,8 @@ public class H2SQLExecutor {
}
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataType) {
param.add(((StorageDataType) value).toStorageData());
if (value instanceof StorageDataComplexObject) {
param.add(((StorageDataComplexObject) value).toStorageData());
} else {
param.add(value);
}
......
......@@ -19,12 +19,9 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -49,55 +46,43 @@ public class H2TableInstaller extends ModelInstaller {
@Override
protected boolean isExists(Client client, Model model) throws StorageException {
TableMetaInfo.addModel(model);
JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client;
try (Connection conn = h2Client.getConnection()) {
try (ResultSet rset = conn.getMetaData().getTables(null, null, model.getName(), null)) {
if (rset.next()) {
return true;
}
}
} catch (SQLException e) {
throw new StorageException(e.getMessage(), e);
} catch (JDBCClientException e) {
throw new StorageException(e.getMessage(), e);
}
return false;
TableMetaInfo.addModel(model);
return false;
}
@Override
protected void createTable(Client client, Model model) throws StorageException {
JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client;
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " (");
tableCreateSQL.appendLine("id VARCHAR(300) PRIMARY KEY, ");
for (int i = 0; i < model.getColumns().size(); i++) {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(
name.getStorageName() + " " + getColumnType(model, name, column.getType()) + (i != model
.getColumns()
.size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
JDBCHikariCPClient jdbcHikariCPClient = (JDBCHikariCPClient) client;
try (Connection connection = jdbcHikariCPClient.getConnection()) {
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + model.getName() + " (");
tableCreateSQL.appendLine("id VARCHAR(300) PRIMARY KEY, ");
for (int i = 0; i < model.getColumns().size(); i++) {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(
name.getStorageName() + " " + getColumnType(column) + (i != model
.getColumns()
.size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
if (log.isDebugEnabled()) {
log.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
if (log.isDebugEnabled()) {
log.debug("creating table: " + tableCreateSQL.toStringInNewLine());
}
try (Connection connection = h2Client.getConnection()) {
h2Client.execute(connection, tableCreateSQL.toString());
} catch (JDBCClientException e) {
throw new StorageException(e.getMessage(), e);
} catch (SQLException e) {
jdbcHikariCPClient.execute(connection, tableCreateSQL.toString());
createTableIndexes(jdbcHikariCPClient, connection, model);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
/**
* Set up the data type mapping between Java types and H2 database types.
*/
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
protected String getColumnType(ModelColumn column) {
final Class<?> type = column.getType();
if (Integer.class.equals(type) || int.class.equals(type)) {
return "INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
......@@ -105,18 +90,26 @@ public class H2TableInstaller extends ModelInstaller {
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "DOUBLE";
} else if (String.class.equals(type)) {
return "VARCHAR(2000)";
return "VARCHAR(" + column.getLength() + ")";
} else if (IntKeyLongValueHashMap.class.equals(type)) {
return "VARCHAR(20000)";
return "MEDIUMTEXT";
} else if (byte[].class.equals(type)) {
if (DefaultScopeDefine.SEGMENT == model.getScopeId()) {
if (name.getName().equals(SegmentRecord.DATA_BINARY)) {
return "MEDIUMTEXT";
}
}
return "VARCHAR(20000)";
return "MEDIUMTEXT";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
}
protected void createTableIndexes(JDBCHikariCPClient client,
Connection connection,
Model model) throws JDBCClientException {
}
protected void createIndex(JDBCHikariCPClient client, Connection connection, Model model,
SQLBuilder indexSQL) throws JDBCClientException {
if (log.isDebugEnabled()) {
log.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
}
client.execute(connection, indexSQL.toString());
}
}
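To make the new mapping concrete, a sketch with a hypothetical column definition (the constructor arguments follow the new ModelColumn signature above):
ModelColumn entityId = new ModelColumn(
    new ColumnName("service_cpm", "entity_id"), String.class,
    false, false, false, 512);
// getColumnType(entityId) -> "VARCHAR(512)", driven by the declared length.
// byte[] and IntKeyLongValueHashMap columns now both map to "MEDIUMTEXT",
// replacing the previous SEGMENT-specific special case.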
......@@ -19,33 +19,21 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ALARM;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.NETWORK_ADDRESS;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_LOG;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SEGMENT;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INSTANCE_INVENTORY;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_INVENTORY;
/**
* Extend H2TableInstaller but match MySQL SQL syntax.
*/
......@@ -61,228 +49,56 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
@Override
protected void createTable(Client client, Model model) throws StorageException {
super.createTable(client, model);
JDBCHikariCPClient jdbcHikariCPClient = (JDBCHikariCPClient) client;
this.createIndexes(jdbcHikariCPClient, model);
}
/**
* Based on MySQL features, provide specific data type mappings.
*/
@Override
protected String getColumnType(Model model, ColumnName name, Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
return "INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return "BIGINT";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "DOUBLE";
} else if (String.class.equals(type)) {
if (name.getName().equals(SegmentRecord.TRACE_ID) || name.getName().equals(SegmentRecord.SEGMENT_ID)) {
return "VARCHAR(150)";
}
if (Metrics.ENTITY_ID.equals(name.getName())) {
return "VARCHAR(512)";
}
if (SegmentRecord.ENDPOINT_NAME.equals(name.getName()) || SegmentRecord.ENDPOINT_ID.equals(
name.getName())) {
return "VARCHAR(200)";
}
if (PROFILE_TASK_LOG == model.getScopeId() || PROFILE_TASK_SEGMENT_SNAPSHOT == model.getScopeId()) {
if (name.getName().equals(ProfileTaskLogRecord.TASK_ID)) {
return "VARCHAR(300)";
protected boolean isExists(Client client, Model model) throws StorageException {
TableMetaInfo.addModel(model);
JDBCHikariCPClient h2Client = (JDBCHikariCPClient) client;
try (Connection conn = h2Client.getConnection()) {
try (ResultSet rset = conn.getMetaData().getTables(null, null, model.getName(), null)) {
if (rset.next()) {
return true;
}
}
return "VARCHAR(2000)";
} else if (IntKeyLongValueHashMap.class.equals(type)) {
return "MEDIUMTEXT";
} else if (byte[].class.equals(type)) {
return "MEDIUMTEXT";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
}
/**
* Create indexes for all tables. Because the MySQL storage suits middle-size use cases and is also compatible with
* TiDB, indexes are required for the UI query.
*
* Provide a different index creation strategy based on the Model.
*/
protected void createIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
switch (model.getScopeId()) {
case SERVICE_INVENTORY:
case SERVICE_INSTANCE_INVENTORY:
case NETWORK_ADDRESS:
createInventoryIndexes(client, model);
return;
case SEGMENT:
createSegmentIndexes(client, model);
return;
case ALARM:
createAlarmIndexes(client, model);
return;
case PROFILE_TASK_LOG:
createProfileLogIndexes(client, model);
return;
case PROFILE_TASK_SEGMENT_SNAPSHOT:
createProfileThreadSnapshotIndexes(client, model);
return;
default:
createIndexesForAllMetrics(client, model);
}
}
private void createProfileThreadSnapshotIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
// query by task id, sequence
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_A ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(ProfileThreadSnapshotRecord.TASK_ID)
.append(", ")
.append(ProfileThreadSnapshotRecord.SEQUENCE)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
// query by segment id, sequence
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_SEGMENT_ID_SEQUENCE ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(ProfileThreadSnapshotRecord.SEGMENT_ID)
.append(", ")
.append(ProfileThreadSnapshotRecord.SEQUENCE)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
// query by segment id, dump time
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_SEGMENT_ID_DUMP_TIME ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(ProfileThreadSnapshotRecord.SEGMENT_ID)
.append(", ")
.append(ProfileThreadSnapshotRecord.DUMP_TIME)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
private void createProfileLogIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
// query by task id
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TASK_ID ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(ProfileTaskLogRecord.TASK_ID)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
private void createIndexesForAllMetrics(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(Metrics.TIME_BUCKET)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
} catch (SQLException | JDBCClientException e) {
throw new StorageException(e.getMessage(), e);
}
return false;
}
private void createAlarmIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(AlarmRecord.TIME_BUCKET)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
@Override
protected void createTableIndexes(JDBCHikariCPClient client,
Connection connection,
Model model) throws JDBCClientException {
int indexSeq = 0;
for (final ModelColumn modelColumn : model.getColumns()) {
if (!modelColumn.isStorageOnly()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append("ON ").append(model.getName()).append("(")
.append(modelColumn.getColumnName().getStorageName())
.append(")");
createIndex(client, connection, model, tableIndexSQL);
}
}
}
private void createSegmentIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
for (final ExtraQueryIndex extraQueryIndex : model.getExtraQueryIndices()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TRACE_ID ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(SegmentRecord.TRACE_ID).append(")");
createIndex(client, connection, model, tableIndexSQL);
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_ENDPOINT_ID ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(SegmentRecord.ENDPOINT_ID)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_LATENCY ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(SegmentRecord.LATENCY).append(")");
createIndex(client, connection, model, tableIndexSQL);
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME_BUCKET ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(SegmentRecord.TIME_BUCKET)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
private void createInventoryIndexes(JDBCHikariCPClient client, Model model) throws StorageException {
try (Connection connection = client.getConnection()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE UNIQUE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_SEQ ");
tableIndexSQL.append("ON ").append(model.getName()).append("(").append(RegisterSource.SEQUENCE).append(")");
createIndex(client, connection, model, tableIndexSQL);
tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase()).append("_TIME ");
tableIndexSQL.append("ON ")
.append(model.getName())
.append("(")
.append(RegisterSource.HEARTBEAT_TIME)
.append(", ")
.append(RegisterSource.REGISTER_TIME)
.append(")");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append(" ON ").append(model.getName()).append("(");
final String[] columns = extraQueryIndex.getColumns();
for (int i = 0; i < columns.length; i++) {
tableIndexSQL.append(columns[i]);
if (i < columns.length - 1) {
tableIndexSQL.append(",");
}
}
tableIndexSQL.append(")");
createIndex(client, connection, model, tableIndexSQL);
} catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
private void createIndex(JDBCHikariCPClient client, Connection connection, Model model,
SQLBuilder indexSQL) throws JDBCClientException {
if (log.isDebugEnabled()) {
log.debug("create index for table {}, sql: {} ", model.getName(), indexSQL.toStringInNewLine());
}
client.execute(connection, indexSQL.toString());
}
}
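The naming rule behind both loops is the table name plus an incrementing sequence; a sketch of it, with a hypothetical table:
// Mirrors the rule used by createTableIndexes; not the installer's own helper.
String indexName(String tableName, int indexSeq) {
    return tableName.toUpperCase() + "_" + indexSeq + "_IDX";
}
// indexName("service_cpm", 0) -> "SERVICE_CPM_0_IDX" for the first queryable column,
// continuing with _1_IDX, _2_IDX, ... across the single-column and unified indices.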
......@@ -31,7 +31,6 @@ import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcess
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.annotation.IDColumn;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class)
......@@ -53,62 +52,50 @@ public class ZipkinSpanRecord extends Record {
@Setter
@Getter
@Column(columnName = TRACE_ID)
@IDColumn
private String traceId;
@Setter
@Getter
@Column(columnName = SPAN_ID)
@IDColumn
private String spanId;
@Setter
@Getter
@Column(columnName = SERVICE_ID)
@IDColumn
private int serviceId;
@Setter
@Getter
@Column(columnName = SERVICE_INSTANCE_ID)
@IDColumn
private int serviceInstanceId;
@Setter
@Getter
@Column(columnName = ENDPOINT_NAME, matchQuery = true)
@IDColumn
private String endpointName;
@Setter
@Getter
@Column(columnName = ENDPOINT_ID)
@IDColumn
private String endpointId;
@Setter
@Getter
@Column(columnName = START_TIME)
@IDColumn
private long startTime;
@Setter
@Getter
@Column(columnName = END_TIME)
@IDColumn
private long endTime;
@Setter
@Getter
@Column(columnName = LATENCY)
@IDColumn
private int latency;
@Setter
@Getter
@Column(columnName = IS_ERROR)
@IDColumn
private int isError;
@Setter
@Getter
@Column(columnName = DATA_BINARY)
@IDColumn
private byte[] dataBinary;
@Setter
@Getter
@Column(columnName = ENCODE)
@IDColumn
private int encode;
@Override
......
......@@ -54,9 +54,9 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.INewModel;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
......@@ -129,8 +129,8 @@ public class MockCoreModuleProvider extends CoreModuleProvider {
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelSetter.class, storageModels);
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(INewModel.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
this.registerServiceImplementation(
......