未验证 提交 6c6ef246 编写于 作者: W Wan Kai 提交者: GitHub

Support autocomplete tags in logs query. (#8999)

上级 369949b2
......@@ -42,6 +42,7 @@
* Add TermsAggregation properties collect_mode and execution_hint.
* Add "execution_hint": "map", "collect_mode": "breadth_first" for aggregation and topology query to improve 5-10x performance.
* Clean up scroll contexts after used.
* Support autocomplete tags in logs query.
#### UI
......
......@@ -24,7 +24,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.strategy.SegmentStatusAnalyzer;
......@@ -179,10 +180,10 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private void addAutocompleteTags() {
segment.getTags().forEach(tag -> {
TraceTagAutocomplete tagAutocomplete = new TraceTagAutocomplete();
tagAutocomplete.setTag(tag.toString());
TagAutocomplete tagAutocomplete = new TagAutocomplete();
tagAutocomplete.setTagKey(tag.getKey());
tagAutocomplete.setTagValue(tag.getValue());
tagAutocomplete.setTagType(TagType.TRACE);
tagAutocomplete.setTimeBucket(TimeBucket.getMinuteTimeBucket(segment.getStartTime()));
sourceReceiver.receive(tagAutocomplete);
});
......
......@@ -29,6 +29,8 @@ import lombok.SneakyThrows;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.core.Const;
......@@ -59,6 +61,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
@Override
public void build() {
sourceReceiver.receive(log);
addAutocompleteTags();
}
@Override
......@@ -128,6 +131,17 @@ public class RecordAnalysisListener implements LogAnalysisListener {
return logTags;
}
private void addAutocompleteTags() {
log.getTags().forEach(tag -> {
TagAutocomplete tagAutocomplete = new TagAutocomplete();
tagAutocomplete.setTagKey(tag.getKey());
tagAutocomplete.setTagValue(tag.getValue());
tagAutocomplete.setTagType(TagType.LOG);
tagAutocomplete.setTimeBucket(TimeBucket.getMinuteTimeBucket(log.getTimestamp()));
sourceReceiver.receive(tagAutocomplete);
});
}
public static class Factory implements LogAnalysisListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
......
......@@ -42,6 +42,7 @@ import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsMetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskQueryService;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
......@@ -126,6 +127,7 @@ public class CoreModule extends ModuleDefine {
classes.add(TopNRecordsQueryService.class);
classes.add(BrowserLogQueryService.class);
classes.add(EventQueryService.class);
classes.add(TagAutoCompleteQueryService.class);
}
private void addServerInterface(List<Class> classes) {
......
......@@ -64,6 +64,7 @@ import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsMetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskQueryService;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
......@@ -263,6 +264,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
this.registerServiceImplementation(EventQueryService.class, new EventQueryService(getManager()));
this.registerServiceImplementation(TagAutoCompleteQueryService.class, new TagAutoCompleteQueryService(getManager()));
// add profile service implementations
this.registerServiceImplementation(
......
......@@ -21,20 +21,31 @@ package org.apache.skywalking.oap.server.core.analysis.manual.searchtag;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
@Stream(name = TagAutocompleteData.INDEX_NAME, scopeId = DefaultScopeDefine.TAG_AUTOCOMPLETE,
builder = TagAutocompleteData.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false, timeRelativeID = true)
@EqualsAndHashCode(of = {
"tag"
"tagKey",
"tagValue",
"tagType"
})
public abstract class TagAutocompleteData extends Metrics {
public class TagAutocompleteData extends Metrics {
public static final String INDEX_NAME = "tag_autocomplete";
public static final String TAG_KEY = "tag_key";
public static final String TAG_VALUE = "tag_value";
public static final String TAG_TYPE = "tag_type";
@Setter
@Getter
private String tag;
@Setter
@Getter
@Column(columnName = TAG_KEY)
......@@ -44,6 +55,11 @@ public abstract class TagAutocompleteData extends Metrics {
@Column(columnName = TAG_VALUE)
private String tagValue;
@Setter
@Getter
@Column(columnName = TAG_TYPE)
private String tagType;
@Override
public boolean combine(final Metrics metrics) {
return true;
......@@ -66,7 +82,7 @@ public abstract class TagAutocompleteData extends Metrics {
@Override
protected String id0() {
return toTimeBucketInDay() + "-" + tag;
return toTimeBucketInDay() + "-" + tagType + "-" + tagKey + "=" + tagValue;
}
@Override
......@@ -76,19 +92,39 @@ public abstract class TagAutocompleteData extends Metrics {
@Override
public void deserialize(final RemoteData remoteData) {
setTag(remoteData.getDataStrings(0));
setTagKey(remoteData.getDataStrings(1));
setTagValue(remoteData.getDataStrings(2));
setTagKey(remoteData.getDataStrings(0));
setTagValue(remoteData.getDataStrings(1));
setTagType(remoteData.getDataStrings(2));
setTimeBucket(remoteData.getDataLongs(0));
}
@Override
public RemoteData.Builder serialize() {
final RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(tag);
builder.addDataStrings(tagKey);
builder.addDataStrings(tagValue);
builder.addDataStrings(tagType);
builder.addDataLongs(getTimeBucket());
return builder;
}
public static class Builder implements StorageBuilder<TagAutocompleteData> {
@Override
public TagAutocompleteData storage2Entity(final Convert2Entity converter) {
TagAutocompleteData record = new TagAutocompleteData();
record.setTagKey((String) converter.get(TAG_KEY));
record.setTagValue((String) converter.get(TAG_VALUE));
record.setTagType((String) converter.get(TAG_TYPE));
record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue());
return record;
}
@Override
public void entity2Storage(final TagAutocompleteData storageData, final Convert2Storage converter) {
converter.accept(TAG_KEY, storageData.getTagKey());
converter.accept(TAG_VALUE, storageData.getTagValue());
converter.accept(TAG_TYPE, storageData.getTagType());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
}
}
}
......@@ -16,20 +16,20 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
package org.apache.skywalking.oap.server.core.analysis.manual.searchtag;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
public class TraceTagAutocompleteDispatcher implements SourceDispatcher<TraceTagAutocomplete> {
public class TagAutocompleteDispatcher implements SourceDispatcher<TagAutocomplete> {
@Override
public void dispatch(TraceTagAutocomplete source) {
TraceTagAutocompleteData autocomplete = new TraceTagAutocompleteData();
autocomplete.setTag(source.getTag());
public void dispatch(TagAutocomplete source) {
TagAutocompleteData autocomplete = new TagAutocompleteData();
autocomplete.setTagKey(source.getTagKey());
autocomplete.setTagValue(source.getTagValue());
autocomplete.setTagType(source.getTagType().name());
autocomplete.setTimeBucket(source.getTimeBucket());
MetricsStreamProcessor.getInstance().in(autocomplete);
}
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.oap.server.core.source;
@ScopeDeclaration(id = DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE, name = "TraceTagAutocomplete")
public class TraceTagAutocomplete extends TagAutocomplete {
package org.apache.skywalking.oap.server.core.analysis.manual.searchtag;
public enum TagType {
TRACE,
LOG
}
......@@ -16,40 +16,43 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.segment;
package org.apache.skywalking.oap.server.core.query;
import lombok.EqualsAndHashCode;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import java.io.IOException;
import java.util.Set;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@Stream(name = TraceTagAutocompleteData.INDEX_NAME, scopeId = DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE,
builder = TraceTagAutocompleteData.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = false, timeRelativeID = true)
@EqualsAndHashCode(callSuper = true)
public class TraceTagAutocompleteData extends TagAutocompleteData {
public static final String INDEX_NAME = "trace_tag_autocomplete";
public class TagAutoCompleteQueryService implements Service {
private final ModuleManager moduleManager;
private ITagAutoCompleteQueryDAO tagAutoCompleteQueryDAO;
public static class Builder implements StorageBuilder<TraceTagAutocompleteData> {
@Override
public TraceTagAutocompleteData storage2Entity(final Convert2Entity converter) {
TraceTagAutocompleteData record = new TraceTagAutocompleteData();
record.setTagKey((String) converter.get(TAG_KEY));
record.setTagValue((String) converter.get(TAG_VALUE));
record.setTag(record.getTagKey() + "=" + record.getTagValue());
return record;
}
public TagAutoCompleteQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override
public void entity2Storage(final TraceTagAutocompleteData storageData, final Convert2Storage converter) {
converter.accept(TAG_KEY, storageData.getTagKey());
converter.accept(TAG_VALUE, storageData.getTagValue());
converter.accept(TIME_BUCKET, storageData.getTimeBucket());
private ITagAutoCompleteQueryDAO getTagAutoCompleteQueryDAO() {
if (tagAutoCompleteQueryDAO == null) {
this.tagAutoCompleteQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITagAutoCompleteQueryDAO.class);
}
return tagAutoCompleteQueryDAO;
}
public Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException {
return getTagAutoCompleteQueryDAO().queryTagAutocompleteKeys(tagType, startSecondTB, endSecondTB);
}
public Set<String> queryTagAutocompleteValues(final TagType tagType,
final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
return getTagAutoCompleteQueryDAO().queryTagAutocompleteValues(
tagType, tagKey, limit, startSecondTB, endSecondTB);
}
}
......@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
......@@ -129,18 +128,6 @@ public class TraceQueryService implements Service {
return trace;
}
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
return getTraceQueryDAO().queryTraceTagAutocompleteKeys(startSecondTB, endSecondTB);
}
public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
return getTraceQueryDAO().queryTraceTagAutocompleteValues(tagKey, limit, startSecondTB, endSecondTB);
}
private List<Span> buildSpanList(SegmentObject segmentObject) {
List<Span> spans = new ArrayList<>();
......
......@@ -101,7 +101,7 @@ public class DefaultScopeDefine {
public static final int EBPF_PROFILING_SCHEDULE = 47;
public static final int EBPF_PROFILING_DATA = 48;
public static final int SERVICE_LABEL = 49;
public static final int TRACE_TAG_AUTOCOMPLETE = 50;
public static final int TAG_AUTOCOMPLETE = 50;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
......
......@@ -20,27 +20,30 @@ package org.apache.skywalking.oap.server.core.source;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.library.util.StringUtil;
public abstract class TagAutocomplete extends Source {
@ScopeDeclaration(id = DefaultScopeDefine.TAG_AUTOCOMPLETE, name = "TagAutocomplete")
public class TagAutocomplete extends Source {
@Override
public int scope() {
return DefaultScopeDefine.TRACE_TAG_AUTOCOMPLETE;
return DefaultScopeDefine.TAG_AUTOCOMPLETE;
}
@Override
public String getEntityId() {
return tag;
return StringUtil.join('=', tagKey, tagValue);
}
@Setter
@Getter
private String tag;
@Setter
@Getter
private String tagKey;
@Setter
@Getter
private String tagValue;
@Setter
@Getter
private TagType tagType;
}
......@@ -34,6 +34,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -76,7 +77,8 @@ public class StorageModule extends ModuleDefine {
IEBPFProfilingTaskDAO.class,
IEBPFProfilingScheduleDAO.class,
IEBPFProfilingDataDAO.class,
IServiceLabelDAO.class
IServiceLabelDAO.class,
ITagAutoCompleteQueryDAO.class
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.Set;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.library.module.Service;
public interface ITagAutoCompleteQueryDAO extends Service {
Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException;
Set<String> queryTagAutocompleteValues(final TagType tagType,
final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException;
}
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
......@@ -51,12 +50,4 @@ public interface ITraceQueryDAO extends Service {
* This method gives more flexible for 3rd trace without segment concept, which can't search data through {@link #queryByTraceId(String)}
*/
List<Span> doFlexibleTraceQuery(String traceId) throws IOException;
Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException;
Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException;
}
......@@ -26,6 +26,6 @@ public class CoreModuleTest {
public void testOpenServiceList() {
CoreModule coreModule = new CoreModule();
Assert.assertEquals(35, coreModule.services().length);
Assert.assertEquals(36, coreModule.services().length);
}
}
......@@ -20,10 +20,14 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import graphql.kickstart.tools.GraphQLQueryResolver;
import java.io.IOException;
import java.util.Set;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.LogQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.Logs;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -36,6 +40,7 @@ import static java.util.Objects.nonNull;
public class LogQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private LogQueryService logQueryService;
private TagAutoCompleteQueryService tagQueryService;
public LogQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
......@@ -48,6 +53,13 @@ public class LogQuery implements GraphQLQueryResolver {
return logQueryService;
}
private TagAutoCompleteQueryService getTagQueryService() {
if (tagQueryService == null) {
this.tagQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TagAutoCompleteQueryService.class);
}
return tagQueryService;
}
public boolean supportQueryLogsByKeywords() {
return getQueryService().supportQueryLogsByKeywords();
}
......@@ -88,4 +100,12 @@ public class LogQuery implements GraphQLQueryResolver {
condition.getExcludingKeywordsOfContent()
);
}
public Set<String> queryLogTagAutocompleteKeys(final Duration queryDuration) throws IOException {
return getTagQueryService().queryTagAutocompleteKeys(TagType.LOG, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
}
public Set<String> queryLogTagAutocompleteValues(final String tagKey, final Duration queryDuration) throws IOException {
return getTagQueryService().queryTagAutocompleteValues(TagType.LOG, tagKey, 100, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
}
}
......@@ -25,6 +25,8 @@ import java.util.Set;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.TraceQueryCondition;
......@@ -41,6 +43,7 @@ public class TraceQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private TraceQueryService queryService;
private TagAutoCompleteQueryService tagQueryService;
public TraceQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
......@@ -53,6 +56,13 @@ public class TraceQuery implements GraphQLQueryResolver {
return queryService;
}
private TagAutoCompleteQueryService getTagQueryService() {
if (tagQueryService == null) {
this.tagQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TagAutoCompleteQueryService.class);
}
return tagQueryService;
}
public TraceBrief queryBasicTraces(final TraceQueryCondition condition) throws IOException {
long startSecondTB = 0;
long endSecondTB = 0;
......@@ -85,10 +95,10 @@ public class TraceQuery implements GraphQLQueryResolver {
}
public Set<String> queryTraceTagAutocompleteKeys(final Duration queryDuration) throws IOException {
return getQueryService().queryTraceTagAutocompleteKeys(queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
return getTagQueryService().queryTagAutocompleteKeys(TagType.TRACE, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
}
public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final Duration queryDuration) throws IOException {
return getQueryService().queryTraceTagAutocompleteValues(tagKey, 100, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
return getTagQueryService().queryTagAutocompleteValues(TagType.TRACE, tagKey, 100, queryDuration.getStartTimeBucketInSec(), queryDuration.getEndTimeBucketInSec());
}
}
Subproject commit f40f3914b39efb6590bc51a7014dc5080361d9ba
Subproject commit 0a5a9ede449e9e7227577686dee124a862304b01
......@@ -24,7 +24,8 @@ import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.source.TraceTagAutocomplete;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
......@@ -94,7 +95,7 @@ public class SpanForward {
if (searchTagKeys.contains(key)) {
String tagString = key + "=" + value;
zipkinSpan.getTags().add(tagString);
addAutocompleteTags(minuteTimeBucket, key, value, tagString);
addAutocompleteTags(minuteTimeBucket, key, value);
}
});
......@@ -110,11 +111,11 @@ public class SpanForward {
});
}
private void addAutocompleteTags(final long minuteTimeBucket, final String key, final String value, final String tagString) {
TraceTagAutocomplete tagAutocomplete = new TraceTagAutocomplete();
tagAutocomplete.setTag(tagString);
private void addAutocompleteTags(final long minuteTimeBucket, final String key, final String value) {
TagAutocomplete tagAutocomplete = new TagAutocomplete();
tagAutocomplete.setTagKey(key);
tagAutocomplete.setTagValue(value);
tagAutocomplete.setTagType(TagType.TRACE);
tagAutocomplete.setTimeBucket(minuteTimeBucket);
receiver.receive(tagAutocomplete);
}
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfili
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
......@@ -76,6 +77,7 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.Servi
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileThreadSnapshotQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
......@@ -209,6 +211,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
new EBPFProfilingDataEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(IServiceLabelDAO.class,
new ServiceLabelEsDAO(elasticSearchClient, config));
this.registerServiceImplementation(
ITagAutoCompleteQueryDAO.class, new TagAutoCompleteQueryDAO(elasticSearchClient));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
public class TagAutoCompleteQueryDAO extends EsDAO implements ITagAutoCompleteQueryDAO {
public TagAutoCompleteQueryDAO(ElasticSearchClient client) {
super(client);
}
@Override
public Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool();
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query);
search.aggregation(Aggregation.terms(TagAutocompleteData.TAG_KEY)
.field(TagAutocompleteData.TAG_KEY));
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(TagAutocompleteData.INDEX_NAME),
startSecondTB, endSecondTB
),
search.build()
);
Map<String, Object> terms =
(Map<String, Object>) response.getAggregations().get(TagAutocompleteData.TAG_KEY);
List<Map<String, Object>> buckets = (List<Map<String, Object>>) terms.get("buckets");
Set<String> tagKeys = new HashSet<>();
for (Map<String, Object> bucket : buckets) {
String tagKey = (String) bucket.get("key");
if (StringUtil.isEmpty(tagKey)) {
continue;
}
tagKeys.add(tagKey);
}
return tagKeys;
}
@Override
public Set<String> queryTagAutocompleteValues(final TagType tagType, final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool().must(Query.term(TagAutocompleteData.TAG_KEY, tagKey));
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query).size(limit);
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(TagAutocompleteData.INDEX_NAME),
startSecondTB, endSecondTB
),
search.build()
);
Set<String> tagValues = new HashSet<>();
for (SearchHit searchHit : response.getHits().getHits()) {
TagAutocompleteData tag = new TagAutocompleteData.Builder().storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
tagValues.add(tag.getTagValue());
}
return tagValues;
}
private void appendTagAutocompleteCondition(final TagType tagType, final long startSecondTB,
final long endSecondTB,
final BoolQueryBuilder query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
query.must(Query.term(TagAutocompleteData.TAG_TYPE, tagType.name()));
final RangeQueryBuilder rangeQuery = Query.range(TagAutocompleteData.TIME_BUCKET);
if (startMinTB > 0) {
rangeQuery.gte(startMinTB);
}
if (endMinTB > 0) {
rangeQuery.lte(endMinTB);
}
if (startMinTB > 0 || endMinTB > 0) {
query.must(rangeQuery);
}
}
}
......@@ -22,23 +22,18 @@ import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
......@@ -187,69 +182,4 @@ public class TraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool();
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query);
search.aggregation(Aggregation.terms(TraceTagAutocompleteData.TAG_KEY)
.field(TraceTagAutocompleteData.TAG_KEY));
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(TraceTagAutocompleteData.INDEX_NAME, startSecondTB, endSecondTB),
search.build()
);
Map<String, Object> terms =
(Map<String, Object>) response.getAggregations().get(TraceTagAutocompleteData.TAG_KEY);
List<Map<String, Object>> buckets = (List<Map<String, Object>>) terms.get("buckets");
Set<String> tagKeys = new HashSet<>();
for (Map<String, Object> bucket : buckets) {
String tagKey = (String) bucket.get("key");
if (bucket.get("key") == null) {
continue;
}
tagKeys.add(tagKey);
}
return tagKeys;
}
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final int limit, final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool().must(Query.term(TraceTagAutocompleteData.TAG_KEY, tagKey));
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query).size(limit);
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(TraceTagAutocompleteData.INDEX_NAME),
startSecondTB, endSecondTB
),
search.build()
);
Set<String> tagValues = new HashSet<>();
for (SearchHit searchHit : response.getHits().getHits()) {
TraceTagAutocompleteData tag = new TraceTagAutocompleteData.Builder().storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
tagValues.add(tag.getTagValue());
}
return tagValues;
}
private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final BoolQueryBuilder query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
final RangeQueryBuilder rangeQuery = Query.range(TraceTagAutocompleteData.TIME_BUCKET);
if (startMinTB > 0) {
rangeQuery.gte(startMinTB);
}
if (endMinTB > 0) {
rangeQuery.lte(endMinTB);
}
if (startMinTB > 0 || endMinTB > 0) {
query.must(rangeQuery);
}
}
}
......@@ -43,6 +43,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -69,6 +70,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ServiceLab
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TopNRecordsQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TopologyQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TraceQuery;
......@@ -139,6 +141,7 @@ public class InfluxStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new EBPFProfilingScheduleQuery(client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new EBPFProfilingDataQuery(client));
this.registerServiceImplementation(IServiceLabelDAO.class, new ServiceLabelQuery(client));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new TagAutoCompleteQueryDAO(client));
}
@Override
......
......@@ -30,7 +30,6 @@ import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTr
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
......@@ -105,7 +104,7 @@ public class TableMetaInfo {
}
}
if (model.getName().equals(TraceTagAutocompleteData.INDEX_NAME)) {
if (model.getName().equals(TagAutocompleteData.INDEX_NAME)) {
if (storageAndColumnMap.containsKey(TagAutocompleteData.TAG_KEY)) {
storageAndTagMap.put(TagAutocompleteData.TAG_KEY, InfluxConstants.TagName.AUTOCOMPLETE_TAG_KEY);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
@Slf4j
@RequiredArgsConstructor
public class TagAutoCompleteQueryDAO implements ITagAutoCompleteQueryDAO {
private final InfluxClient client;
@Override
public Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.function("distinct", TagAutocompleteData.TAG_KEY)
.from(client.getDatabase(), TagAutocompleteData.INDEX_NAME)
.where();
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series);
}
if (series == null) {
return Collections.emptySet();
}
Set<String> tagKeys = new HashSet<>();
for (List<Object> values : series.getValues()) {
String tagKey = (String) values.get(1);
tagKeys.add(tagKey);
}
return tagKeys;
}
@Override
public Set<String> queryTagAutocompleteValues(final TagType tagType,
final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.column(TagAutocompleteData.TAG_VALUE)
.from(client.getDatabase(), TagAutocompleteData.INDEX_NAME)
.where();
query.limit(limit);
query.and(eq(TagAutocompleteData.TAG_KEY, tagKey));
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series);
}
if (series == null) {
return Collections.emptySet();
}
Set<String> tagValues = new HashSet<>();
for (List<Object> values : series.getValues()) {
String tagValue = (String) values.get(1);
tagValues.add(tagValue);
}
return tagValues;
}
private void appendTagAutocompleteCondition(final TagType tagType,
final long startSecondTB,
final long endSecondTB,
final WhereQueryImpl<SelectQueryImpl> query) {
query.and(eq(TagAutocompleteData.TAG_TYPE, tagType.name()));
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
if (startMinTB > 0) {
query.and(gte(TagAutocompleteData.TIME_BUCKET, startMinTB));
}
if (endMinTB > 0) {
query.and(lte(TagAutocompleteData.TIME_BUCKET, endMinTB));
}
}
}
......@@ -23,11 +23,8 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
......@@ -48,7 +45,6 @@ import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereNested;
import org.influxdb.querybuilder.WhereQueryImpl;
import org.influxdb.querybuilder.clauses.Clause;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
......@@ -226,71 +222,4 @@ public class TraceQuery implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.function("distinct", TraceTagAutocompleteData.TAG_KEY)
.from(client.getDatabase(), TraceTagAutocompleteData.INDEX_NAME)
.where();
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series);
}
if (series == null) {
return Collections.emptySet();
}
Set<String> tagKeys = new HashSet<>();
for (List<Object> values : series.getValues()) {
String tagKey = (String) values.get(1);
tagKeys.add(tagKey);
}
return tagKeys;
}
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
.column(TraceTagAutocompleteData.TAG_VALUE)
.from(client.getDatabase(), TraceTagAutocompleteData.INDEX_NAME)
.where();
query.limit(limit);
query.and(eq(TraceTagAutocompleteData.TAG_KEY, tagKey));
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
QueryResult.Series series = client.queryForSingleSeries(query);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", query.getCommand(), series);
}
if (series == null) {
return Collections.emptySet();
}
Set<String> tagValues = new HashSet<>();
for (List<Object> values : series.getValues()) {
String tagValue = (String) values.get(1);
tagValues.add(tagValue);
}
return tagValues;
}
private void appendTagAutocompleteCondition(final long startSecondTB,
final long endSecondTB,
final WhereQueryImpl<SelectQueryImpl> query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
if (startMinTB > 0) {
query.and(gte(TraceTagAutocompleteData.TIME_BUCKET, startMinTB));
}
if (endMinTB > 0) {
query.and(lte(TraceTagAutocompleteData.TIME_BUCKET, endMinTB));
}
}
}
......@@ -30,10 +30,12 @@ public interface IoTDBIndexes {
String PROCESS_ID_INX = "process_id";
String AGENT_ID_INX = "agent_id";
String AUTOCOMPLETE_TAG_KEY = "tag_key";
String AUTOCOMPLETE_TAG_TYPE = "tag_type";
static boolean isIndex(String key) {
return key.equals(ID_IDX) || key.equals(ENTITY_ID_IDX) || key.equals(LAYER_IDX) ||
key.equals(SERVICE_ID_IDX) || key.equals(GROUP_IDX) || key.equals(TRACE_ID_IDX) ||
key.equals(INSTANCE_ID_INX) || key.equals(AGENT_ID_INX) || key.equals(PROCESS_ID_INX) || key.equals(AUTOCOMPLETE_TAG_KEY);
key.equals(INSTANCE_ID_INX) || key.equals(AGENT_ID_INX) || key.equals(PROCESS_ID_INX) ||
key.equals(AUTOCOMPLETE_TAG_KEY) || key.equals(AUTOCOMPLETE_TAG_TYPE);
}
}
......@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -72,6 +73,7 @@ import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBLogQuery
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetricsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBServiceLabelQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTraceQueryDAO;
......@@ -139,6 +141,7 @@ public class IoTDBStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new IoTDBEBPFProfilingScheduleDAO(client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new IoTDBEBPFProfilingDataDAO(client));
this.registerServiceImplementation(IServiceLabelDAO.class, new IoTDBServiceLabelQueryDAO(client));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new IoTDBTagAutoCompleteQueryDAO(client));
}
@Override
......
......@@ -89,6 +89,10 @@ public class IoTDBTableMetaInfo {
indexes.add(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY);
}
if (storageAndIndexMap.containsValue(IoTDBIndexes.AUTOCOMPLETE_TAG_TYPE)) {
indexes.add(IoTDBIndexes.AUTOCOMPLETE_TAG_TYPE);
}
final IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.builder()
.model(model)
.columnAndTypeMap(columnAndTypeMap)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
import org.apache.skywalking.oap.server.storage.plugin.iotdb.utils.IoTDBUtils;
@RequiredArgsConstructor
public class IoTDBTagAutoCompleteQueryDAO implements ITagAutoCompleteQueryDAO {
private final IoTDBClient client;
@Override
public Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder query = new StringBuilder();
query.append("select *").append(" from ");
IoTDBUtils.addModelPath(client.getStorageGroup(), query, TagAutocompleteData.INDEX_NAME);
Map<String, String> indexAndValueMap = new HashMap<>();
indexAndValueMap.put(IoTDBIndexes.AUTOCOMPLETE_TAG_TYPE, tagType.name());
IoTDBUtils.addQueryIndexValue(TagAutocompleteData.INDEX_NAME, query, indexAndValueMap);
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
query.append(IoTDBClient.ALIGN_BY_DEVICE);
SessionPool sessionPool = client.getSessionPool();
SessionDataSetWrapper wrapper = null;
Set<String> tagKeys = new HashSet<>();
try {
wrapper = sessionPool.executeQueryStatement(query.toString());
while (wrapper.hasNext()) {
RowRecord rowRecord = wrapper.next();
List<String> resultList = Splitter.on(IoTDBClient.DOT + "\"")
.splitToList(rowRecord.getFields().get(0).getStringValue());
String tagKey = resultList.get(resultList.size() - 2);
tagKeys.add(tagKey.substring(0, tagKey.length() - 1));
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new IOException(e);
} finally {
if (wrapper != null) {
sessionPool.closeResultSet(wrapper);
}
}
return tagKeys;
}
@Override
public Set<String> queryTagAutocompleteValues(final TagType tagType,
final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
IoTDBUtils.addModelPath(client.getStorageGroup(), query, TagAutocompleteData.INDEX_NAME);
Map<String, String> indexAndValueMap = new HashMap<>();
indexAndValueMap.put(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY, tagKey);
indexAndValueMap.put(IoTDBIndexes.AUTOCOMPLETE_TAG_TYPE, tagType.name());
IoTDBUtils.addQueryIndexValue(TagAutocompleteData.INDEX_NAME, query, indexAndValueMap);
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, query);
query.append(" limit ").append(limit).append(IoTDBClient.ALIGN_BY_DEVICE);
List<? super StorageData> storageDataList = client.filterQuery(TagAutocompleteData.INDEX_NAME,
query.toString(),
new TagAutocompleteData.Builder()
);
Set<String> tagValues = new HashSet<>();
storageDataList.forEach(storageData -> {
TagAutocompleteData tagAutocompleteData = (TagAutocompleteData) storageData;
tagValues.add(tagAutocompleteData.getTagValue());
});
return tagValues;
}
private void appendTagAutocompleteCondition(final TagType tagType,
final long startSecondTB,
final long endSecondTB,
final StringBuilder query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
StringBuilder where = new StringBuilder();
if (startMinTB > 0) {
where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startMinTB));
}
if (endMinTB > 0) {
if (where.length() > 0) {
where.append(" and ");
}
where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endMinTB));
}
if (where.length() > 0) {
query.append(" where ").append(where);
}
}
}
......@@ -18,27 +18,18 @@
package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
......@@ -182,82 +173,4 @@ public class IoTDBTraceQueryDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder query = new StringBuilder();
query.append("select *").append(" from ");
IoTDBUtils.addModelPath(client.getStorageGroup(), query, TraceTagAutocompleteData.INDEX_NAME);
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
query.append(IoTDBClient.ALIGN_BY_DEVICE);
SessionPool sessionPool = client.getSessionPool();
SessionDataSetWrapper wrapper = null;
Set<String> tagKeys = new HashSet<>();
try {
wrapper = sessionPool.executeQueryStatement(query.toString());
while (wrapper.hasNext()) {
RowRecord rowRecord = wrapper.next();
List<String> resultList = Splitter.on(IoTDBClient.DOT + "\"")
.splitToList(rowRecord.getFields().get(0).getStringValue());
String tagKey = resultList.get(resultList.size() - 1);
tagKeys.add(tagKey.substring(0, tagKey.length() - 1));
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new IOException(e);
} finally {
if (wrapper != null) {
sessionPool.closeResultSet(wrapper);
}
}
return tagKeys;
}
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder query = new StringBuilder();
query.append("select * from ");
IoTDBUtils.addModelPath(client.getStorageGroup(), query, TraceTagAutocompleteData.INDEX_NAME);
Map<String, String> indexAndValueMap = new HashMap<>();
indexAndValueMap.put(IoTDBIndexes.AUTOCOMPLETE_TAG_KEY, tagKey);
IoTDBUtils.addQueryIndexValue(TraceTagAutocompleteData.INDEX_NAME, query, indexAndValueMap);
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
query.append(" limit ").append(limit).append(IoTDBClient.ALIGN_BY_DEVICE);
List<? super StorageData> storageDataList = client.filterQuery(TraceTagAutocompleteData.INDEX_NAME,
query.toString(),
new TraceTagAutocompleteData.Builder()
);
Set<String> tagValues = new HashSet<>();
storageDataList.forEach(storageData -> {
TraceTagAutocompleteData tagAutocompleteData = (TraceTagAutocompleteData) storageData;
tagValues.add(tagAutocompleteData.getTagValue());
});
return tagValues;
}
private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final StringBuilder query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
StringBuilder where = new StringBuilder();
if (startMinTB > 0) {
where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startMinTB));
}
if (endMinTB > 0) {
if (where.length() > 0) {
where.append(" and ");
}
where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endMinTB));
}
if (where.length() > 0) {
query.append(" where ").append(where);
}
}
}
......@@ -46,6 +46,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -74,6 +75,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTask
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
......@@ -179,6 +181,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(h2Client));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(h2Client));
this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(h2Client));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new H2TagAutoCompleteQueryDAO(h2Client));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@RequiredArgsConstructor
public class H2TagAutoCompleteQueryDAO implements ITagAutoCompleteQueryDAO {
private final JDBCHikariCPClient h2Client;
@Override
public Set<String> queryTagAutocompleteKeys(final TagType tagType,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(2);
sql.append("select distinct ").append(TagAutocompleteData.TAG_KEY).append(" from ")
.append(TagAutocompleteData.INDEX_NAME).append(" where ");
sql.append(" 1=1 ");
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, sql, condition);
try (Connection connection = h2Client.getConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
Set<String> tagKeys = new HashSet<>();
while (resultSet.next()) {
tagKeys.add(resultSet.getString(TagAutocompleteData.TAG_KEY));
}
return tagKeys;
} catch (SQLException e) {
throw new IOException(e);
}
}
@Override
public Set<String> queryTagAutocompleteValues(final TagType tagType,
final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(3);
sql.append("select * from ").append(TagAutocompleteData.INDEX_NAME).append(" where ");
sql.append(TagAutocompleteData.TAG_KEY).append(" = ?");
condition.add(tagKey);
appendTagAutocompleteCondition(tagType, startSecondTB, endSecondTB, sql, condition);
sql.append(" limit ").append(limit);
try (Connection connection = h2Client.getConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
Set<String> tagValues = new HashSet<>();
while (resultSet.next()) {
tagValues.add(resultSet.getString(TagAutocompleteData.TAG_VALUE));
}
return tagValues;
} catch (SQLException e) {
throw new IOException(e);
}
}
private void appendTagAutocompleteCondition(final TagType tagType,
final long startSecondTB,
final long endSecondTB,
final StringBuilder sql,
final List<Object> condition) {
sql.append(" and ");
sql.append(TagAutocompleteData.TAG_TYPE).append(" = ?");
condition.add(tagType.name());
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
if (startMinTB > 0) {
sql.append(" and ");
sql.append(TagAutocompleteData.TIME_BUCKET).append(">=?");
condition.add(startMinTB);
}
if (endMinTB > 0) {
sql.append(" and ");
sql.append(TagAutocompleteData.TIME_BUCKET).append("<=?");
condition.add(endMinTB);
}
}
}
......@@ -27,10 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
......@@ -256,71 +253,4 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(2);
sql.append("select distinct ").append(TraceTagAutocompleteData.TAG_KEY).append(" from ")
.append(TraceTagAutocompleteData.INDEX_NAME).append(" where ");
sql.append(" 1=1 ");
appendTagAutocompleteCondition(startSecondTB, endSecondTB, sql, condition);
try (Connection connection = h2Client.getConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
Set<String> tagKeys = new HashSet<>();
while (resultSet.next()) {
tagKeys.add(resultSet.getString(TraceTagAutocompleteData.TAG_KEY));
}
return tagKeys;
} catch (SQLException e) {
throw new IOException(e);
}
}
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> condition = new ArrayList<>(3);
sql.append("select * from ").append(TraceTagAutocompleteData.INDEX_NAME).append(" where ");
sql.append(TraceTagAutocompleteData.TAG_KEY).append(" = ?");
condition.add(tagKey);
appendTagAutocompleteCondition(startSecondTB, endSecondTB, sql, condition);
sql.append(" limit ").append(limit);
try (Connection connection = h2Client.getConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
Set<String> tagValues = new HashSet<>();
while (resultSet.next()) {
tagValues.add(resultSet.getString(TraceTagAutocompleteData.TAG_VALUE));
}
return tagValues;
} catch (SQLException e) {
throw new IOException(e);
}
}
private void appendTagAutocompleteCondition(final long startSecondTB,
final long endSecondTB,
final StringBuilder sql,
final List<Object> condition) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
if (startMinTB > 0) {
sql.append(" and ");
sql.append(TraceTagAutocompleteData.TIME_BUCKET).append(">=?");
condition.add(startMinTB);
}
if (endMinTB > 0) {
if (!condition.isEmpty()) {
sql.append(" and ");
}
sql.append(TraceTagAutocompleteData.TIME_BUCKET).append("<=?");
condition.add(endMinTB);
}
}
}
......@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -68,6 +69,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTask
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2UITemplateManagementDAO;
......@@ -165,6 +167,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(mysqlClient));
this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(mysqlClient));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new H2TagAutoCompleteQueryDAO(mysqlClient));
}
@Override
......
......@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -67,6 +68,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTask
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2UITemplateManagementDAO;
......@@ -165,6 +167,7 @@ public class PostgreSQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(postgresqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(postgresqlClient));
this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(postgresqlClient));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new H2TagAutoCompleteQueryDAO(postgresqlClient));
}
@Override
......
......@@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
......@@ -68,6 +69,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTask
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2UITemplateManagementDAO;
......@@ -170,6 +172,7 @@ public class TiDBStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new H2EBPFProfilingScheduleDAO(mysqlClient));
this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new H2EBPFProfilingDataDAO(mysqlClient));
this.registerServiceImplementation(IServiceLabelDAO.class, new H2ServiceLabelQueryDAO(mysqlClient));
this.registerServiceImplementation(ITagAutoCompleteQueryDAO.class, new H2TagAutoCompleteQueryDAO(mysqlClient));
}
@Override
......
......@@ -23,23 +23,18 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.TraceTagAutocompleteData;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
......@@ -55,8 +50,6 @@ import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
......@@ -255,69 +248,4 @@ public class ZipkinTraceQueryEsDAO extends EsDAO implements ITraceQueryDAO {
}
return spanList;
}
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool();
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query);
search.aggregation(Aggregation.terms(TraceTagAutocompleteData.TAG_KEY)
.field(TraceTagAutocompleteData.TAG_KEY));
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(TraceTagAutocompleteData.INDEX_NAME, startSecondTB, endSecondTB),
search.build()
);
Map<String, Object> terms =
(Map<String, Object>) response.getAggregations().get(TraceTagAutocompleteData.TAG_KEY);
List<Map<String, Object>> buckets = (List<Map<String, Object>>) terms.get("buckets");
Set<String> tagKeys = new HashSet<>();
for (Map<String, Object> bucket : buckets) {
String tagKey = (String) bucket.get("key");
if (bucket.get("key") == null) {
continue;
}
tagKeys.add(tagKey);
}
return tagKeys;
}
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey, final int limit, final long startSecondTB,
final long endSecondTB) throws IOException {
BoolQueryBuilder query = Query.bool().must(Query.term(TraceTagAutocompleteData.TAG_KEY, tagKey));
appendTagAutocompleteCondition(startSecondTB, endSecondTB, query);
final SearchBuilder search = Search.builder().query(query).size(limit);
final SearchResponse response = getClient().search(
new TimeRangeIndexNameGenerator(
IndexController.LogicIndicesRegister.getPhysicalTableName(TraceTagAutocompleteData.INDEX_NAME),
startSecondTB, endSecondTB
),
search.build()
);
Set<String> tagValues = new HashSet<>();
for (SearchHit searchHit : response.getHits().getHits()) {
TraceTagAutocompleteData tag = new TraceTagAutocompleteData.Builder().storage2Entity(
new HashMapConverter.ToEntity(searchHit.getSource()));
tagValues.add(tag.getTagValue());
}
return tagValues;
}
private void appendTagAutocompleteCondition(final long startSecondTB, final long endSecondTB, final BoolQueryBuilder query) {
long startMinTB = startSecondTB / 100;
long endMinTB = endSecondTB / 100;
final RangeQueryBuilder rangeQuery = Query.range(TraceTagAutocompleteData.TIME_BUCKET);
if (startMinTB > 0) {
rangeQuery.gte(startMinTB);
}
if (endMinTB > 0) {
rangeQuery.lte(endMinTB);
}
if (startMinTB > 0 || endMinTB > 0) {
query.must(rangeQuery);
}
}
}
......@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.tool.profile.exporter.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
......@@ -84,20 +83,4 @@ public class ProfileTraceDAO implements ITraceQueryDAO {
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return null;
}
// No need implement here
@Override
public Set<String> queryTraceTagAutocompleteKeys(final long startSecondTB,
final long endSecondTB) throws IOException {
return null;
}
// No need implement here
@Override
public Set<String> queryTraceTagAutocompleteValues(final String tagKey,
final int limit,
final long startSecondTB,
final long endSecondTB) throws IOException {
return null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册