diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index c0ce0563d071d9efd40c65d2fd425614eae863c2..6bb7a579c3bb24b5769008c02ce6e7f6728e7241 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -29,6 +29,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | serviceNameMaxLength| Max length limitation of service name.|SW_SERVICE_NAME_MAX_LENGTH|70| | - | - | instanceNameMaxLength| Max length limitation of service instance name. The max length of service + instance names should be less than 200.|SW_INSTANCE_NAME_MAX_LENGTH|70| | - | - | endpointNameMaxLength| Max length limitation of endpoint name. The max length of service + endpoint names should be less than 240.|SW_ENDPOINT_NAME_MAX_LENGTH|150| +| - | - | searchableTracesTags | Define the set of span tag keys, which should be searchable through the GraphQL. Multiple values should be separated through the comma. | SW_SEARCHABLE_TAG_KEYS | http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker| | - | - | gRPCThreadPoolSize|Pool size of gRPC server| - | CPU core * 4| | - | - | gRPCThreadPoolQueueSize| The queue size of gRPC server| - | 10000| | - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | - | - | @@ -113,10 +114,14 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | url | H2 connection URL. Default is H2 memory mode | SW_STORAGE_H2_URL | jdbc:h2:mem:skywalking-oap-db | | - | - | user | User name of H2 database. | SW_STORAGE_H2_USER | sa | | - | - | password | Password of H2 database. | - | - | -| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 | +| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 | +| - | - | maxSizeOfArrayColumn | Some entities, such as trace segment, include the logic column with multiple values. In the H2, we use multiple physical columns to host the values, such as, Change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 | +| - | - | numOfSearchableValuesPerTag | In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such as multiple HTTP exit spans all have their own `http.method` tag. This configuration set the limitation of max num of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 | | - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist, please copy it into oap-lib folder manually | - | - | | - | - | properties | Hikari connection pool configurations | - | Listed in the `application.yaml`. | -| - | - | metadataQueryMaxSize | The max size of metadata per query. | - | 5000 | +| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 | +| - | - | maxSizeOfArrayColumn | Some entities, such as trace segment, include the logic column with multiple values. In the MySQL, we use multiple physical columns to host the values, such as, Change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 | +| - | - | numOfSearchableValuesPerTag | In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such as multiple HTTP exit spans all have their own `http.method` tag. This configuration set the limitation of max num of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 | | - |influxdb| - | InfluxDB storage. |- | - | | - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086| | - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root| diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java index b0f34613d1b4e17adab7fa9cb74322e2038d7a08..bb887862bc717471ac89f689af95ccd192c025f5 100644 --- a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java +++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SegmentAnalysisListener.java @@ -18,22 +18,26 @@ package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener; +import java.util.Arrays; +import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject; import org.apache.skywalking.apm.network.language.agent.v3.SpanObject; import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.NodeType; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.core.source.Segment; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.BooleanUtils; -import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig; /** * SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions. @@ -44,6 +48,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal private final SourceReceiver sourceReceiver; private final TraceSegmentSampler sampler; private final NamingControl namingControl; + private final List searchableTagKeys; private final Segment segment = new Segment(); private SAMPLE_STATUS sampleStatus = SAMPLE_STATUS.UNKNOWN; @@ -96,7 +101,6 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal serviceId, endpointName ); - } @Override @@ -144,11 +148,21 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal if (!isError && span.getIsError()) { isError = true; } + + appendSearchableTags(span); }); final long accurateDuration = endTimestamp - startTimestamp; duration = accurateDuration > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) accurateDuration; } + private void appendSearchableTags(SpanObject span) { + span.getTagsList().forEach(tag -> { + if (searchableTagKeys.contains(tag.getKey())) { + segment.getTags().add(new SpanTag(tag.getKey(), tag.getValue())); + } + }); + } + @Override public void build() { if (log.isDebugEnabled()) { @@ -173,9 +187,14 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal private final SourceReceiver sourceReceiver; private final TraceSegmentSampler sampler; private final NamingControl namingControl; + private final List searchTagKeys; public Factory(ModuleManager moduleManager, AnalyzerModuleConfig config) { this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + final ConfigService configService = moduleManager.find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + this.searchTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA)); this.sampler = new TraceSegmentSampler(config.getTraceSampleRateWatcher()); this.namingControl = moduleManager.find(CoreModule.NAME) .provider() @@ -184,7 +203,12 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal @Override public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) { - return new SegmentAnalysisListener(sourceReceiver, sampler, namingControl); + return new SegmentAnalysisListener( + sourceReceiver, + sampler, + namingControl, + searchTagKeys + ); } } } diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index b6c585a259ea9cec92c58acd6c79a159b92e44d8..13a4f17719aedbf2b295efc0afce4344f213a65c 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -89,6 +89,8 @@ core: instanceNameMaxLength: ${SW_INSTANCE_NAME_MAX_LENGTH:70} # The max length of service + endpoint names should be less than 240 endpointNameMaxLength: ${SW_ENDPOINT_NAME_MAX_LENGTH:150} + # Define the set of span tag keys, which should be searchable through the GraphQL. + searchableTagKeys: ${SW_SEARCHABLE_TAG_KEYS:http.method,status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker} storage: selector: ${SW_STORAGE:h2} elasticsearch: @@ -140,6 +142,8 @@ storage: url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db} user: ${SW_STORAGE_H2_USER:sa} metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000} + maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20} + numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2} mysql: properties: jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"} @@ -150,6 +154,8 @@ storage: dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048} dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true} metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000} + maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20} + numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2} influxdb: # InfluxDB configuration url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086} @@ -229,7 +235,7 @@ kafka-fetcher: receiver-meter: selector: ${SW_RECEIVER_METER:-} default: - + receiver-oc: selector: ${SW_OC_RECEIVER:-} default: diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java index e67871b31abaf1b9e9f391ce55f83f31c0a3f35a..3592bc85e05dd7cd0410b8d98658a34c0083ef09 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java @@ -27,6 +27,7 @@ public class Const { public static final String RELATION_ID_CONNECTOR = "-"; public static final String RELATION_ID_PARSER_SPLIT = "\\-"; public static final String LINE = "-"; + public static final String COMMA = ","; public static final String SPACE = " "; public static final String KEY_VALUE_SPLIT = ","; public static final String ARRAY_SPLIT = "|"; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 4682d40c4172535d3cb3b01b5c25a436069db04e..e36ce7c8ecdaa596b718d083925e6480720bfa69 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core; import java.util.ArrayList; import java.util.List; import lombok.Getter; +import lombok.Setter; import org.apache.skywalking.oap.server.core.source.ScopeDefaultColumn; import org.apache.skywalking.oap.server.library.module.ModuleConfig; @@ -116,6 +117,14 @@ public class CoreModuleConfig extends ModuleConfig { * In the current practice, we don't recommend the length over 190. */ private int endpointNameMaxLength = 150; + /** + * Define the set of span tag keys, which should be searchable through the GraphQL. + * + * @since 8.2.0 + */ + @Setter + @Getter + private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS; public CoreModuleConfig() { this.downsampling = new ArrayList<>(); @@ -142,4 +151,19 @@ public class CoreModuleConfig extends ModuleConfig { */ Aggregator } + + /** + * SkyWalking Java Agent provides the recommended tag keys for other language agents or SDKs. This field declare the + * recommended keys should be searchable. + */ + private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join( + Const.COMMA, + "http.method", + "status_code", + "db.type", + "db.instance", + "mq.queue", + "mq.topic", + "mq.broker" + ); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java index 69f8fab9e4e261edc4df69c47d1247a39911c8cb..69036baefd8aaecec1114cb973162f2830bc851e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java @@ -40,6 +40,8 @@ public class SegmentDispatcher implements SourceDispatcher { segment.setDataBinary(source.getDataBinary()); segment.setTimeBucket(source.getTimeBucket()); segment.setVersion(source.getVersion()); + segment.setTagsRawData(source.getTags()); + segment.setTags(SpanTag.Util.toStringList(source.getTags())); RecordStreamProcessor.getInstance().in(segment); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java index a8d523c3d9a3467961de1b53f1e8b879582a2654..bf7369e8176f1dad1b7fe00ed1d2167878f720d2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.segment; import java.util.Base64; import java.util.HashMap; +import java.util.List; import java.util.Map; import joptsimple.internal.Strings; import lombok.Getter; @@ -53,6 +54,7 @@ public class SegmentRecord extends Record { public static final String IS_ERROR = "is_error"; public static final String DATA_BINARY = "data_binary"; public static final String VERSION = "version"; + public static final String TAGS = "tags"; @Setter @Getter @@ -106,6 +108,17 @@ public class SegmentRecord extends Record { @Getter @Column(columnName = VERSION, storageOnly = true) private int version; + @Setter + @Getter + @Column(columnName = TAGS) + private List tags; + /** + * Tags raw data is a duplicate field of {@link #tags}. Some storage don't support array values in a single column. + * Then, those implementations could use this raw data to generate necessary data structures. + */ + @Setter + @Getter + private List tagsRawData; @Override public String id() { @@ -139,6 +152,7 @@ public class SegmentRecord extends Record { map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); } map.put(VERSION, storageData.getVersion()); + map.put(TAGS, storageData.getTags()); return map; } @@ -163,6 +177,7 @@ public class SegmentRecord extends Record { record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY))); } record.setVersion(((Number) dbMap.get(VERSION)).intValue()); + // Don't read the tags as they has been in the data binary already. return record; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java new file mode 100644 index 0000000000000000000000000000000000000000..4dc2514c1f8beed7dfdc4413f90f3c9299d4862a --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SpanTag.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.segment; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; + +@Getter +@Setter +public class SpanTag { + private String key; + private String value; + + public SpanTag() { + } + + public SpanTag(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public String toString() { + return key + "=" + value; + } + + public static class Util { + public static List toStringList(List list) { + if (CollectionUtils.isEmpty(list)) { + return Collections.emptyList(); + } + List result = new ArrayList<>(list.size()); + list.forEach(e -> result.add(e.toString())); + return result; + } + } +} + diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java index 923d101c8a76a74420748022f1c217a66a945591..a5fd428e355681f06369481aa50c7ec7f5c8a6a3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/ConfigService.java @@ -26,9 +26,11 @@ import org.apache.skywalking.oap.server.library.module.Service; public class ConfigService implements Service { private final String gRPCHost; private final int gRPCPort; + private final String searchableTracesTags; public ConfigService(CoreModuleConfig moduleConfig) { this.gRPCHost = moduleConfig.getGRPCHost(); this.gRPCPort = moduleConfig.getGRPCPort(); + this.searchableTracesTags = moduleConfig.getSearchableTracesTags(); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java index 4fc0c8776037e9291bf8846f3153cd068a0bc0bc..1b73ffd36e17643d9f7e9f2d5cc4fc729a2d07af 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java @@ -27,6 +27,7 @@ import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; import org.apache.skywalking.oap.server.core.query.type.KeyValue; import org.apache.skywalking.oap.server.core.query.type.LogEntity; @@ -83,13 +84,13 @@ public class TraceQueryService implements Service { final QueryOrder queryOrder, final Pagination paging, final long startTB, - final long endTB) throws IOException { + final long endTB, + final List tags) throws IOException { PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging); return getTraceQueryDAO().queryBasicTraces( startTB, endTB, minTraceDuration, maxTraceDuration, endpointName, serviceId, serviceInstanceId, endpointId, - traceId, page - .getLimit(), page.getFrom(), traceState, queryOrder + traceId, page.getLimit(), page.getFrom(), traceState, queryOrder, tags ); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java index 86cfb140e8f522ea47973dc968ab2eb0be935118..2b6845fb30a095216fa1cff62a6afc63d5235085 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/TraceQueryCondition.java @@ -18,8 +18,10 @@ package org.apache.skywalking.oap.server.core.query.input; +import java.util.List; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.Pagination; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.TraceState; @@ -38,4 +40,5 @@ public class TraceQueryCondition { private TraceState traceState; private QueryOrder queryOrder; private Pagination paging; + private List tags; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java index 2bdec9aa2238dda8a535f0199633966a8902a7f7..f7b58f5a33232a3d8fb88e49c4d5d4afce114249 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java @@ -18,8 +18,11 @@ package org.apache.skywalking.oap.server.core.source; +import java.util.ArrayList; +import java.util.List; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SEGMENT; @@ -72,4 +75,7 @@ public class Segment extends Source { @Setter @Getter private int version; + @Setter + @Getter + private List tags = new ArrayList<>(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java index 4ad3622adb2e8fcd856ff37ee17874c3811b462f..1f679de7e5c87e11a2df9f3205ec66c1bf94ce3f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/DataTypeMapping.java @@ -18,7 +18,11 @@ package org.apache.skywalking.oap.server.core.storage.model; -public interface DataTypeMapping { +import java.lang.reflect.Type; - String transform(Class type); +public interface DataTypeMapping { + /** + * Map the given typd and genericType of the field to the column type. + */ + String transform(Class type, Type genericType); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java index 7f8c9a003975e96de5c2bacf48cc486de46cd7df..0d0c2bd4d0258e6447bca660ce0d66c653af008c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumn.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.core.storage.model; -import com.google.gson.JsonObject; +import java.lang.reflect.Type; import lombok.Getter; import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; @@ -26,28 +26,23 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; public class ModelColumn { private final ColumnName columnName; private final Class type; + private final Type genericType; private final boolean matchQuery; private final boolean storageOnly; private final int length; public ModelColumn(ColumnName columnName, Class type, + Type genericType, boolean matchQuery, boolean storageOnly, boolean isValue, int length) { this.columnName = columnName; this.type = type; + this.genericType = genericType; this.matchQuery = matchQuery; - - /* - * Only accept length in the String/JsonObject definition. - */ - if (type.equals(String.class) || type.equals(JsonObject.class)) { - this.length = length; - } else { - this.length = 0; - } + this.length = length; /* * byte[] and {@link IntKeyLongValueHashMap} could never be query. */ diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index 6000a93591874f11d888fdb9cb598417b36d45af..600dbe93fd49503b1c93b717912a88ce7184f4eb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -122,8 +122,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula } modelColumns.add( new ModelColumn( - new ColumnName(modelName, column.columnName()), field.getType(), column.matchQuery(), - column.storageOnly(), column.dataType().isValue(), columnLength + new ColumnName(modelName, column.columnName()), field.getType(), field.getGenericType(), + column.matchQuery(), column.storageOnly(), column.dataType().isValue(), columnLength )); if (log.isDebugEnabled()) { log.debug("The field named {} with the {} type", column.columnName(), field.getType()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java index 366003441fa9bc00d2113bd469bdeac4929394ff..35e48cf700f56e5f10b328ffbf5bd70879f102fb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITraceQueryDAO.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.storage.query; import java.io.IOException; import java.util.List; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.Span; import org.apache.skywalking.oap.server.core.query.type.TraceBrief; @@ -29,9 +30,20 @@ import org.apache.skywalking.oap.server.library.module.Service; public interface ITraceQueryDAO extends Service { - TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, - String endpointName, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, - TraceState traceState, QueryOrder queryOrder) throws IOException; + TraceBrief queryBasicTraces(long startSecondTB, + long endSecondTB, + long minDuration, + long maxDuration, + String endpointName, + String serviceId, + String serviceInstanceId, + String endpointId, + String traceId, + int limit, + int from, + TraceState traceState, + QueryOrder queryOrder, + final List tags) throws IOException; List queryByTraceId(String traceId) throws IOException; diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java index 7c2d0a26b9ec9bde7e7235e0b3c6cf658b73f94b..670eac243284bf152eaa797192caf7c7759ac704 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java @@ -25,20 +25,20 @@ import org.junit.Test; public class ModelColumnTest { @Test public void testColumnDefine() { - ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, true, + ModelColumn column = new ModelColumn(new ColumnName("", "abc"), byte[].class, byte[].class, true, false, true, 0 ); Assert.assertEquals(true, column.isStorageOnly()); Assert.assertEquals("abc", column.getColumnName().getName()); - column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, true, + column = new ModelColumn(new ColumnName("", "abc"), DataTable.class, DataTable.class, true, false, true, 200 ); Assert.assertEquals(true, column.isStorageOnly()); Assert.assertEquals("abc", column.getColumnName().getName()); - Assert.assertEquals(0, column.getLength()); + Assert.assertEquals(200, column.getLength()); - column = new ModelColumn(new ColumnName("", "abc"), String.class, true, + column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class, true, false, true, 200 ); Assert.assertEquals(false, column.isStorageOnly()); @@ -47,7 +47,7 @@ public class ModelColumnTest { @Test(expected = IllegalArgumentException.class) public void testConflictDefinition() { - ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, + ModelColumn column = new ModelColumn(new ColumnName("", "abc"), String.class, String.class, true, true, true, 200 ); } diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java index 804334e2bb6de3d1bfa2d3f23ade7adb56c604d8..d9df534dbbf318c3e223ee3d8566796e37324218 100644 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TraceQuery.java @@ -75,7 +75,7 @@ public class TraceQuery implements GraphQLQueryResolver { return getQueryService().queryBasicTraces( condition.getServiceId(), condition.getServiceInstanceId(), endpointId, traceId, endpointName, minDuration, - maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB + maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB, condition.getTags() ); } diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol index 563bb51c71922f017911345d7cd5c62a7ac8995c..f38def1d502327856c1cae7ceb233f3c0c8c8e2a 160000 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol @@ -1 +1 @@ -Subproject commit 563bb51c71922f017911345d7cd5c62a7ac8995c +Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java index f72ec1b14ecb59bf5931363ec1b736afa9544438..6192eb5369a0cb6c0fb9225bf68b855a0feb9ebe 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceAMock.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.receiver.trace.mock; import io.grpc.stub.StreamObserver; +import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair; import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject; import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer; import org.apache.skywalking.apm.network.language.agent.v3.SpanObject; @@ -29,7 +30,7 @@ class ServiceAMock { public static String SERVICE_NAME = "mock_a_service"; public static String SERVICE_INSTANCE_NAME = "mock_a_service_instance"; - static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest"; + static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest/404-test"; static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()"; static String DUBBO_ADDRESS = "DubboIPAddress:1000"; @@ -62,6 +63,9 @@ class ServiceAMock { span.setComponentId(ComponentsDefine.TOMCAT.getId()); span.setOperationName(REST_ENDPOINT); span.setIsError(false); + span.addTags(KeyStringValuePair.newBuilder().setKey("http.method").setValue("get").build()); + span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("404").build()); + span.addTags(KeyStringValuePair.newBuilder().setKey("status_code").setValue("200").build()); return span; } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java index 7cec27ec28255713b727a18b1687ac48d43c02fa..6acc3be827404afebe0ab6a6664be03412139e24 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java @@ -19,6 +19,9 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import com.google.gson.JsonObject; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.List; import org.apache.skywalking.oap.server.core.analysis.NodeType; import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping; import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; @@ -26,7 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObje public class ColumnTypeEsMapping implements DataTypeMapping { @Override - public String transform(Class type) { + public String transform(Class type, Type genericType) { if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) { return "integer"; } else if (Long.class.equals(type) || long.class.equals(type)) { @@ -41,6 +44,9 @@ public class ColumnTypeEsMapping implements DataTypeMapping { return "binary"; } else if (JsonObject.class.equals(type)) { return "text"; + } else if (List.class.isAssignableFrom(type)) { + final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0]; + return transform((Class) elementType, elementType); } else { throw new IllegalArgumentException("Unsupported data type: " + type.getName()); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java index 143d095bd2ac2e0f18223ef1a84a4a6fd298076e..9dc331b8e20e5c1fe3179e5a4fb76805ab80b640 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java @@ -133,7 +133,7 @@ public class StorageEsInstaller extends ModelInstaller { String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName()); Map originalColumn = new HashMap<>(); - originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType())); + originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType())); originalColumn.put("copy_to", matchCName); properties.put(columnDefine.getColumnName().getName(), originalColumn); @@ -143,7 +143,7 @@ public class StorageEsInstaller extends ModelInstaller { properties.put(matchCName, matchColumn); } else { Map column = new HashMap<>(); - column.put("type", columnTypeEsMapping.transform(columnDefine.getType())); + column.put("type", columnTypeEsMapping.transform(columnDefine.getType(), columnDefine.getGenericType())); if (columnDefine.isStorageOnly()) { column.put("index", false); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java index 192be91ee41a09ac746910b340b6d62df46be230..047be7a55c3c82291e43ef13b258276e3d7ab29b 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TraceQueryEsDAO.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.Span; @@ -34,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState; import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -67,7 +69,8 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); @@ -120,6 +123,13 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC); break; } + if (CollectionUtils.isNotEmpty(tags)) { + BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery(); + tags.forEach(tag -> { + tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString())); + }); + mustQueryList.add(tagMatchQuery); + } sourceBuilder.size(limit); sourceBuilder.from(from); @@ -135,9 +145,11 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME))); basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME)); basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue()); - basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap() - .get( - SegmentRecord.IS_ERROR)).intValue())); + basicTrace.setError( + BooleanUtils.valueToBoolean( + ((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue() + ) + ); basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID)); traceBrief.getTraces().add(basicTrace); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java index 807a7b832a89bb516bd1ee024586418045539071..ae1ecc78b701653a818a0fac23cdb4c8ae35d08d 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java @@ -18,24 +18,30 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; +import java.lang.reflect.Type; +import java.util.List; import org.junit.Assert; import org.junit.Test; public class ElasticSearchColumnTypeMappingTestCase { + public List a; @Test - public void test() { + public void test() throws NoSuchFieldException { ColumnTypeEsMapping mapping = new ColumnTypeEsMapping(); - Assert.assertEquals("integer", mapping.transform(int.class)); - Assert.assertEquals("integer", mapping.transform(Integer.class)); + Assert.assertEquals("integer", mapping.transform(int.class, int.class)); + Assert.assertEquals("integer", mapping.transform(Integer.class, Integer.class)); - Assert.assertEquals("long", mapping.transform(long.class)); - Assert.assertEquals("long", mapping.transform(Long.class)); + Assert.assertEquals("long", mapping.transform(long.class, long.class)); + Assert.assertEquals("long", mapping.transform(Long.class, Long.class)); - Assert.assertEquals("double", mapping.transform(double.class)); - Assert.assertEquals("double", mapping.transform(Double.class)); + Assert.assertEquals("double", mapping.transform(double.class, double.class)); + Assert.assertEquals("double", mapping.transform(Double.class, Double.class)); - Assert.assertEquals("keyword", mapping.transform(String.class)); + Assert.assertEquals("keyword", mapping.transform(String.class, String.class)); + + final Type listFieldType = this.getClass().getField("a").getGenericType(); + Assert.assertEquals("keyword", mapping.transform(List.class, listFieldType)); } } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java index c78628dbf820df4602f60928599c4afbb0c700cc..2077fa96f15f94ede06b85aadc7c7424b53ff1a1 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/query/TraceQueryEs7DAO.java @@ -23,12 +23,14 @@ import java.io.IOException; import java.util.List; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.TraceBrief; import org.apache.skywalking.oap.server.core.query.type.TraceState; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder; import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO; import org.elasticsearch.action.search.SearchResponse; @@ -59,7 +61,8 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); @@ -112,6 +115,13 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO { sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC); break; } + if (CollectionUtils.isNotEmpty(tags)) { + BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery(); + tags.forEach(tag -> { + tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString())); + }); + mustQueryList.add(tagMatchQuery); + } sourceBuilder.size(limit); sourceBuilder.from(from); @@ -127,9 +137,10 @@ public class TraceQueryEs7DAO extends TraceQueryEsDAO { basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME))); basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME)); basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue()); - basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap() - .get( - SegmentRecord.IS_ERROR)).intValue())); + basicTrace.setError( + BooleanUtils.valueToBoolean( + ((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()) + ); basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID)); traceBrief.getTraces().add(basicTrace); } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java index 3d73663172eeb29d2abbe115762bf12faebafb96..dfd4476d99846b229db649e8fb5b28dff2126af8 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java @@ -80,6 +80,11 @@ public class TableMetaInfo { if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) { storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID); } + + // The field of SegmentRecord, tags, store as tag only. see SegmentRecord.DATA_BINARY + if (SegmentRecord.INDEX_NAME.equals(model.getName())) { + storageAndColumnMap.remove(SegmentRecord.TAGS); + } } TableMetaInfo info = TableMetaInfo.builder() diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java index f9843721654341c02a7e7916706c63ca97e6aff3..c510d2e810a1fa8dfb4d6f54fa6f5fa1dd20ea96 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; import com.google.common.collect.Maps; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageData; import org.apache.skywalking.oap.server.core.storage.model.Model; @@ -40,6 +41,9 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest { public InfluxInsertRequest(Model model, StorageData storageData, StorageBuilder storageBuilder) { Map objectMap = storageBuilder.data2Map(storageData); + if (SegmentRecord.INDEX_NAME.equals(model.getName())) { + objectMap.remove(SegmentRecord.TAGS); + } for (ModelColumn column : model.getColumns()) { Object value = objectMap.get(column.getColumnName().getName()); @@ -68,6 +72,11 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest { return this; } + public InfluxInsertRequest tag(String key, String value) { + builder.tag(key, value); + return this; + } + public Point getPoint() { return builder.build(); } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java index 63739c1708d30e2414d76f136a97602d9fdfc8dc..a3d1bc7e71d99f9303e3db2249be32e8a32a3591 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java @@ -18,10 +18,16 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base; +import com.google.common.base.Joiner; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; @@ -52,6 +58,21 @@ public class RecordDAO implements IRecordDAO { TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> { request.addFieldAsTag(field, tag); }); + + if (SegmentRecord.INDEX_NAME.equals(model.getName())) { + Map> collect = ((SegmentRecord) record).getTagsRawData() + .stream() + .collect( + Collectors.groupingBy(SpanTag::getKey)); + collect.entrySet().forEach(e -> { + request.tag(e.getKey(), "'" + Joiner.on("'") + .join(e.getValue() + .stream() + .map(SpanTag::getValue) + .collect(Collectors.toSet())) + "'"); + }); + } return request; } + } diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java index f315082946fa69b36310b8202028a1708027f882..f01602b9241c58de34bb2327ef0c12b755b6d55f 100644 --- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java +++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java @@ -26,6 +26,7 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.Span; @@ -33,12 +34,14 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief; import org.apache.skywalking.oap.server.core.query.type.TraceState; import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient; import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants; import org.elasticsearch.common.Strings; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.influxdb.querybuilder.SelectQueryImpl; +import org.influxdb.querybuilder.WhereNested; import org.influxdb.querybuilder.WhereQueryImpl; import org.influxdb.querybuilder.clauses.Clause; @@ -69,7 +72,8 @@ public class TraceQuery implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) + QueryOrder queryOrder, + final List tags) throws IOException { String orderBy = SegmentRecord.START_TIME; @@ -121,6 +125,13 @@ public class TraceQuery implements ITraceQueryDAO { recallQuery.and(eq(SegmentRecord.IS_ERROR, BooleanUtils.FALSE)); break; } + if (CollectionUtils.isNotEmpty(tags)) { + WhereNested> nested = recallQuery.andNested(); + for (final SpanTag tag : tags) { + nested.and(contains(tag.getKey(), "'" + tag.getValue() + "'")); + } + nested.close(); + } WhereQueryImpl countQuery = select() .count(SegmentRecord.ENDPOINT_ID) diff --git a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java index 6bcd4af29ed88318ecfa536ec2ff91e6acc018c0..4e8c48f3ec5227f2527ab5d071633b59eaf26ae8 100644 --- a/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-jaeger-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jaeger/elasticsearch/JaegerTraceQueryEsDAO.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.KeyValue; import org.apache.skywalking.oap.server.core.query.type.LogEntity; @@ -89,7 +90,8 @@ public class JaegerTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java index b39c2f2cddb71c7bfa97bc938e80e141654acc95..75667e63d14a473ebb833f1c955a99e4bf01704e 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java @@ -30,4 +30,33 @@ public class H2StorageConfig extends ModuleConfig { private String user = ""; private String password = ""; private int metadataQueryMaxSize = 5000; + /** + * Some entities, such as trace segment, include the logic column with multiple values. Some storage support this + * kind of data structure, but H2 doesn't. + * + * In the H2, we use multiple physical columns to host the values, such as, + * + * Change column_a with values [1,2,3,4,5] to + *

