Unverified commit 7f7e96b0, authored by wu-sheng, committed by GitHub

[IMPORTANT] Query traces with tags as condition (#5270)

Parent 5a2c91cd
......@@ -29,6 +29,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | serviceNameMaxLength| Max length limitation of service name.|SW_SERVICE_NAME_MAX_LENGTH|70|
| - | - | instanceNameMaxLength| Max length limitation of service instance name. The max length of service + instance names should be less than 200.|SW_INSTANCE_NAME_MAX_LENGTH|70|
| - | - | endpointNameMaxLength| Max length limitation of endpoint name. The max length of service + endpoint names should be less than 240.|SW_ENDPOINT_NAME_MAX_LENGTH|150|
| - | - | searchableTracesTags | Define the set of span tag keys that are searchable through GraphQL. Multiple values should be separated by commas. | SW_SEARCHABLE_TAG_KEYS | http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker|
| - | - | gRPCThreadPoolSize|Pool size of gRPC server| - | CPU core * 4|
| - | - | gRPCThreadPoolQueueSize| The queue size of gRPC server| - | 10000|
| - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | - | - |
......@@ -113,10 +114,14 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | url | H2 connection URL. Default is H2 memory mode | SW_STORAGE_H2_URL | jdbc:h2:mem:skywalking-oap-db |
| - | - | user | User name of H2 database. | SW_STORAGE_H2_USER | sa |
| - | - | password | Password of H2 database. | - | - |
| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 |
| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities, such as trace segments, include logical columns with multiple values. In H2, we use multiple physical columns to host the values, e.g. column_a with values [1,2,3,4,5] becomes `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, e.g. multiple HTTP exit spans each have their own `http.method` tag. This configuration sets the limit on the max number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist, please copy it into oap-lib folder manually | - | - |
| - | - | properties | Hikari connection pool configurations | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 |
| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities, such as trace segments, include logical columns with multiple values. In MySQL, we use multiple physical columns to host the values, e.g. column_a with values [1,2,3,4,5] becomes `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, e.g. multiple HTTP exit spans each have their own `http.method` tag. This configuration sets the limit on the max number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |influxdb| - | InfluxDB storage. |- | - |
| - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
| - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
......
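The new `searchableTracesTags` option is a plain comma-separated string; the analyzer splits it into the list of tag keys that are allowed to reach storage (see the `SegmentAnalysisListener.Factory` change below). Below is a minimal standalone sketch of that parsing with the default `SW_SEARCHABLE_TAG_KEYS` value; class and variable names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

public class SearchableTagKeysSketch {
    public static void main(String[] args) {
        // Default of SW_SEARCHABLE_TAG_KEYS / core.default.searchableTracesTags.
        String setting = "http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker";
        // The Factory splits on Const.COMMA (",") and keeps the result as a List<String>.
        List<String> searchableTagKeys = Arrays.asList(setting.split(","));

        System.out.println(searchableTagKeys.contains("http.method")); // true  -> this tag is indexed
        System.out.println(searchableTagKeys.contains("custom.key"));  // false -> this tag is ignored
    }
}
```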
......@@ -18,22 +18,26 @@
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import java.util.Arrays;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.Segment;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions.
......@@ -44,6 +48,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private final SourceReceiver sourceReceiver;
private final TraceSegmentSampler sampler;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
private final Segment segment = new Segment();
private SAMPLE_STATUS sampleStatus = SAMPLE_STATUS.UNKNOWN;
......@@ -96,7 +101,6 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
serviceId,
endpointName
);
}
@Override
......@@ -144,11 +148,21 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
if (!isError && span.getIsError()) {
isError = true;
}
appendSearchableTags(span);
});
final long accurateDuration = endTimestamp - startTimestamp;
duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration;
}
private void appendSearchableTags(SpanObject span) {
span.getTagsList().forEach(tag -> {
if (searchableTagKeys.contains(tag.getKey())) {
segment.getTags().add(new SpanTag(tag.getKey(), tag.getValue()));
}
});
}
@Override
public void build() {
if (log.isDebugEnabled()) {
......@@ -173,9 +187,14 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private final SourceReceiver sourceReceiver;
private final TraceSegmentSampler sampler;
private final NamingControl namingControl;
private final List<String> searchTagKeys;
public Factory(ModuleManager moduleManager, AnalyzerModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
final ConfigService configService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
this.searchTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
this.sampler = new TraceSegmentSampler(config.getTraceSampleRateWatcher());
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
......@@ -184,7 +203,12 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
@Override
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new SegmentAnalysisListener(sourceReceiver, sampler, namingControl);
return new SegmentAnalysisListener(
sourceReceiver,
sampler,
namingControl,
searchTagKeys
);
}
}
}
......@@ -89,6 +89,8 @@ core:
instanceNameMaxLength: ${SW_INSTANCE_NAME_MAX_LENGTH:70}
# The max length of service + endpoint names should be less than 240
endpointNameMaxLength: ${SW_ENDPOINT_NAME_MAX_LENGTH:150}
# Define the set of span tag keys that are searchable through GraphQL.
searchableTracesTags: ${SW_SEARCHABLE_TAG_KEYS:http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker}
storage:
selector: ${SW_STORAGE:h2}
elasticsearch:
......@@ -140,6 +142,8 @@ storage:
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
user: ${SW_STORAGE_H2_USER:sa}
metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
mysql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
......@@ -150,6 +154,8 @@ storage:
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
influxdb:
# InfluxDB configuration
url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
......@@ -229,7 +235,7 @@ kafka-fetcher:
receiver-meter:
selector: ${SW_RECEIVER_METER:-}
default:
receiver-oc:
selector: ${SW_OC_RECEIVER:-}
default:
......
......@@ -27,6 +27,7 @@ public class Const {
public static final String RELATION_ID_CONNECTOR = "-";
public static final String RELATION_ID_PARSER_SPLIT = "\\-";
public static final String LINE = "-";
public static final String COMMA = ",";
public static final String SPACE = " ";
public static final String KEY_VALUE_SPLIT = ",";
public static final String ARRAY_SPLIT = "|";
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.ScopeDefaultColumn;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
......@@ -116,6 +117,14 @@ public class CoreModuleConfig extends ModuleConfig {
* In current practice, we don't recommend a length of over 190.
*/
private int endpointNameMaxLength = 150;
/**
* Define the set of span tag keys that are searchable through GraphQL.
*
* @since 8.2.0
*/
@Setter
@Getter
private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS;
public CoreModuleConfig() {
this.downsampling = new ArrayList<>();
......@@ -142,4 +151,19 @@ public class CoreModuleConfig extends ModuleConfig {
*/
Aggregator
}
/**
* SkyWalking Java Agent provides the recommended tag keys for other language agents or SDKs. This field declares the
* recommended keys that should be searchable.
*/
private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
Const.COMMA,
"http.method",
"status_code",
"db.type",
"db.instance",
"mq.queue",
"mq.topic",
"mq.broker"
);
}
......@@ -40,6 +40,8 @@ public class SegmentDispatcher implements SourceDispatcher<Segment> {
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
segment.setTagsRawData(source.getTags());
segment.setTags(SpanTag.Util.toStringList(source.getTags()));
RecordStreamProcessor.getInstance().in(segment);
}
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import joptsimple.internal.Strings;
import lombok.Getter;
......@@ -53,6 +54,7 @@ public class SegmentRecord extends Record {
public static final String IS_ERROR = "is_error";
public static final String DATA_BINARY = "data_binary";
public static final String VERSION = "version";
public static final String TAGS = "tags";
@Setter
@Getter
......@@ -106,6 +108,17 @@ public class SegmentRecord extends Record {
@Getter
@Column(columnName = VERSION, storageOnly = true)
private int version;
@Setter
@Getter
@Column(columnName = TAGS)
private List<String> tags;
/**
* Tags raw data is a duplicate of {@link #tags}. Some storage implementations don't support array values in a single
* column. Those implementations can use this raw data to generate the necessary data structures.
*/
@Setter
@Getter
private List<SpanTag> tagsRawData;
@Override
public String id() {
......@@ -139,6 +152,7 @@ public class SegmentRecord extends Record {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(VERSION, storageData.getVersion());
map.put(TAGS, storageData.getTags());
return map;
}
......@@ -163,6 +177,7 @@ public class SegmentRecord extends Record {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setVersion(((Number) dbMap.get(VERSION)).intValue());
// Don't read the tags as they are already included in the data binary.
return record;
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Getter
@Setter
public class SpanTag {
private String key;
private String value;
public SpanTag() {
}
public SpanTag(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public String toString() {
return key + "=" + value;
}
public static class Util {
public static List<String> toStringList(List<SpanTag> list) {
if (CollectionUtils.isEmpty(list)) {
return Collections.emptyList();
}
List<String> result = new ArrayList<>(list.size());
list.forEach(e -> result.add(e.toString()));
return result;
}
}
}
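The searchable tags are persisted as plain `key=value` strings produced by `SpanTag#toString()`, which is exactly what the storage-side term/contains conditions match against. A short usage sketch of the new utility, assuming only the `SpanTag` class added in this commit:

```java
import java.util.Arrays;
import java.util.List;

public class SpanTagToStringSketch {
    public static void main(String[] args) {
        List<SpanTag> tags = Arrays.asList(
            new SpanTag("http.method", "get"),
            new SpanTag("status_code", "404"));

        // SegmentDispatcher stores this string list in the SegmentRecord TAGS column.
        List<String> indexed = SpanTag.Util.toStringList(tags);
        System.out.println(indexed); // [http.method=get, status_code=404]
    }
}
```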
......@@ -26,9 +26,11 @@ import org.apache.skywalking.oap.server.library.module.Service;
public class ConfigService implements Service {
private final String gRPCHost;
private final int gRPCPort;
private final String searchableTracesTags;
public ConfigService(CoreModuleConfig moduleConfig) {
this.gRPCHost = moduleConfig.getGRPCHost();
this.gRPCPort = moduleConfig.getGRPCPort();
this.searchableTracesTags = moduleConfig.getSearchableTracesTags();
}
}
......@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
......@@ -83,13 +84,13 @@ public class TraceQueryService implements Service {
final QueryOrder queryOrder,
final Pagination paging,
final long startTB,
final long endTB) throws IOException {
final long endTB,
final List<SpanTag> tags) throws IOException {
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);
return getTraceQueryDAO().queryBasicTraces(
startTB, endTB, minTraceDuration, maxTraceDuration, endpointName, serviceId, serviceInstanceId, endpointId,
traceId, page
.getLimit(), page.getFrom(), traceState, queryOrder
traceId, page.getLimit(), page.getFrom(), traceState, queryOrder, tags
);
}
......
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.core.query.input;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
......@@ -38,4 +40,5 @@ public class TraceQueryCondition {
private TraceState traceState;
private QueryOrder queryOrder;
private Pagination paging;
private List<SpanTag> tags;
}
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.core.source;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SEGMENT;
......@@ -72,4 +75,7 @@ public class Segment extends Source {
@Setter
@Getter
private int version;
@Setter
@Getter
private List<SpanTag> tags = new ArrayList<>();
}
......@@ -18,7 +18,11 @@
package org.apache.skywalking.oap.server.core.storage.model;
public interface DataTypeMapping {
import java.lang.reflect.Type;
String transform(Class<?> type);
public interface DataTypeMapping {
/**
* Map the given type and genericType of the field to the column type.
*/
String transform(Class<?> type, Type genericType);
}
......@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.model;
import com.google.gson.JsonObject;
import java.lang.reflect.Type;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
......@@ -26,28 +26,23 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
public class ModelColumn {
private final ColumnName columnName;
private final Class<?> type;
private final Type genericType;
private final boolean matchQuery;
private final boolean storageOnly;
private final int length;
public ModelColumn(ColumnName columnName,
Class<?> type,
Type genericType,
boolean matchQuery,
boolean storageOnly,
boolean isValue,
int length) {
this.columnName = columnName;
this.type = type;
this.genericType = genericType;
this.matchQuery = matchQuery;
/*
* Only accept length in the String/JsonObject definition.
*/
if (type.equals(String.class) || type.equals(JsonObject.class)) {
this.length = length;
} else {
this.length = 0;
}
this.length = length;
/*
* byte[] and {@link IntKeyLongValueHashMap} can never be queried.
*/
......
......@@ -122,8 +122,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
}
modelColumns.add(
new ModelColumn(
new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(),
column.storageOnly(), column.dataType().isValue(), columnLength
new ColumnName(modelName, column.columnName()), field.getType(), field.getGenericType(),
column.matchQuery(), column.storageOnly(), column.dataType().isValue(), columnLength
));
if (log.isDebugEnabled()) {
log.debug("The field named {} with the {} type", column.columnName(), field.getType());
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
......@@ -29,9 +30,20 @@ import org.apache.skywalking.oap.server.library.module.Service;
public interface ITraceQueryDAO extends Service {
TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
String endpointName, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from,
TraceState traceState, QueryOrder queryOrder) throws IOException;
TraceBrief queryBasicTraces(long startSecondTB,
long endSecondTB,
long minDuration,
long maxDuration,
String endpointName,
String serviceId,
String serviceInstanceId,
String endpointId,
String traceId,
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException;
List<SegmentRecord> queryByTraceId(String traceId) throws IOException;
......
......@@ -25,20 +25,20 @@ import org.junit.Test;
public class ModelColumnTest {
@Test
public void testColumnDefine() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, true,
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, byte[].class, true,
false, true, 0
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, true,
column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class, true,
false, true, 200
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
Assert.assertEquals(0, column.getLength());
Assert.assertEquals(200, column.getLength());
column = new ModelColumn(new ColumnName("", "abc"), String.class, true,
column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class, true,
false, true, 200
);
Assert.assertEquals(false, column.isStorageOnly());
......@@ -47,7 +47,7 @@ public class ModelColumnTest {
@Test(expected = IllegalArgumentException.class)
public void testConflictDefinition() {
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class,
ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class,
true, true, true, 200
);
}
......
......@@ -75,7 +75,7 @@ public class TraceQuery implements GraphQLQueryResolver {
return getQueryService().queryBasicTraces(
condition.getServiceId(), condition.getServiceInstanceId(), endpointId, traceId, endpointName, minDuration,
maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB
maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB, condition.getTags()
);
}
......
Subproject commit 563bb51c71922f017911345d7cd5c62a7ac8995c
Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.mock;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
......@@ -29,7 +30,7 @@ class ServiceAMock {
public static String SERVICE_NAME = "mock_a_service";
public static String SERVICE_INSTANCE_NAME = "mock_a_service_instance";
static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest";
static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest/404-test";
static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()";
static String DUBBO_ADDRESS = "DubboIPAddress:1000";
......@@ -62,6 +63,9 @@ class ServiceAMock {
span.setComponentId(ComponentsDefine.TOMCAT.getId());
span.setOperationName(REST_ENDPOINT);
span.setIsError(false);
span.addTags(KeyStringValuePair.newBuilder().setKey("http.method").setValue("get").build());
span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("404").build());
span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("200").build());
return span;
}
......
......@@ -19,6 +19,9 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.JsonObject;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
......@@ -26,7 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObje
public class ColumnTypeEsMapping implements DataTypeMapping {
@Override
public String transform(Class<?> type) {
public String transform(Class<?> type, Type genericType) {
if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
return "integer";
} else if (Long.class.equals(type) || long.class.equals(type)) {
......@@ -41,6 +44,9 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "binary";
} else if (JsonObject.class.equals(type)) {
return "text";
} else if (List.class.isAssignableFrom(type)) {
final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
return transform((Class<?>) elementType, elementType);
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
......
......@@ -133,7 +133,7 @@ public class StorageEsInstaller extends ModelInstaller {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
Map<String, Object> originalColumn = new HashMap<>();
originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType()));
originalColumn.put("copy_to", matchCName);
properties.put(columnDefine.getColumnName().getName(), originalColumn);
......@@ -143,7 +143,7 @@ public class StorageEsInstaller extends ModelInstaller {
properties.put(matchCName, matchColumn);
} else {
Map<String, Object> column = new HashMap<>();
column.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
column.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType()));
if (columnDefine.isStorageOnly()) {
column.put("index", false);
}
......
......@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
......@@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
......@@ -67,7 +69,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -120,6 +123,13 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
if (CollectionUtils.isNotEmpty(tags)) {
BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
tags.forEach(tag -> {
tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString()));
});
mustQueryList.add(tagMatchQuery);
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
......@@ -135,9 +145,11 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap()
.get(
SegmentRecord.IS_ERROR)).intValue()));
basicTrace.setError(
BooleanUtils.valueToBoolean(
((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()
)
);
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
......
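On the Elasticsearch query side, the tag condition becomes a nested `bool` query with one `must`/`term` clause per requested `key=value` expression against the `tags` column. A hedged sketch of that query shape, using only the ES query builders already referenced in the diff:

```java
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

public class TagConditionSketch {
    public static void main(String[] args) {
        List<String> tagExpressions = Arrays.asList("http.method=get", "status_code=404");

        // Same shape as the new block in TraceQueryEsDAO: every requested tag must match.
        BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
        tagExpressions.forEach(expr -> tagMatchQuery.must(QueryBuilders.termQuery("tags", expr)));

        System.out.println(tagMatchQuery); // prints the generated bool/must/term JSON
    }
}
```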
......@@ -18,24 +18,30 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.lang.reflect.Type;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class ElasticSearchColumnTypeMappingTestCase {
public List<String> a;
@Test
public void test() {
public void test() throws NoSuchFieldException {
ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
Assert.assertEquals("integer", mapping.transform(int.class));
Assert.assertEquals("integer", mapping.transform(Integer.class));
Assert.assertEquals("integer", mapping.transform(int.class, int.class));
Assert.assertEquals("integer", mapping.transform(Integer.class, Integer.class));
Assert.assertEquals("long", mapping.transform(long.class));
Assert.assertEquals("long", mapping.transform(Long.class));
Assert.assertEquals("long", mapping.transform(long.class, long.class));
Assert.assertEquals("long", mapping.transform(Long.class, Long.class));
Assert.assertEquals("double", mapping.transform(double.class));
Assert.assertEquals("double", mapping.transform(Double.class));
Assert.assertEquals("double", mapping.transform(double.class, double.class));
Assert.assertEquals("double", mapping.transform(Double.class, Double.class));
Assert.assertEquals("keyword", mapping.transform(String.class));
Assert.assertEquals("keyword", mapping.transform(String.class, String.class));
final Type listFieldType = this.getClass().getField("a").getGenericType();
Assert.assertEquals("keyword", mapping.transform(List.class, listFieldType));
}
}
......@@ -23,12 +23,14 @@ import java.io.IOException;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
......@@ -59,7 +61,8 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
......@@ -112,6 +115,13 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
if (CollectionUtils.isNotEmpty(tags)) {
BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
tags.forEach(tag -> {
tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString()));
});
mustQueryList.add(tagMatchQuery);
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
......@@ -127,9 +137,10 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO {
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap()
.get(
SegmentRecord.IS_ERROR)).intValue()));
basicTrace.setError(
BooleanUtils.valueToBoolean(
((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue())
);
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
......
......@@ -80,6 +80,11 @@ public class TableMetaInfo {
if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
}
// The tags field of SegmentRecord is stored as an InfluxDB tag only. See SegmentRecord.DATA_BINARY
if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
storageAndColumnMap.remove(SegmentRecord.TAGS);
}
}
TableMetaInfo info = TableMetaInfo.builder()
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -40,6 +41,9 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder storageBuilder) {
Map<String, Object> objectMap = storageBuilder.data2Map(storageData);
if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
objectMap.remove(SegmentRecord.TAGS);
}
for (ModelColumn column : model.getColumns()) {
Object value = objectMap.get(column.getColumnName().getName());
......@@ -68,6 +72,11 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
return this;
}
public InfluxInsertRequest tag(String key, String value) {
builder.tag(key, value);
return this;
}
public Point getPoint() {
return builder.build();
}
......
......@@ -18,10 +18,16 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
......@@ -52,6 +58,21 @@ public class RecordDAO implements IRecordDAO {
TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
request.addFieldAsTag(field, tag);
});
if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
Map<String, List<SpanTag>> collect = ((SegmentRecord) record).getTagsRawData()
.stream()
.collect(
Collectors.groupingBy(SpanTag::getKey));
collect.entrySet().forEach(e -> {
request.tag(e.getKey(), "'" + Joiner.on("'")
.join(e.getValue()
.stream()
.map(SpanTag::getValue)
.collect(Collectors.toSet())) + "'");
});
}
return request;
}
}
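For InfluxDB, each searchable key is written as an Influx tag whose value is the set of distinct observed values, each wrapped in single quotes, so the trace query can later use a `contains(key, "'value'")` clause. A small sketch of that value encoding, mirroring the Joiner-based code above; the sample values come from `ServiceAMock`:

```java
import com.google.common.base.Joiner;
import java.util.LinkedHashSet;
import java.util.Set;

public class InfluxTagValueSketch {
    public static void main(String[] args) {
        // Two distinct values observed for the same key (status_code) in one segment.
        Set<String> values = new LinkedHashSet<>();
        values.add("404");
        values.add("200");

        // Same encoding as RecordDAO: quote-join the distinct values of the key.
        String encoded = "'" + Joiner.on("'").join(values) + "'";
        System.out.println(encoded); // '404'200' -> both "'404'" and "'200'" match a contains() clause
    }
}
```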
......@@ -26,6 +26,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
......@@ -33,12 +34,14 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
......@@ -69,7 +72,8 @@ public class TraceQuery implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder)
QueryOrder queryOrder,
final List<SpanTag> tags)
throws IOException {
String orderBy = SegmentRecord.START_TIME;
......@@ -121,6 +125,13 @@ public class TraceQuery implements ITraceQueryDAO {
recallQuery.and(eq(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
break;
}
if (CollectionUtils.isNotEmpty(tags)) {
WhereNested<WhereQueryImpl<SelectQueryImpl>> nested = recallQuery.andNested();
for (final SpanTag tag : tags) {
nested.and(contains(tag.getKey(), "'" + tag.getValue() + "'"));
}
nested.close();
}
WhereQueryImpl<SelectQueryImpl> countQuery = select()
.count(SegmentRecord.ENDPOINT_ID)
......
......@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
......@@ -89,7 +90,8 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
......
......@@ -30,4 +30,33 @@ public class H2StorageConfig extends ModuleConfig {
private String user = "";
private String password = "";
private int metadataQueryMaxSize = 5000;
/**
* Some entities, such as trace segments, include logical columns with multiple values. Some storages support this
* kind of data structure, but H2 doesn't.
*
* In the H2, we use multiple physical columns to host the values, such as,
*
* Change column_a with values [1,2,3,4,5] to
* <p>
* column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5
* </p>
*
* This configuration controls how many physical columns are added, and also limits the max number of
* values of this kind of column.
*
* SkyWalking doesn't create a new table for indexing, because that would amplify the size of the data set dozens of
* times, which is not practical in a production environment.
*
* @since 8.2.0
*/
private int maxSizeOfArrayColumn = 20;
/**
* A trace segment includes multiple spans with multiple tags. Different spans could have the same tag keys, e.g.
* multiple HTTP exit spans each have their own `http.method` tag.
*
* This configuration sets the limit on the max number of values for the same tag key.
*
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
}
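These two options have to stay consistent with the number of searchable tag keys: with the defaults, 7 keys × 2 values per tag = 14 physical columns, which fits into `maxSizeOfArrayColumn = 20`. The `H2StorageProvider#start()` change below enforces exactly this; a minimal sketch of the same bound check:

```java
public class ArrayColumnBoundsSketch {
    public static void main(String[] args) {
        int numOfSearchableTagKeys = 7;       // default searchableTracesTags contains 7 keys
        int numOfSearchableValuesPerTag = 2;  // SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG
        int maxSizeOfArrayColumn = 20;        // SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN

        // Same guard as H2StorageProvider#start(): 7 * 2 = 14 <= 20, so the defaults are valid.
        if (numOfSearchableTagKeys * numOfSearchableValuesPerTag > maxSizeOfArrayColumn) {
            throw new IllegalStateException("Potential out of bound in the runtime.");
        }
        System.out.println("tag values fit into " + maxSizeOfArrayColumn + " physical columns");
    }
}
```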
......@@ -20,7 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
......@@ -109,39 +111,64 @@ public class H2StorageProvider extends ModuleProvider {
h2Client = new JDBCHikariCPClient(settings);
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
);
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
ITraceQueryDAO.class, new H2TraceQueryDAO(
getManager(),
h2Client,
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
final ConfigService configService = getManager().find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
"storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
h2Client.registerChecker(healthChecker);
try {
h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(h2Client, getManager());
H2TableInstaller installer = new H2TableInstaller(
h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
......@@ -155,6 +182,6 @@ public class H2StorageProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[]{CoreModule.NAME};
return new String[] {CoreModule.NAME};
}
}
......@@ -19,25 +19,58 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
private final int maxSizeOfArrayColumn;
public H2RecordDAO(JDBCHikariCPClient h2Client, StorageBuilder<Record> storageBuilder) {
public H2RecordDAO(ModuleManager manager,
JDBCHikariCPClient h2Client,
StorageBuilder<Record> storageBuilder,
final int maxSizeOfArrayColumn,
final int numOfSearchableValuesPerTag) {
this.h2Client = h2Client;
this.storageBuilder = storageBuilder;
try {
if (SegmentRecord.class
.equals(
storageBuilder.getClass().getMethod("map2Data", Map.class).getReturnType()
)
) {
this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
final ConfigService configService = manager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
this.storageBuilder = new H2SegmentRecordBuilder(
maxSizeOfArrayColumn,
numOfSearchableValuesPerTag,
Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA))
);
} else {
this.maxSizeOfArrayColumn = 1;
this.storageBuilder = storageBuilder;
}
} catch (NoSuchMethodException e) {
throw new UnexpectedException("Can't find the SegmentRecord$Builder.map2Data method.");
}
}
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
return getInsertExecutor(model.getName(), record, storageBuilder);
return getInsertExecutor(model.getName(), record, storageBuilder, maxSizeOfArrayColumn);
}
}
......@@ -105,6 +105,12 @@ public class H2SQLExecutor {
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder) throws IOException {
return getInsertExecutor(modelName, metrics, storageBuilder, 1);
}
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder,
int maxSizeOfArrayColumn) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
......@@ -114,16 +120,27 @@ public class H2SQLExecutor {
param.add(metrics.id());
for (int i = 0; i < columns.size(); i++) {
ModelColumn column = columns.get(i);
sqlBuilder.append("?");
if (i != columns.size() - 1) {
sqlBuilder.append(",");
if (List.class.isAssignableFrom(column.getType())) {
for (int physicalColumnIdx = 0; physicalColumnIdx < maxSizeOfArrayColumn; physicalColumnIdx++) {
sqlBuilder.append("?");
param.add(objectMap.get(column.getColumnName().getName() + "_" + physicalColumnIdx));
if (physicalColumnIdx != maxSizeOfArrayColumn - 1) {
sqlBuilder.append(",");
}
}
} else {
sqlBuilder.append("?");
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataComplexObject) {
param.add(((StorageDataComplexObject) value).toStorageData());
} else {
param.add(value);
}
}
Object value = objectMap.get(column.getColumnName().getName());
if (value instanceof StorageDataComplexObject) {
param.add(((StorageDataComplexObject) value).toStorageData());
} else {
param.add(value);
if (i != columns.size() - 1) {
sqlBuilder.append(",");
}
}
sqlBuilder.append(")");
......
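With this change, a `List`-typed logical column contributes `maxSizeOfArrayColumn` placeholders to the generated `INSERT` statement, one per physical column (`tags_0` … `tags_19` with the defaults, matching the keys produced by `H2SegmentRecordBuilder` below). A small sketch of just the placeholder expansion:

```java
public class ArrayColumnPlaceholderSketch {
    public static void main(String[] args) {
        int maxSizeOfArrayColumn = 20;

        // The List-typed "tags" column expands into 20 comma-separated "?" placeholders.
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < maxSizeOfArrayColumn; i++) {
            placeholders.append("?");
            if (i != maxSizeOfArrayColumn - 1) {
                placeholders.append(",");
            }
        }
        System.out.println(placeholders); // ?,?,?,... (20 placeholders in total)
    }
}
```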
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import joptsimple.internal.Strings;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.DATA_BINARY;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_ID;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_NAME;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.END_TIME;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.IS_ERROR;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.LATENCY;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SEGMENT_ID;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_ID;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_INSTANCE_ID;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.START_TIME;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TAGS;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TIME_BUCKET;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TRACE_ID;
import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.VERSION;
/**
* H2/MySQL is different from the standard {@link org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.Builder};
* this builder maps the tags into multiple physical columns.
*/
public class H2SegmentRecordBuilder implements StorageBuilder<Record> {
private int numOfSearchableValuesPerTag;
private final List<String> searchTagKeys;
public H2SegmentRecordBuilder(final int maxSizeOfArrayColumn,
final int numOfSearchableValuesPerTag,
final List<String> searchTagKeys) {
this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
final int maxNumOfTags = maxSizeOfArrayColumn / numOfSearchableValuesPerTag;
if (searchTagKeys.size() > maxNumOfTags) {
this.searchTagKeys = searchTagKeys.subList(0, maxNumOfTags);
} else {
this.searchTagKeys = searchTagKeys;
}
}
@Override
public Map<String, Object> data2Map(Record record) {
SegmentRecord storageData = (SegmentRecord) record;
storageData.setStatement(Strings.join(new String[] {
storageData.getEndpointName(),
storageData.getTraceId()
}, " - "));
Map<String, Object> map = new HashMap<>();
map.put(SEGMENT_ID, storageData.getSegmentId());
map.put(TRACE_ID, storageData.getTraceId());
map.put(TopN.STATEMENT, storageData.getStatement());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
map.put(ENDPOINT_ID, storageData.getEndpointId());
map.put(START_TIME, storageData.getStartTime());
map.put(END_TIME, storageData.getEndTime());
map.put(LATENCY, storageData.getLatency());
map.put(IS_ERROR, storageData.getIsError());
map.put(TIME_BUCKET, storageData.getTimeBucket());
if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
map.put(DATA_BINARY, Const.EMPTY_STRING);
} else {
map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
}
map.put(VERSION, storageData.getVersion());
storageData.getTagsRawData().forEach(spanTag -> {
final int index = searchTagKeys.indexOf(spanTag.getKey());
boolean shouldAdd = true;
int tagIdx = 0;
final String tagExpression = spanTag.toString();
for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
tagIdx = index * numOfSearchableValuesPerTag + i;
final String previousValue = (String) map.get(TAGS + "_" + tagIdx);
if (previousValue == null) {
// Still have at least one available slot, add directly.
shouldAdd = true;
break;
}
// If the value duplicates one that has already been added, ignore it.
if (previousValue.equals(tagExpression)) {
shouldAdd = false;
break;
}
// Reached the end of the slots reserved for this tag key.
if (i == numOfSearchableValuesPerTag - 1) {
shouldAdd = false;
}
}
if (shouldAdd) {
map.put(TAGS + "_" + tagIdx, tagExpression);
}
});
return map;
}
@Override
public Record map2Data(Map<String, Object> dbMap) {
SegmentRecord record = new SegmentRecord();
record.setSegmentId((String) dbMap.get(SEGMENT_ID));
record.setTraceId((String) dbMap.get(TRACE_ID));
record.setStatement((String) dbMap.get(TopN.STATEMENT));
record.setServiceId((String) dbMap.get(SERVICE_ID));
record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID));
record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
record.setEndpointId((String) dbMap.get(ENDPOINT_ID));
record.setStartTime(((Number) dbMap.get(START_TIME)).longValue());
record.setEndTime(((Number) dbMap.get(END_TIME)).longValue());
record.setLatency(((Number) dbMap.get(LATENCY)).intValue());
record.setIsError(((Number) dbMap.get(IS_ERROR)).intValue());
record.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
if (StringUtil.isEmpty((String) dbMap.get(DATA_BINARY))) {
record.setDataBinary(new byte[] {});
} else {
record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY)));
}
record.setVersion(((Number) dbMap.get(VERSION)).intValue());
// Don't read the tags as they are already included in the data binary.
return record;
}
}
\ No newline at end of file
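Each searchable key owns a fixed block of `numOfSearchableValuesPerTag` slots in the physical `tags_*` columns, so a value lands at `keyIndex * numOfSearchableValuesPerTag + i`; duplicate values are skipped and values beyond the per-key limit are dropped. The following standalone sketch re-implements that slot logic for illustration only (it is not the builder itself), using a shortened key list and the tags emitted by `ServiceAMock`:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TagSlotAssignmentSketch {
    public static void main(String[] args) {
        List<String> searchTagKeys = Arrays.asList("http.method", "status_code"); // shortened key list
        int numOfSearchableValuesPerTag = 2;

        // Span tags as produced by ServiceAMock: one http.method, two distinct status_code values.
        String[][] spanTags = {{"http.method", "get"}, {"status_code", "404"}, {"status_code", "200"}};

        Map<String, String> columns = new LinkedHashMap<>();
        for (String[] tag : spanTags) {
            int keyIndex = searchTagKeys.indexOf(tag[0]);
            String expression = tag[0] + "=" + tag[1];
            for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
                int slot = keyIndex * numOfSearchableValuesPerTag + i; // same formula as data2Map()
                if (!columns.containsKey("tags_" + slot)) {
                    columns.put("tags_" + slot, expression);           // first free slot wins
                    break;
                }
                if (columns.get("tags_" + slot).equals(expression)) {
                    break;                                             // duplicate value, ignore it
                }
                // otherwise keep looking; past the last slot the value is simply dropped
            }
        }
        System.out.println(columns);
        // {tags_0=http.method=get, tags_2=status_code=404, tags_3=status_code=200}
    }
}
```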
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
......@@ -29,14 +30,14 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@RequiredArgsConstructor
public class H2StorageDAO implements StorageDAO {
private JDBCHikariCPClient h2Client;
public H2StorageDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
private final ModuleManager manager;
private final JDBCHikariCPClient h2Client;
private final int maxSizeOfArrayColumn;
private final int numOfSearchableValuesPerTag;
@Override
public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
......@@ -45,7 +46,7 @@ public class H2StorageDAO implements StorageDAO {
@Override
public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
return new H2RecordDAO(h2Client, storageBuilder);
return new H2RecordDAO(manager, h2Client, storageBuilder, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
......
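Since H2StorageDAO above now relies on Lombok's @RequiredArgsConstructor, the generated constructor takes the final fields in declaration order. A hedged wiring fragment, mirroring what MySQLStorageProvider does later in this commit (the surrounding provider variables are assumed):

// Hypothetical construction; the sizes come from the storage module configuration.
StorageDAO storageDAO = new H2StorageDAO(
    getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag());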
......@@ -19,8 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import com.google.gson.JsonObject;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.storage.StorageException;
......@@ -44,8 +47,16 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
public class H2TableInstaller extends ModelInstaller {
public static final String ID_COLUMN = "id";
public H2TableInstaller(Client client, ModuleManager moduleManager) {
protected final int maxSizeOfArrayColumn;
protected final int numOfSearchableValuesPerTag;
public H2TableInstaller(Client client,
ModuleManager moduleManager,
int maxSizeOfArrayColumn,
int numOfSearchableValuesPerTag) {
super(client, moduleManager);
this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
}
@Override
......@@ -67,9 +78,7 @@ public class H2TableInstaller extends ModelInstaller {
ModelColumn column = model.getColumns().get(i);
ColumnName name = column.getColumnName();
tableCreateSQL.appendLine(
name.getStorageName() + " " + getColumnType(column) + (i != model
.getColumns()
.size() - 1 ? "," : ""));
getColumn(column) + (i != model.getColumns().size() - 1 ? "," : ""));
}
tableCreateSQL.appendLine(")");
......@@ -88,22 +97,37 @@ public class H2TableInstaller extends ModelInstaller {
/**
* Build the H2 column definition for the given model column, mapping its Java type to the H2 data type.
*/
protected String getColumnType(ModelColumn column) {
final Class<?> type = column.getType();
protected String getColumn(ModelColumn column) {
return transform(column, column.getType(), column.getGenericType());
}
protected String transform(ModelColumn column, Class<?> type, Type genericType) {
final String storageName = column.getColumnName().getStorageName();
if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
return "INT";
return storageName + " INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return "BIGINT";
return storageName + " BIGINT";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "DOUBLE";
return storageName + " DOUBLE";
} else if (String.class.equals(type)) {
return "VARCHAR(" + column.getLength() + ")";
return storageName + " VARCHAR(" + column.getLength() + ")";
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return "VARCHAR(20000)";
return storageName + " VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
return "MEDIUMTEXT";
return storageName + " MEDIUMTEXT";
} else if (JsonObject.class.equals(type)) {
return "VARCHAR(" + column.getLength() + ")";
return storageName + " VARCHAR(" + column.getLength() + ")";
} else if (List.class.isAssignableFrom(type)) {
final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
String oneColumnType = transform(column, (Class<?>) elementType, elementType);
// Strip the storageName prefix so only the type definition remains.
oneColumnType = oneColumnType.substring(storageName.length());
StringBuilder columns = new StringBuilder();
for (int i = 0; i < maxSizeOfArrayColumn; i++) {
columns.append(storageName).append("_").append(i).append(oneColumnType)
.append(i == maxSizeOfArrayColumn - 1 ? "" : ",");
}
return columns.toString();
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
......
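To make the new List handling in transform() above concrete, here is a small, self-contained sketch of the column fragment it would emit for a List-typed column named `tags`, with maxSizeOfArrayColumn shrunk to 3 for brevity (the documented default is 20); the element type and length are assumptions for illustration:

public class ListColumnDdlSketch {
    public static void main(String[] args) {
        String storageName = "tags";
        String oneColumnType = " VARCHAR(256)";   // assumed element type, with the name prefix already stripped
        int maxSizeOfArrayColumn = 3;
        StringBuilder columns = new StringBuilder();
        for (int i = 0; i < maxSizeOfArrayColumn; i++) {
            columns.append(storageName).append("_").append(i).append(oneColumnType)
                   .append(i == maxSizeOfArrayColumn - 1 ? "" : ",");
        }
        // Prints: tags_0 VARCHAR(256),tags_1 VARCHAR(256),tags_2 VARCHAR(256)
        System.out.println(columns);
    }
}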
......@@ -24,11 +24,16 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
......@@ -36,14 +41,26 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.search.sort.SortOrder;
public class H2TraceQueryDAO implements ITraceQueryDAO {
private ModuleManager manager;
private JDBCHikariCPClient h2Client;
public H2TraceQueryDAO(JDBCHikariCPClient h2Client) {
private List<String> searchableTagKeys;
private int maxSizeOfArrayColumn;
private int numOfSearchableValuesPerTag;
public H2TraceQueryDAO(ModuleManager manager,
JDBCHikariCPClient h2Client,
final int maxSizeOfArrayColumn,
final int numOfSearchableValuesPerTag) {
this.h2Client = h2Client;
this.manager = manager;
this.maxSizeOfArrayColumn = maxSizeOfArrayColumn;
this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag;
}
@Override
......@@ -59,7 +76,18 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
if (searchableTagKeys == null) {
final ConfigService configService = manager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
searchableTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
if (searchableTagKeys.size() > maxSizeOfArrayColumn) {
this.searchableTagKeys = searchableTagKeys.subList(0, maxSizeOfArrayColumn);
}
}
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
......@@ -101,6 +129,26 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?");
parameters.add(traceId);
}
if (CollectionUtils.isNotEmpty(tags)) {
for (final SpanTag tag : tags) {
final int foundIdx = searchableTagKeys.indexOf(tag.getKey());
if (foundIdx > -1) {
sql.append(" and (");
for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
final String physicalColumn = SegmentRecord.TAGS + "_" + (foundIdx * numOfSearchableValuesPerTag + i);
sql.append(physicalColumn).append(" = ? ");
parameters.add(tag.toString());
if (i != numOfSearchableValuesPerTag - 1) {
sql.append(" or ");
}
}
sql.append(")");
} else {
// If a required tag is not searchable, no segment can match, so skip the real query.
return new TraceBrief();
}
}
}
switch (traceState) {
case ERROR:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE);
......
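On the query side, the tag loop above turns each matched tag into an OR group over its physical columns. A hedged sketch of the fragment built for a single tag, assuming `http.method` is the first searchable key (foundIdx = 0) and numOfSearchableValuesPerTag = 2:

public class TagWhereFragmentSketch {
    public static void main(String[] args) {
        int foundIdx = 0;                      // assumed position of http.method in searchableTagKeys
        int numOfSearchableValuesPerTag = 2;
        StringBuilder sql = new StringBuilder(" and (");
        for (int i = 0; i < numOfSearchableValuesPerTag; i++) {
            sql.append("tags_").append(foundIdx * numOfSearchableValuesPerTag + i).append(" = ? ");
            if (i != numOfSearchableValuesPerTag - 1) {
                sql.append(" or ");
            }
        }
        sql.append(")");
        // Prints: " and (tags_0 = ?  or tags_1 = ? )"; each ? would be bound to a value such as "http.method=GET".
        System.out.println(sql);
    }
}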
......@@ -18,16 +18,26 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import java.util.Properties;
@Setter
@Getter
public final class MySQLStorageConfig extends ModuleConfig {
private int metadataQueryMaxSize = 5000;
/**
* Inherited from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getMaxSizeOfArrayColumn()}
*
* @since 8.2.0
*/
private int maxSizeOfArrayColumn = 20;
/**
* Inherited from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getNumOfSearchableValuesPerTag()}
*
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
private Properties properties;
}
......@@ -19,7 +19,9 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
......@@ -96,35 +98,60 @@ public class MySQLStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(mysqlClient));
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient));
StorageDAO.class,
new H2StorageDAO(
getManager(), mysqlClient, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
);
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(mysqlClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(mysqlClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new MySQLTraceQueryDAO(mysqlClient));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
ITraceQueryDAO.class,
new MySQLTraceQueryDAO(
getManager(),
mysqlClient,
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
)
);
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(mysqlClient));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(mysqlClient));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient));
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(mysqlClient));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final ConfigService configService = getManager().find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
try {
mysqlClient.connect();
MySQLTableInstaller installer = new MySQLTableInstaller(mysqlClient, getManager());
MySQLTableInstaller installer = new MySQLTableInstaller(
mysqlClient, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()
);
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
......@@ -138,6 +165,6 @@ public class MySQLStorageProvider extends ModuleProvider {
@Override
public String[] requiredModules() {
return new String[]{CoreModule.NAME};
return new String[] {CoreModule.NAME};
}
}
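The start() guard above is a simple capacity check: the number of searchable tag keys times numOfSearchableValuesPerTag must fit into maxSizeOfArrayColumn. A worked example with the documented defaults, for illustration only:

public class TagCapacityCheckSketch {
    public static void main(String[] args) {
        int numOfSearchableTags = 7;          // the default SW_SEARCHABLE_TAG_KEYS list has 7 keys
        int numOfSearchableValuesPerTag = 2;  // default
        int maxSizeOfArrayColumn = 20;        // default
        // 7 * 2 = 14 <= 20, so the module starts; e.g. 11 keys (22 > 20) would fail fast at startup.
        boolean fits = numOfSearchableTags * numOfSearchableValuesPerTag <= maxSizeOfArrayColumn;
        System.out.println("fits = " + fits); // fits = true
    }
}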
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
......@@ -40,8 +41,11 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal
*/
@Slf4j
public class MySQLTableInstaller extends H2TableInstaller {
public MySQLTableInstaller(Client client, ModuleManager moduleManager) {
super(client, moduleManager);
public MySQLTableInstaller(Client client,
ModuleManager moduleManager,
int maxSizeOfArrayColumn,
int numOfSearchableValuesPerTag) {
super(client, moduleManager, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
/*
* Override columns because the default column names in core conflict with MySQL syntax.
*/
......@@ -72,24 +76,39 @@ public class MySQLTableInstaller extends H2TableInstaller {
int indexSeq = 0;
for (final ModelColumn modelColumn : model.getColumns()) {
if (!modelColumn.isStorageOnly()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append("ON ").append(model.getName()).append("(")
.append(modelColumn.getColumnName().getStorageName())
.append(")");
createIndex(client, connection, model, tableIndexSQL);
final Class<?> type = modelColumn.getType();
if (List.class.isAssignableFrom(type)) {
for (int i = 0; i < maxSizeOfArrayColumn; i++) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append("ON ").append(model.getName()).append("(")
.append(modelColumn.getColumnName().getStorageName() + "_" + i)
.append(")");
createIndex(client, connection, model, tableIndexSQL);
}
} else {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append("ON ").append(model.getName()).append("(")
.append(modelColumn.getColumnName().getStorageName())
.append(")");
createIndex(client, connection, model, tableIndexSQL);
}
}
}
for (final ExtraQueryIndex extraQueryIndex : model.getExtraQueryIndices()) {
SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX ");
tableIndexSQL.append(model.getName().toUpperCase())
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
.append("_")
.append(String.valueOf(indexSeq++))
.append("_IDX ");
tableIndexSQL.append(" ON ").append(model.getName()).append("(");
final String[] columns = extraQueryIndex.getColumns();
for (int i = 0; i < columns.length; i++) {
......@@ -104,17 +123,18 @@ public class MySQLTableInstaller extends H2TableInstaller {
}
@Override
protected String getColumnType(final ModelColumn column) {
protected String getColumn(final ModelColumn column) {
final String storageName = column.getColumnName().getStorageName();
final Class<?> type = column.getType();
if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return "MEDIUMTEXT";
return storageName + " MEDIUMTEXT";
} else if (String.class.equals(type)) {
if (column.getLength() > 16383) {
return "MEDIUMTEXT";
return storageName + " MEDIUMTEXT";
} else {
return "VARCHAR(" + column.getLength() + ")";
return storageName + " VARCHAR(" + column.getLength() + ")";
}
}
return super.getColumnType(column);
return super.getColumn(column);
}
}
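For a List-typed column, createTableIndexes above now emits one CREATE INDEX statement per physical column. A hypothetical sketch of the statements generated for a table named `segment`, with maxSizeOfArrayColumn shrunk to 2 (the real index sequence numbers depend on the model's other columns):

public class ListColumnIndexSketch {
    public static void main(String[] args) {
        String table = "segment";          // assumed model name
        int maxSizeOfArrayColumn = 2;      // shrunk from the default 20 for brevity
        int indexSeq = 0;
        for (int i = 0; i < maxSizeOfArrayColumn; i++) {
            // e.g. CREATE INDEX SEGMENT_0_IDX ON segment(tags_0)
            System.out.println("CREATE INDEX " + table.toUpperCase() + "_" + (indexSeq++)
                    + "_IDX ON " + table + "(tags_" + i + ")");
        }
    }
}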
......@@ -19,12 +19,16 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
public class MySQLTraceQueryDAO extends H2TraceQueryDAO {
public MySQLTraceQueryDAO(JDBCHikariCPClient mysqlClient) {
super(mysqlClient);
public MySQLTraceQueryDAO(ModuleManager manager,
JDBCHikariCPClient h2Client,
final int maxSizeOfArrayColumn,
final int numOfSearchableValuesPerTag) {
super(manager, h2Client, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
......
......@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
......@@ -86,7 +87,8 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
......
......@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
......@@ -50,7 +51,8 @@ public class ProfileTraceDAO implements ITraceQueryDAO {
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
QueryOrder queryOrder,
final List<SpanTag> tags) throws IOException {
return null;
}
......