+ * column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5 + *

+ * + * This configuration controls the threshold about how many physical columns should to be added, also limit the max + * values of this kind of column. + * + * SkyWalking don't create a new table for indexing, because it would amplify the size of data set to dozens time, + * which is not practical in the production environment. + * + * @since 8.2.0 + */ + private int maxSizeOfArrayColumn = 20; + /** + * In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such + * as multiple HTTP exit spans all have their own `http.method` tag. + * + * This configuration set the limitation of max num of values for the same tag key. + * + * @since 8.2.0 + */ + private int numOfSearchableValuesPerTag = 2; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java index 265352268bad498244b557eca53a4581d9f079a0..82db55355da961e76002418cb65d748983d29968 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java @@ -20,7 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2; import java.util.Properties; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; @@ -109,39 +111,64 @@ public class H2StorageProvider extends ModuleProvider { h2Client = new JDBCHikariCPClient(settings); this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client)); - this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client)); + this.registerServiceImplementation( + StorageDAO.class, + new H2StorageDAO( + getManager(), h2Client, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()) + ); this.registerServiceImplementation( - INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client)); + INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(h2Client)); this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client)); this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(h2Client)); - this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client)); this.registerServiceImplementation( - IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize())); + ITraceQueryDAO.class, new H2TraceQueryDAO( + getManager(), + h2Client, + config.getMaxSizeOfArrayColumn(), + config.getNumOfSearchableValuesPerTag() + )); + this.registerServiceImplementation( + IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client, config.getMetadataQueryMaxSize())); this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client)); this.registerServiceImplementation( - IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client)); + IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client)); this.registerServiceImplementation(ILogQueryDAO.class, new H2LogQueryDAO(h2Client)); this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(h2Client)); this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(h2Client)); this.registerServiceImplementation( - IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client)); + IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(h2Client)); this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(h2Client)); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); - HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + final ConfigService configService = getManager().find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length; + if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) { + throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags + + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag() + + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn() + + "]. Potential out of bound in the runtime."); + } + + MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge( + "storage_h2", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); h2Client.registerChecker(healthChecker); try { h2Client.connect(); - H2TableInstaller installer = new H2TableInstaller(h2Client, getManager()); + H2TableInstaller installer = new H2TableInstaller( + h2Client, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); @@ -155,6 +182,6 @@ public class H2StorageProvider extends ModuleProvider { @Override public String[] requiredModules() { - return new String[]{CoreModule.NAME}; + return new String[] {CoreModule.NAME}; } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java index c0c08eb0a950113b2d8f010faa6282562f0cbde6..2cce60f72309cff7aded5e4657ecafd1f73af194 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RecordDAO.java @@ -19,25 +19,58 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; +import org.apache.skywalking.oap.server.library.module.ModuleManager; public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO { - private JDBCHikariCPClient h2Client; private StorageBuilder storageBuilder; + private final int maxSizeOfArrayColumn; - public H2RecordDAO(JDBCHikariCPClient h2Client, StorageBuilder storageBuilder) { + public H2RecordDAO(ModuleManager manager, + JDBCHikariCPClient h2Client, + StorageBuilder storageBuilder, + final int maxSizeOfArrayColumn, + final int numOfSearchableValuesPerTag) { this.h2Client = h2Client; - this.storageBuilder = storageBuilder; + try { + if (SegmentRecord.class + .equals( + storageBuilder.getClass().getMethod("map2Data", Map.class).getReturnType() + ) + ) { + this.maxSizeOfArrayColumn = maxSizeOfArrayColumn; + final ConfigService configService = manager.find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + this.storageBuilder = new H2SegmentRecordBuilder( + maxSizeOfArrayColumn, + numOfSearchableValuesPerTag, + Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA)) + ); + } else { + this.maxSizeOfArrayColumn = 1; + this.storageBuilder = storageBuilder; + } + } catch (NoSuchMethodException e) { + throw new UnexpectedException("Can't find the SegmentRecord$Builder.map2Data method."); + } } @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { - return getInsertExecutor(model.getName(), record, storageBuilder); + return getInsertExecutor(model.getName(), record, storageBuilder, maxSizeOfArrayColumn); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java index 04c79ca70eb9ebad4cc2bcf845565e5ffe0b1c47..b63e910fa286b976633e12a06cc6a7c836532a79 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SQLExecutor.java @@ -105,6 +105,12 @@ public class H2SQLExecutor { protected SQLExecutor getInsertExecutor(String modelName, T metrics, StorageBuilder storageBuilder) throws IOException { + return getInsertExecutor(modelName, metrics, storageBuilder, 1); + } + + protected SQLExecutor getInsertExecutor(String modelName, T metrics, + StorageBuilder storageBuilder, + int maxSizeOfArrayColumn) throws IOException { Map objectMap = storageBuilder.data2Map(metrics); SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES"); @@ -114,16 +120,27 @@ public class H2SQLExecutor { param.add(metrics.id()); for (int i = 0; i < columns.size(); i++) { ModelColumn column = columns.get(i); - sqlBuilder.append("?"); - if (i != columns.size() - 1) { - sqlBuilder.append(","); + if (List.class.isAssignableFrom(column.getType())) { + for (int physicalColumnIdx = 0; physicalColumnIdx < maxSizeOfArrayColumn; physicalColumnIdx++) { + sqlBuilder.append("?"); + param.add(objectMap.get(column.getColumnName().getName() + "_" + physicalColumnIdx)); + if (physicalColumnIdx != maxSizeOfArrayColumn - 1) { + sqlBuilder.append(","); + } + } + } else { + sqlBuilder.append("?"); + + Object value = objectMap.get(column.getColumnName().getName()); + if (value instanceof StorageDataComplexObject) { + param.add(((StorageDataComplexObject) value).toStorageData()); + } else { + param.add(value); + } } - Object value = objectMap.get(column.getColumnName().getName()); - if (value instanceof StorageDataComplexObject) { - param.add(((StorageDataComplexObject) value).toStorageData()); - } else { - param.add(value); + if (i != columns.size() - 1) { + sqlBuilder.append(","); } } sqlBuilder.append(")"); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..522178a7b5b39700bdc04c04f2276d53bd766379 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2SegmentRecordBuilder.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; + +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import joptsimple.internal.Strings; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; + +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.DATA_BINARY; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_ID; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.ENDPOINT_NAME; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.END_TIME; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.IS_ERROR; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.LATENCY; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SEGMENT_ID; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_ID; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.SERVICE_INSTANCE_ID; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.START_TIME; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TAGS; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TIME_BUCKET; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.TRACE_ID; +import static org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.VERSION; + +/** + * H2/MySQL is different from standard {@link org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord.Builder}, + * this maps the tags into multiple columns. + */ +public class H2SegmentRecordBuilder implements StorageBuilder { + private int numOfSearchableValuesPerTag; + private final List searchTagKeys; + + public H2SegmentRecordBuilder(final int maxSizeOfArrayColumn, + final int numOfSearchableValuesPerTag, + final List searchTagKeys) { + this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag; + final int maxNumOfTags = maxSizeOfArrayColumn / numOfSearchableValuesPerTag; + if (searchTagKeys.size() > maxNumOfTags) { + this.searchTagKeys = searchTagKeys.subList(0, maxNumOfTags); + } else { + this.searchTagKeys = searchTagKeys; + } + } + + @Override + public Map data2Map(Record record) { + SegmentRecord storageData = (SegmentRecord) record; + storageData.setStatement(Strings.join(new String[] { + storageData.getEndpointName(), + storageData.getTraceId() + }, " - ")); + Map map = new HashMap<>(); + map.put(SEGMENT_ID, storageData.getSegmentId()); + map.put(TRACE_ID, storageData.getTraceId()); + map.put(TopN.STATEMENT, storageData.getStatement()); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId()); + map.put(ENDPOINT_NAME, storageData.getEndpointName()); + map.put(ENDPOINT_ID, storageData.getEndpointId()); + map.put(START_TIME, storageData.getStartTime()); + map.put(END_TIME, storageData.getEndTime()); + map.put(LATENCY, storageData.getLatency()); + map.put(IS_ERROR, storageData.getIsError()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + if (CollectionUtils.isEmpty(storageData.getDataBinary())) { + map.put(DATA_BINARY, Const.EMPTY_STRING); + } else { + map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); + } + map.put(VERSION, storageData.getVersion()); + storageData.getTagsRawData().forEach(spanTag -> { + final int index = searchTagKeys.indexOf(spanTag.getKey()); + boolean shouldAdd = true; + int tagIdx = 0; + final String tagExpression = spanTag.toString(); + for (int i = 0; i < numOfSearchableValuesPerTag; i++) { + tagIdx = index * numOfSearchableValuesPerTag + i; + final String previousValue = (String) map.get(TAGS + "_" + tagIdx); + if (previousValue == null) { + // Still have at least one available slot, add directly. + shouldAdd = true; + break; + } + // If value is duplicated with added one, ignore. + if (previousValue.equals(tagExpression)) { + shouldAdd = false; + break; + } + // Reach the end of tag + if (i == numOfSearchableValuesPerTag - 1) { + shouldAdd = false; + } + } + if (shouldAdd) { + map.put(TAGS + "_" + tagIdx, tagExpression); + } + }); + return map; + } + + @Override + public Record map2Data(Map dbMap) { + SegmentRecord record = new SegmentRecord(); + record.setSegmentId((String) dbMap.get(SEGMENT_ID)); + record.setTraceId((String) dbMap.get(TRACE_ID)); + record.setStatement((String) dbMap.get(TopN.STATEMENT)); + record.setServiceId((String) dbMap.get(SERVICE_ID)); + record.setServiceInstanceId((String) dbMap.get(SERVICE_INSTANCE_ID)); + record.setEndpointName((String) dbMap.get(ENDPOINT_NAME)); + record.setEndpointId((String) dbMap.get(ENDPOINT_ID)); + record.setStartTime(((Number) dbMap.get(START_TIME)).longValue()); + record.setEndTime(((Number) dbMap.get(END_TIME)).longValue()); + record.setLatency(((Number) dbMap.get(LATENCY)).intValue()); + record.setIsError(((Number) dbMap.get(IS_ERROR)).intValue()); + record.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue()); + if (StringUtil.isEmpty((String) dbMap.get(DATA_BINARY))) { + record.setDataBinary(new byte[] {}); + } else { + record.setDataBinary(Base64.getDecoder().decode((String) dbMap.get(DATA_BINARY))); + } + record.setVersion(((Number) dbMap.get(VERSION)).intValue()); + // Don't read the tags as they has been in the data binary already. + return record; + } +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java index 0b2bdf1f7e6927b85a36c859a1557089f6598e77..f81d03f23b5dc1f7f9de0b317b6b7561ed62decb 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2StorageDAO.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; +import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; import org.apache.skywalking.oap.server.core.analysis.management.ManagementData; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; @@ -29,14 +30,14 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO; import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +@RequiredArgsConstructor public class H2StorageDAO implements StorageDAO { - - private JDBCHikariCPClient h2Client; - - public H2StorageDAO(JDBCHikariCPClient h2Client) { - this.h2Client = h2Client; - } + private final ModuleManager manager; + private final JDBCHikariCPClient h2Client; + private final int maxSizeOfArrayColumn; + private final int numOfSearchableValuesPerTag; @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { @@ -45,7 +46,7 @@ public class H2StorageDAO implements StorageDAO { @Override public IRecordDAO newRecordDao(StorageBuilder storageBuilder) { - return new H2RecordDAO(h2Client, storageBuilder); + return new H2RecordDAO(manager, h2Client, storageBuilder, maxSizeOfArrayColumn, numOfSearchableValuesPerTag); } @Override diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java index ebbd9582859a18da9b874305355e0ef8143baad1..536b6188f383f201d4dff9b8040f06ba2ce90b50 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java @@ -19,8 +19,11 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; import com.google.gson.JsonObject; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.NodeType; import org.apache.skywalking.oap.server.core.storage.StorageException; @@ -44,8 +47,16 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo; public class H2TableInstaller extends ModelInstaller { public static final String ID_COLUMN = "id"; - public H2TableInstaller(Client client, ModuleManager moduleManager) { + protected final int maxSizeOfArrayColumn; + protected final int numOfSearchableValuesPerTag; + + public H2TableInstaller(Client client, + ModuleManager moduleManager, + int maxSizeOfArrayColumn, + int numOfSearchableValuesPerTag) { super(client, moduleManager); + this.maxSizeOfArrayColumn = maxSizeOfArrayColumn; + this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag; } @Override @@ -67,9 +78,7 @@ public class H2TableInstaller extends ModelInstaller { ModelColumn column = model.getColumns().get(i); ColumnName name = column.getColumnName(); tableCreateSQL.appendLine( - name.getStorageName() + " " + getColumnType(column) + (i != model - .getColumns() - .size() - 1 ? "," : "")); + getColumn(column) + (i != model.getColumns().size() - 1 ? "," : "")); } tableCreateSQL.appendLine(")"); @@ -88,22 +97,37 @@ public class H2TableInstaller extends ModelInstaller { /** * Set up the data type mapping between Java type and H2 database type */ - protected String getColumnType(ModelColumn column) { - final Class type = column.getType(); + protected String getColumn(ModelColumn column) { + return transform(column, column.getType(), column.getGenericType()); + } + + protected String transform(ModelColumn column, Class type, Type genericType) { + final String storageName = column.getColumnName().getStorageName(); if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) { - return "INT"; + return storageName + " INT"; } else if (Long.class.equals(type) || long.class.equals(type)) { - return "BIGINT"; + return storageName + " BIGINT"; } else if (Double.class.equals(type) || double.class.equals(type)) { - return "DOUBLE"; + return storageName + " DOUBLE"; } else if (String.class.equals(type)) { - return "VARCHAR(" + column.getLength() + ")"; + return storageName + " VARCHAR(" + column.getLength() + ")"; } else if (StorageDataComplexObject.class.isAssignableFrom(type)) { - return "VARCHAR(20000)"; + return storageName + " VARCHAR(20000)"; } else if (byte[].class.equals(type)) { - return "MEDIUMTEXT"; + return storageName + " MEDIUMTEXT"; } else if (JsonObject.class.equals(type)) { - return "VARCHAR(" + column.getLength() + ")"; + return storageName + " VARCHAR(" + column.getLength() + ")"; + } else if (List.class.isAssignableFrom(type)) { + final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0]; + String oneColumnType = transform(column, (Class) elementType, elementType); + // Remove the storageName as prefix + oneColumnType = oneColumnType.substring(storageName.length()); + StringBuilder columns = new StringBuilder(); + for (int i = 0; i < maxSizeOfArrayColumn; i++) { + columns.append(storageName).append("_").append(i).append(oneColumnType) + .append(i == maxSizeOfArrayColumn - 1 ? "" : ","); + } + return columns.toString(); } else { throw new IllegalArgumentException("Unsupported data type: " + type.getName()); } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java index d3ee419416881231c7b61d3d444f15be21017dab..35e982831c2c34475928e99dae765b75f8c226b1 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java @@ -24,11 +24,16 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.List; import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.Span; @@ -36,14 +41,26 @@ import org.apache.skywalking.oap.server.core.query.type.TraceBrief; import org.apache.skywalking.oap.server.core.query.type.TraceState; import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.elasticsearch.search.sort.SortOrder; public class H2TraceQueryDAO implements ITraceQueryDAO { + private ModuleManager manager; private JDBCHikariCPClient h2Client; - - public H2TraceQueryDAO(JDBCHikariCPClient h2Client) { + private List searchableTagKeys; + private int maxSizeOfArrayColumn; + private int numOfSearchableValuesPerTag; + + public H2TraceQueryDAO(ModuleManager manager, + JDBCHikariCPClient h2Client, + final int maxSizeOfArrayColumn, + final int numOfSearchableValuesPerTag) { this.h2Client = h2Client; + this.manager = manager; + this.maxSizeOfArrayColumn = maxSizeOfArrayColumn; + this.numOfSearchableValuesPerTag = numOfSearchableValuesPerTag; } @Override @@ -59,7 +76,18 @@ public class H2TraceQueryDAO implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { + if (searchableTagKeys == null) { + final ConfigService configService = manager.find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + searchableTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA)); + if (searchableTagKeys.size() > maxSizeOfArrayColumn) { + this.searchableTagKeys = searchableTagKeys.subList(0, maxSizeOfArrayColumn); + } + } + StringBuilder sql = new StringBuilder(); List parameters = new ArrayList<>(10); @@ -101,6 +129,26 @@ public class H2TraceQueryDAO implements ITraceQueryDAO { sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?"); parameters.add(traceId); } + if (CollectionUtils.isNotEmpty(tags)) { + for (final SpanTag tag : tags) { + final int foundIdx = searchableTagKeys.indexOf(tag.getKey()); + if (foundIdx > -1) { + sql.append(" and ("); + for (int i = 0; i < numOfSearchableValuesPerTag; i++) { + final String physicalColumn = SegmentRecord.TAGS + "_" + (foundIdx * numOfSearchableValuesPerTag + i); + sql.append(physicalColumn).append(" = ? "); + parameters.add(tag.toString()); + if (i != numOfSearchableValuesPerTag - 1) { + sql.append(" or "); + } + } + sql.append(")"); + } else { + //If the tag is not searchable, but is required, then don't need to run the real query. + return new TraceBrief(); + } + } + } switch (traceState) { case ERROR: sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java index 1b44b1de1e8b69faab3926bbc62f0ddb7d4d1473..eb9b1ae981090a93b0bbe201d511b160d09bae75 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java @@ -18,16 +18,26 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql; +import java.util.Properties; import lombok.Getter; import lombok.Setter; import org.apache.skywalking.oap.server.library.module.ModuleConfig; -import java.util.Properties; - @Setter @Getter public final class MySQLStorageConfig extends ModuleConfig { - private int metadataQueryMaxSize = 5000; + /** + * Inherit from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getMaxSizeOfArrayColumn()} + * + * @since 8.2.0 + */ + private int maxSizeOfArrayColumn = 20; + /** + * Inherit from {@link org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig#getNumOfSearchableValuesPerTag()} + * + * @since 8.2.0 + */ + private int numOfSearchableValuesPerTag = 2; private Properties properties; } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java index 11a39bceb9d6cb05451c7a94ebca110b553079af..765c252be446f56321065c456b62b6eb1b755b13 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java @@ -19,7 +19,9 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.IBatchDAO; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.core.storage.StorageDAO; @@ -96,35 +98,60 @@ public class MySQLStorageProvider extends ModuleProvider { mysqlClient = new JDBCHikariCPClient(config.getProperties()); this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient)); - this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(mysqlClient)); this.registerServiceImplementation( - INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient)); + StorageDAO.class, + new H2StorageDAO( + getManager(), mysqlClient, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()) + ); + this.registerServiceImplementation( + INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(mysqlClient)); this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(mysqlClient)); this.registerServiceImplementation(IMetricsQueryDAO.class, new H2MetricsQueryDAO(mysqlClient)); - this.registerServiceImplementation(ITraceQueryDAO.class, new MySQLTraceQueryDAO(mysqlClient)); this.registerServiceImplementation( - IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize())); + ITraceQueryDAO.class, + new MySQLTraceQueryDAO( + getManager(), + mysqlClient, + config.getMaxSizeOfArrayColumn(), + config.getNumOfSearchableValuesPerTag() + ) + ); + this.registerServiceImplementation( + IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient, config.getMetadataQueryMaxSize())); this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient)); this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient)); this.registerServiceImplementation( - IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient)); + IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient)); this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient)); this.registerServiceImplementation(ILogQueryDAO.class, new MySQLLogQueryDAO(mysqlClient)); this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(mysqlClient)); this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(mysqlClient)); this.registerServiceImplementation( - IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient)); + IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(mysqlClient)); this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(mysqlClient)); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + final ConfigService configService = getManager().find(CoreModule.NAME) + .provider() + .getService(ConfigService.class); + final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length; + if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) { + throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags + + "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag() + + "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn() + + "]. Potential out of bound in the runtime."); + } + try { mysqlClient.connect(); - MySQLTableInstaller installer = new MySQLTableInstaller(mysqlClient, getManager()); + MySQLTableInstaller installer = new MySQLTableInstaller( + mysqlClient, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag() + ); getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (StorageException e) { throw new ModuleStartException(e.getMessage(), e); @@ -138,6 +165,6 @@ public class MySQLStorageProvider extends ModuleProvider { @Override public String[] requiredModules() { - return new String[]{CoreModule.NAME}; + return new String[] {CoreModule.NAME}; } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java index 55a6b51c1af3a4ed5e6399a628fccebd080e4ee6..62f615f53f8c9962ca5109c759c56e5be4e82751 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex; @@ -40,8 +41,11 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstal */ @Slf4j public class MySQLTableInstaller extends H2TableInstaller { - public MySQLTableInstaller(Client client, ModuleManager moduleManager) { - super(client, moduleManager); + public MySQLTableInstaller(Client client, + ModuleManager moduleManager, + int maxSizeOfArrayColumn, + int numOfSearchableValuesPerTag) { + super(client, moduleManager, maxSizeOfArrayColumn, numOfSearchableValuesPerTag); /* * Override column because the default column names in core have syntax conflict with MySQL. */ @@ -72,24 +76,39 @@ public class MySQLTableInstaller extends H2TableInstaller { int indexSeq = 0; for (final ModelColumn modelColumn : model.getColumns()) { if (!modelColumn.isStorageOnly()) { - SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX "); - tableIndexSQL.append(model.getName().toUpperCase()) - .append("_") - .append(String.valueOf(indexSeq++)) - .append("_IDX "); - tableIndexSQL.append("ON ").append(model.getName()).append("(") - .append(modelColumn.getColumnName().getStorageName()) - .append(")"); - createIndex(client, connection, model, tableIndexSQL); + final Class type = modelColumn.getType(); + if (List.class.isAssignableFrom(type)) { + for (int i = 0; i < maxSizeOfArrayColumn; i++) { + SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX "); + tableIndexSQL.append(model.getName().toUpperCase()) + .append("_") + .append(String.valueOf(indexSeq++)) + .append("_IDX "); + tableIndexSQL.append("ON ").append(model.getName()).append("(") + .append(modelColumn.getColumnName().getStorageName() + "_" + i) + .append(")"); + createIndex(client, connection, model, tableIndexSQL); + } + } else { + SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX "); + tableIndexSQL.append(model.getName().toUpperCase()) + .append("_") + .append(String.valueOf(indexSeq++)) + .append("_IDX "); + tableIndexSQL.append("ON ").append(model.getName()).append("(") + .append(modelColumn.getColumnName().getStorageName()) + .append(")"); + createIndex(client, connection, model, tableIndexSQL); + } } } for (final ExtraQueryIndex extraQueryIndex : model.getExtraQueryIndices()) { SQLBuilder tableIndexSQL = new SQLBuilder("CREATE INDEX "); tableIndexSQL.append(model.getName().toUpperCase()) - .append("_") - .append(String.valueOf(indexSeq++)) - .append("_IDX "); + .append("_") + .append(String.valueOf(indexSeq++)) + .append("_IDX "); tableIndexSQL.append(" ON ").append(model.getName()).append("("); final String[] columns = extraQueryIndex.getColumns(); for (int i = 0; i < columns.length; i++) { @@ -104,17 +123,18 @@ public class MySQLTableInstaller extends H2TableInstaller { } @Override - protected String getColumnType(final ModelColumn column) { + protected String getColumn(final ModelColumn column) { + final String storageName = column.getColumnName().getStorageName(); final Class type = column.getType(); if (StorageDataComplexObject.class.isAssignableFrom(type)) { - return "MEDIUMTEXT"; + return storageName + " MEDIUMTEXT"; } else if (String.class.equals(type)) { if (column.getLength() > 16383) { - return "MEDIUMTEXT"; + return storageName + " MEDIUMTEXT"; } else { - return "VARCHAR(" + column.getLength() + ")"; + return storageName + " VARCHAR(" + column.getLength() + ")"; } } - return super.getColumnType(column); + return super.getColumn(column); } } diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java index 7cb6a38b2aa2848702731f4ad0c568ff5c6c2b89..fd1f8815636c6fc9ead346829490363394ff34af 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java @@ -19,12 +19,16 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql; import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO; public class MySQLTraceQueryDAO extends H2TraceQueryDAO { - public MySQLTraceQueryDAO(JDBCHikariCPClient mysqlClient) { - super(mysqlClient); + public MySQLTraceQueryDAO(ModuleManager manager, + JDBCHikariCPClient h2Client, + final int maxSizeOfArrayColumn, + final int numOfSearchableValuesPerTag) { + super(manager, h2Client, maxSizeOfArrayColumn, numOfSearchableValuesPerTag); } @Override diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java index 45f090aee5f312ec463bf763e0aaf24fe0f22707..bec545a804cedc65deefca89901b4e803cddb502 100644 --- a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/ZipkinTraceQueryEsDAO.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; import org.apache.skywalking.oap.server.core.query.type.KeyValue; import org.apache.skywalking.oap.server.core.query.type.LogEntity; @@ -86,7 +87,8 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); diff --git a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java index fe499d33b94e28ee5d2560b1c207924c4edf3bfa..7c3a4ba292a28bbff037c43fb85518a037f0dcc6 100644 --- a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java +++ b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileTraceDAO.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject; import org.apache.skywalking.apm.network.language.agent.v3.SpanObject; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SpanTag; import org.apache.skywalking.oap.server.core.query.type.QueryOrder; import org.apache.skywalking.oap.server.core.query.type.Span; import org.apache.skywalking.oap.server.core.query.type.TraceBrief; @@ -50,7 +51,8 @@ public class ProfileTraceDAO implements ITraceQueryDAO { int limit, int from, TraceState traceState, - QueryOrder queryOrder) throws IOException { + QueryOrder queryOrder, + final List tags) throws IOException { return null; }