From 4babd6ff73ee32535f09eb31ecbda736e3052a32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Tue, 12 Feb 2019 23:13:02 +0800 Subject: [PATCH] Support Top sql (#2239) * The prototype of topN sql worker. * Add scope and new manual dispatcher. * no message * Finish topN persistence codes. Not test yet. And query have not added. * Finish the top n database statement persistent. * Support different slow thresholds for different db types. * Fix wrong db default threshold * Finish new query protocol binding. * Provide query empty implementation and sync ui. * Finish all codes, hope it works :P I will run the tests tonight. * Fix * Fix * Fix a startup issue. * Add time bucket to persistent. * Fix wrong query result column name. * Fix Database query. * Fix checkstyle. * Fix wrong order logic, and add a test case to verify, to fix https://github.com/apache/incubator-skywalking/pull/2239#discussion_r255948249 --- docker/config/application.yml | 1 + .../oap/server/core/CoreModule.java | 1 + .../oap/server/core/CoreModuleProvider.java | 3 + .../core/analysis/DispatcherManager.java | 4 + .../analysis/data/LimitedSizeDataCache.java | 51 +++++++ .../data/LimitedSizeDataCollection.java | 115 +++++++++++++++ .../analysis/data/NonMergeDataCollection.java | 4 +- .../oap/server/core/analysis/data/Window.java | 10 ++ .../database/DatabaseStatementDispatcher.java | 40 ++++++ .../database/TopNDatabaseStatement.java | 82 +++++++++++ .../oap/server/core/analysis/topn/TopN.java | 46 ++++++ .../analysis/topn/annotation/TopNType.java | 26 ++++ .../topn/annotation/TopNTypeListener.java | 44 ++++++ .../core/analysis/worker/TopNProcess.java | 66 +++++++++ .../core/analysis/worker/TopNWorker.java | 134 ++++++++++++++++++ .../core/query/TopNRecordsQueryService.java | 51 +++++++ .../server/core/query/entity/TopNRecord.java | 32 +++++ .../core/source/DatabaseSlowStatement.java | 44 ++++++ .../oap/server/core/source/Scope.java | 2 +- .../core/storage/ComparableStorageData.java | 27 ++++ .../server/core/storage/PersistenceTimer.java | 1 + .../server/core/storage/StorageModule.java | 3 +- .../storage/query/ITopNRecordsQueryDAO.java | 29 ++++ .../data/LimitedSizeDataCollectionTest.java | 72 ++++++++++ .../query/graphql/GraphQLQueryProvider.java | 2 + .../graphql/resolver/TopNRecordsQuery.java | 59 ++++++++ .../graphql/type/TopNRecordsCondition.java | 32 +++++ .../src/main/resources/query-protocol | 2 +- .../trace/provider/DBLatencyThresholds.java | 51 +++++++ .../trace/provider/TraceModuleProvider.java | 21 +-- .../provider/TraceServiceModuleConfig.java | 9 +- .../trace/provider/parser/SegmentParse.java | 17 ++- .../trace/provider/parser/SegmentParseV2.java | 15 +- .../trace/provider/parser/SpanTags.java | 25 ++++ .../parser/listener/SpanListenerFactory.java | 3 +- .../endpoint/MultiScopesSpanListener.java | 71 +++++++--- .../listener/segment/SegmentSpanListener.java | 3 +- .../service/ServiceMappingSpanListener.java | 3 +- .../receiver/trace/mock/ServiceBMock.java | 4 +- .../src/main/assembly/application.yml | 1 + .../src/main/resources/application.yml | 1 + .../StorageModuleElasticsearchProvider.java | 1 + .../query/TopNRecordsQueryEsDAO.java | 66 +++++++++ .../plugin/jdbc/h2/H2StorageProvider.java | 1 + .../jdbc/h2/dao/H2TopNRecordsQueryDAO.java | 78 ++++++++++ .../jdbc/mysql/MySQLStorageProvider.java | 1 + skywalking-ui | 2 +- 47 files changed, 1297 insertions(+), 59 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java create mode 100644 oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java create mode 100644 oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java create mode 100644 oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java create mode 100644 oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java create mode 100644 oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java create mode 100644 oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java diff --git a/docker/config/application.yml b/docker/config/application.yml index 1a13b4cff0..f9b645f0c3 100644 --- a/docker/config/application.yml +++ b/docker/config/application.yml @@ -75,6 +75,7 @@ receiver-trace: bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false} sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default. + slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms. receiver-jvm: default: service-mesh: diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java index 1baa2507d9..8eb7302dfc 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java @@ -65,6 +65,7 @@ public class CoreModule extends ModuleDefine { classes.add(MetadataQueryService.class); classes.add(AggregationQueryService.class); classes.add(AlarmQueryService.class); + classes.add(TopNRecordsQueryService.class); } private void addServerInterface(List classes) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 3eb387e1bb..0106fe7338 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core; import java.io.IOException; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener; import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener; +import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.cluster.*; @@ -130,12 +131,14 @@ public class CoreModuleProvider extends ModuleProvider { this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager())); this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager())); this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager())); + this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager())); annotationScan.registerListener(storageAnnotationListener); annotationScan.registerListener(streamAnnotationListener); annotationScan.registerListener(new IndicatorTypeListener(getManager())); annotationScan.registerListener(new InventoryTypeListener(getManager())); annotationScan.registerListener(new RecordTypeListener(getManager())); + annotationScan.registerListener(new TopNTypeListener(getManager())); this.remoteClientManager = new RemoteClientManager(getManager()); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index 4697792331..61602a6aae 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -47,6 +47,10 @@ public class DispatcherManager { } public void forward(Source source) { + if (source == null) { + return; + } + for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) { dispatcher.dispatch(source); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java new file mode 100644 index 0000000000..33cfe1d684 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCache.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.data; + +import org.apache.skywalking.oap.server.core.storage.*; + +public class LimitedSizeDataCache extends Window implements DataCache { + + private SWCollection limitedSizeDataCollection; + private final int limitSize; + + public LimitedSizeDataCache(int limitSize) { + super(false); + this.limitSize = limitSize; + init(); + } + + @Override public SWCollection collectionInstance() { + return new LimitedSizeDataCollection<>(limitSize); + } + + public void add(STORAGE_DATA data) { + limitedSizeDataCollection.put(data); + } + + @Override public void writing() { + limitedSizeDataCollection = getCurrentAndWriting(); + } + + @Override public void finishWriting() { + limitedSizeDataCollection.finishWriting(); + limitedSizeDataCollection = null; + } +} + diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java new file mode 100644 index 0000000000..12cd96c7f5 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollection.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.data; + +import java.util.*; +import org.apache.skywalking.oap.server.core.storage.ComparableStorageData; + +public class LimitedSizeDataCollection implements SWCollection { + + private final HashMap> data; + private final int limitedSize; + private volatile boolean writing; + private volatile boolean reading; + + LimitedSizeDataCollection(int limitedSize) { + this.data = new HashMap<>(); + this.writing = false; + this.reading = false; + this.limitedSize = limitedSize; + } + + public void finishWriting() { + writing = false; + } + + @Override public void writing() { + writing = true; + } + + @Override public boolean isWriting() { + return writing; + } + + @Override public void finishReading() { + reading = false; + } + + @Override public void reading() { + reading = true; + } + + @Override public boolean isReading() { + return reading; + } + + @Override public int size() { + return data.size(); + } + + @Override public void clear() { + data.clear(); + } + + @Override public boolean containsKey(STORAGE_DATA key) { + throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation."); + } + + @Override public STORAGE_DATA get(STORAGE_DATA key) { + throw new UnsupportedOperationException("Limited size data collection doesn't support get operation."); + } + + @Override public void put(STORAGE_DATA value) { + LinkedList storageDataList = this.data.get(value); + if (storageDataList == null) { + storageDataList = new LinkedList<>(); + data.put(value, storageDataList); + } + + if (storageDataList.size() < limitedSize) { + storageDataList.add(value); + return; + } + + for (int i = 0; i < storageDataList.size(); i++) { + STORAGE_DATA storageData = storageDataList.get(i); + if (value.compareTo(storageData) <= 0) { + if (i == 0) { + // input value is less than the smallest in top N list, ignore + } else { + // Remove the smallest in top N list + // add the current value into the right position + storageDataList.add(i, value); + storageDataList.removeFirst(); + } + return; + } + } + + // Add the value as biggest in top N list + storageDataList.addLast(value); + storageDataList.removeFirst(); + } + + @Override public Collection collection() { + List collection = new ArrayList<>(); + data.values().forEach(e -> e.forEach(collection::add)); + return collection; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java index 1d7a53c058..bba2fc05c3 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java @@ -69,11 +69,11 @@ public class NonMergeDataCollection implements } @Override public boolean containsKey(STORAGE_DATA key) { - throw new UnsupportedOperationException("Close merge data collection not support containsKey operation."); + throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation."); } @Override public STORAGE_DATA get(STORAGE_DATA key) { - throw new UnsupportedOperationException("Close merge data collection not support get operation."); + throw new UnsupportedOperationException("Non merge data collection doesn't support get operation."); } @Override public void put(STORAGE_DATA value) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java index 6f9486dae2..ff2ca6d402 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java @@ -33,6 +33,16 @@ public abstract class Window { private SWCollection windowDataB; Window() { + this(true); + } + + Window(boolean autoInit) { + if (autoInit) { + init(); + } + } + + protected void init() { this.windowDataA = collectionInstance(); this.windowDataB = collectionInstance(); this.pointer = windowDataA; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java new file mode 100644 index 0000000000..f36cca5035 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.database; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess; +import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement; + +/** + * @author wusheng + */ +public class DatabaseStatementDispatcher implements SourceDispatcher { + @Override public void dispatch(DatabaseSlowStatement source) { + TopNDatabaseStatement statement = new TopNDatabaseStatement(); + statement.setId(source.getId()); + statement.setServiceId(source.getDatabaseServiceId()); + statement.setLatency(source.getLatency()); + statement.setStatement(source.getStatement()); + statement.setTimeBucket(source.getTimeBucket()); + statement.setTraceId(source.getTraceId()); + + TopNProcess.INSTANCE.in(statement); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java new file mode 100644 index 0000000000..2456486429 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.database; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * Database TopN statement, including Database SQL statement, mongoDB and Redis commands. + * + * @author wusheng + */ +@TopNType +@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement) +public class TopNDatabaseStatement extends TopN { + public static final String INDEX_NAME = "top_n_database_statement"; + + + @Setter private String id; + + @Override public String id() { + return id; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TopNDatabaseStatement statement = (TopNDatabaseStatement)o; + return getServiceId() == statement.getServiceId(); + } + + @Override public int hashCode() { + return Objects.hash(getServiceId()); + } + + public static class Builder implements StorageBuilder { + + @Override public TopNDatabaseStatement map2Data(Map dbMap) { + TopNDatabaseStatement statement = new TopNDatabaseStatement(); + statement.setStatement((String)dbMap.get(STATEMENT)); + statement.setTraceId((String)dbMap.get(TRACE_ID)); + statement.setLatency(((Number)dbMap.get(LATENCY)).longValue()); + statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return statement; + } + + @Override public Map data2Map(TopNDatabaseStatement storageData) { + Map map = new HashMap<>(); + map.put(STATEMENT, storageData.getStatement()); + map.put(TRACE_ID, storageData.getTraceId()); + map.put(LATENCY, storageData.getLatency()); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java new file mode 100644 index 0000000000..fae8298b6b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.topn; + +import lombok.*; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.storage.ComparableStorageData; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; + +/** + * TopN data. + * + * @author wusheng + */ +public abstract class TopN extends Record implements ComparableStorageData { + public static final String STATEMENT = "statement"; + public static final String LATENCY = "latency"; + public static final String TRACE_ID = "trace_id"; + public static final String SERVICE_ID = "service_id"; + + @Getter @Setter @Column(columnName = STATEMENT) private String statement; + @Getter @Setter @Column(columnName = LATENCY) private long latency; + @Getter @Setter @Column(columnName = TRACE_ID) private String traceId; + @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId; + + @Override public int compareTo(Object o) { + TopN target = (TopN)o; + return (int)(latency - target.latency); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java new file mode 100644 index 0000000000..82731b6a88 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.topn.annotation; + +import java.lang.annotation.*; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface TopNType { +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java new file mode 100644 index 0000000000..d668cddc26 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/annotation/TopNTypeListener.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.topn.annotation; + +import java.lang.annotation.Annotation; +import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess; +import org.apache.skywalking.oap.server.core.annotation.AnnotationListener; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author wusheng + */ +public class TopNTypeListener implements AnnotationListener { + + private final ModuleManager moduleManager; + + public TopNTypeListener(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + + @Override public Class annotation() { + return TopNType.class; + } + + @Override public void notify(Class aClass) { + TopNProcess.INSTANCE.create(moduleManager, aClass); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java new file mode 100644 index 0000000000..059c46ea6c --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNProcess.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import java.util.*; +import lombok.Getter; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils; +import org.apache.skywalking.oap.server.core.worker.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * TopN is a special process, which hold a certain size of windows, + * and cache all top N records, save to the persistence in low frequence. + * + * @author wusheng + */ +public enum TopNProcess { + INSTANCE; + + @Getter private List persistentWorkers = new ArrayList<>(); + private Map, TopNWorker> workers = new HashMap<>(); + + public void create(ModuleManager moduleManager, Class topNClass) { + String modelName = StorageEntityAnnotationUtils.getModelName(topNClass); + Class builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass); + + StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class); + IRecordDAO recordDAO; + try { + recordDAO = storageDAO.newRecordDao(builderClass.newInstance()); + } catch (InstantiationException | IllegalAccessException e) { + throw new UnexpectedException(""); + } + + TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager, + 50, recordDAO); + WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker); + persistentWorkers.add(persistentWorker); + workers.put(topNClass, persistentWorker); + } + + public void in(TopNDatabaseStatement statement) { + workers.get(statement.getClass()).in(statement); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java new file mode 100644 index 0000000000..5d6304ef62 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNWorker.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.worker; + +import java.util.*; +import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; +import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; +import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.storage.IRecordDAO; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.slf4j.*; + +/** + * Top N worker is a persistence worker, but no + * + * @author wusheng + */ +public class TopNWorker extends PersistenceWorker> { + private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class); + private final LimitedSizeDataCache limitedSizeDataCache; + private final IRecordDAO recordDAO; + private final String modelName; + private final DataCarrier dataCarrier; + private long reportCycle; + private volatile long lastReportTimestamp; + + public TopNWorker(int workerId, String modelName, ModuleManager moduleManager, + int topNSize, + IRecordDAO recordDAO) { + super(moduleManager, workerId, -1); + this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize); + this.recordDAO = recordDAO; + this.modelName = modelName; + this.dataCarrier = new DataCarrier<>(1, 10000); + this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1); + this.lastReportTimestamp = System.currentTimeMillis(); + // Top N persistent only works per 10 minutes. + this.reportCycle = 10 * 60 * 1000L; + } + + @Override void onWork(TopN data) { + limitedSizeDataCache.writing(); + try { + limitedSizeDataCache.add(data); + } finally { + limitedSizeDataCache.finishWriting(); + } + } + + /** + * TopN is not following the batch size trigger mode. The memory cost of this worker is limited always. + * + * `onWork` method has been override, so this method would never be executed. No need to implement this method, + */ + @Override public void cacheData(TopN data) { + + } + + @Override public LimitedSizeDataCache getCache() { + return limitedSizeDataCache; + } + + /** + * The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute + * time windows. + * + * Switch and persistent attempt happens based on reportCycle. + * + * @return + */ + @Override public boolean flushAndSwitch() { + long now = System.currentTimeMillis(); + if (now - lastReportTimestamp <= reportCycle) { + return false; + } + lastReportTimestamp = now; + return super.flushAndSwitch(); + } + + @Override public List prepareBatch(LimitedSizeDataCache cache) { + List batchCollection = new LinkedList<>(); + cache.getLast().collection().forEach(record -> { + try { + batchCollection.add(recordDAO.prepareBatchInsert(modelName, record)); + } catch (Throwable t) { + logger.error(t.getMessage(), t); + } + }); + return batchCollection; + } + + @Override public void in(TopN n) { + dataCarrier.produce(n); + } + + private class TopNConsumer implements IConsumer { + @Override public void init() { + + } + + @Override public void consume(List data) { + /** + * TopN is not following the batch size trigger mode. + * No need to implement this method, the memory size is limited always. + */ + data.forEach(row -> onWork(row)); + } + + @Override public void onError(List data, Throwable t) { + logger.error(t.getMessage(), t); + } + + @Override public void onExit() { + + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java new file mode 100644 index 0000000000..3d07aa5ea1 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TopNRecordsQueryService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.query; + +import java.io.IOException; +import java.util.List; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.storage.StorageModule; +import org.apache.skywalking.oap.server.core.storage.query.*; +import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.library.module.Service; + +/** + * @author wusheng + */ +public class TopNRecordsQueryService implements Service { + private final ModuleManager moduleManager; + private ITopNRecordsQueryDAO topNRecordsQueryDAO; + + public TopNRecordsQueryService(ModuleManager manager) { + this.moduleManager = manager; + } + + private ITopNRecordsQueryDAO getTopNRecordsQueryDAO() { + if (topNRecordsQueryDAO == null) { + this.topNRecordsQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITopNRecordsQueryDAO.class); + } + return topNRecordsQueryDAO; + } + + public List getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId, + int topN, Order order) throws IOException { + return getTopNRecordsQueryDAO().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java new file mode 100644 index 0000000000..e8baa5d112 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/entity/TopNRecord.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.query.entity; + +import lombok.*; + +/** + * @author wusheng + */ +@Setter +@Getter +public class TopNRecord { + private String statement; + private long latency; + private String traceId; +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java new file mode 100644 index 0000000000..254a5e7c64 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.source.annotation.SourceType; + +/** + * @author wusheng + */ +@SourceType +public class DatabaseSlowStatement extends Source { + @Getter @Setter private String id; + @Getter @Setter private int databaseServiceId; + @Getter @Setter private String statement; + @Getter @Setter private long latency; + @Getter @Setter private String traceId; + + @Override public Scope scope() { + return Scope.DatabaseSlowStatement; + } + + @Override public String getEntityId() { + return Const.EMPTY_STRING; + } + +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java index d162f3edb5..fd6629994c 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java @@ -24,7 +24,7 @@ package org.apache.skywalking.oap.server.core.source; public enum Scope { All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress, ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC, - Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess; + Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess, DatabaseSlowStatement; public static Scope valueOf(int ordinal) { if (ordinal < 0 || ordinal >= values().length) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java new file mode 100644 index 0000000000..8e1a639105 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/ComparableStorageData.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.storage; + +/** + * Storage data with comparable capability. + * + * @author wusheng + */ +public interface ComparableStorageData extends StorageData, Comparable { +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java index 367bd9e424..9bdbfd19d6 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java @@ -84,6 +84,7 @@ public enum PersistenceTimer { List persistenceWorkers = new ArrayList<>(); persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers()); persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers()); + persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers()); persistenceWorkers.forEach(worker -> { if (logger.isDebugEnabled()) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java index 02a896e424..60729125a1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java @@ -39,6 +39,7 @@ public class StorageModule extends ModuleDefine { IHistoryDeleteDAO.class, IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class, IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class, - ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class}; + ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class, + ITopNRecordsQueryDAO.class}; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java new file mode 100644 index 0000000000..4b20e53e28 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ITopNRecordsQueryDAO.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.storage.query; + +import java.io.IOException; +import java.util.List; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.library.module.Service; + +public interface ITopNRecordsQueryDAO extends Service { + List getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId, + int topN, Order order) throws IOException; +} diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java new file mode 100644 index 0000000000..852c884124 --- /dev/null +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeDataCollectionTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.data; + +import java.util.Objects; +import org.apache.skywalking.oap.server.core.storage.ComparableStorageData; +import org.junit.*; + +/** + * @author wusheng + */ +public class LimitedSizeDataCollectionTest { + @Test + public void testPut() { + LimitedSizeDataCollection collection = new LimitedSizeDataCollection<>(5); + collection.put(new MockStorageData(1)); + collection.put(new MockStorageData(3)); + collection.put(new MockStorageData(5)); + collection.put(new MockStorageData(7)); + collection.put(new MockStorageData(9)); + + MockStorageData income = new MockStorageData(4); + collection.put(income); + + int[] expected = new int[] {3, 4, 5, 7, 9}; + int i = 0; + for (MockStorageData data : collection.collection()) { + Assert.assertEquals(expected[i++], data.latency); + } + } + + private class MockStorageData implements ComparableStorageData { + private long latency; + + public MockStorageData(long latency) { + this.latency = latency; + } + + @Override public int compareTo(Object o) { + MockStorageData target = (MockStorageData)o; + return (int)(latency - target.latency); + } + + @Override public String id() { + return null; + } + + @Override public boolean equals(Object o) { + return true; + } + + @Override public int hashCode() { + return Objects.hash(1); + } + } +} diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java index c834dafecd..d940141d0f 100644 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java @@ -66,6 +66,8 @@ public class GraphQLQueryProvider extends ModuleProvider { .resolvers(new AggregationQuery(getManager())) .file("query-protocol/alarm.graphqls") .resolvers(new AlarmQuery(getManager())) + .file("query-protocol/top-n-records.graphqls") + .resolvers(new TopNRecordsQuery(getManager())) .build() .makeExecutableSchema(); this.graphQL = GraphQL.newGraphQL(schema).build(); diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java new file mode 100644 index 0000000000..63545210cf --- /dev/null +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/TopNRecordsQuery.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.query.graphql.resolver; + +import com.coxautodev.graphql.tools.GraphQLQueryResolver; +import java.io.IOException; +import java.util.List; +import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.query.*; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +/** + * @author wusheng + */ +public class TopNRecordsQuery implements GraphQLQueryResolver { + private final ModuleManager moduleManager; + private TopNRecordsQueryService topNRecordsQueryService; + + public TopNRecordsQuery(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + + private TopNRecordsQueryService getTopNRecordsQueryService() { + if (topNRecordsQueryService == null) { + this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TopNRecordsQueryService.class); + } + return topNRecordsQueryService; + } + + public List getTopNRecords(TopNRecordsCondition condition) throws IOException { + long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getStart()); + long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getEnd()); + + String metricName = condition.getMetricName(); + Order order = condition.getOrder(); + int topN = condition.getTopN(); + int serviceId = condition.getServiceId(); + + return getTopNRecordsQueryService().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order); + } +} diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java new file mode 100644 index 0000000000..d698f1a12c --- /dev/null +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/type/TopNRecordsCondition.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.query.graphql.type; + +import lombok.*; +import org.apache.skywalking.oap.server.core.query.entity.Order; + +@Getter +@Setter +public class TopNRecordsCondition { + private int serviceId; + private String metricName; + private int topN; + private Order order; + private Duration duration; +} diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol index 85b81e2e34..6f11e3b829 160000 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol @@ -1 +1 @@ -Subproject commit 85b81e2e34efb0b670d039154feca336c9203700 +Subproject commit 6f11e3b829bba4d3532477e968291cf657f0ac0b diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java new file mode 100644 index 0000000000..b66c8900a0 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholds.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider; + +import java.util.*; + +/** + * @author wusheng + */ +public class DBLatencyThresholds { + private Map thresholds; + + DBLatencyThresholds(String config) { + thresholds = new HashMap<>(); + String[] settings = config.split(","); + for (String setting : settings) { + String[] typeValue = setting.split(":"); + if (typeValue.length == 2) { + thresholds.put(typeValue[0].toLowerCase(), Integer.parseInt(typeValue[1])); + } + } + if (!thresholds.containsKey("default")) { + thresholds.put("default", 10000); + } + } + + public int getThreshold(String type) { + type = type.toLowerCase(); + if (thresholds.containsKey(type)) { + return thresholds.get(type); + } else { + return thresholds.get("default"); + } + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java index baa027ffbc..367702dcc9 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java @@ -20,22 +20,13 @@ package org.apache.skywalking.oap.server.receiver.trace.provider; import java.io.IOException; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; -import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; -import org.apache.skywalking.oap.server.library.module.ModuleConfig; -import org.apache.skywalking.oap.server.library.module.ModuleDefine; -import org.apache.skywalking.oap.server.library.module.ModuleProvider; -import org.apache.skywalking.oap.server.library.module.ModuleStartException; -import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.core.server.*; +import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler; import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler; import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener; @@ -68,19 +59,21 @@ public class TraceModuleProvider extends ModuleProvider { } @Override public void prepare() throws ServiceNotProvidedException { + moduleConfig.setDbLatencyThresholds(new DBLatencyThresholds(moduleConfig.getSlowDBAccessThreshold())); + SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); listenerManager.add(new MultiScopesSpanListener.Factory()); listenerManager.add(new ServiceMappingSpanListener.Factory()); listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate())); - segmentProducer = new SegmentParse.Producer(getManager(), listenerManager); + segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig); listenerManager = new SegmentParserListenerManager(); listenerManager.add(new MultiScopesSpanListener.Factory()); listenerManager.add(new ServiceMappingSpanListener.Factory()); listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate())); - segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager); + segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig); this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2)); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java index 5a3695386d..8ebeec613c 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceServiceModuleConfig.java @@ -30,8 +30,13 @@ public class TraceServiceModuleConfig extends ModuleConfig { @Setter @Getter private int bufferDataMaxFileSize; @Setter @Getter private boolean bufferFileCleanWhenRestart; /** - * The sample rate precision is 1/10000. - * 10000 means 100% sample in default. + * The sample rate precision is 1/10000. 10000 means 100% sample in default. */ @Setter @Getter private int sampleRate = 10000; + + /** + * The threshold used to check the slow database access. Unit, millisecond. + */ + @Setter @Getter private String slowDBAccessThreshold = "default:200"; + @Setter @Getter private DBLatencyThresholds dbLatencyThresholds; } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java index 0a4bd69417..7abe9e5922 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java @@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*; @@ -43,12 +44,14 @@ public class SegmentParse { private final List spanListeners; private final SegmentParserListenerManager listenerManager; private final SegmentCoreInfo segmentCoreInfo; + private final TraceServiceModuleConfig config; @Setter private SegmentStandardizationWorker standardizationWorker; private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY; private volatile static CounterMetric TRACE_BUFFER_FILE_OUT; private volatile static CounterMetric TRACE_PARSE_ERROR; - private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) { + private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, + TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; this.spanListeners = new LinkedList<>(); @@ -56,6 +59,7 @@ public class SegmentParse { this.segmentCoreInfo.setStartTime(Long.MAX_VALUE); this.segmentCoreInfo.setEndTime(Long.MIN_VALUE); this.segmentCoreInfo.setV2(false); + this.config = config; MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.", @@ -239,7 +243,7 @@ public class SegmentParse { } private void createSpanListeners() { - listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager))); + listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config))); } public enum Source { @@ -251,20 +255,23 @@ public class SegmentParse { @Setter private SegmentStandardizationWorker standardizationWorker; private final ModuleManager moduleManager; private final SegmentParserListenerManager listenerManager; + private final TraceServiceModuleConfig config; - public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) { + public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, + TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; + this.config = config; } public void send(UpstreamSegment segment, Source source) { - SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); + SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); segmentParse.parse(new BufferData<>(segment), source); } @Override public boolean call(BufferData bufferData) { - SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager); + SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); boolean parseResult = segmentParse.parse(bufferData, Source.Buffer); if (parseResult) { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java index dd28ba4639..543b0eb1a1 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java @@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*; @@ -46,12 +47,13 @@ public class SegmentParseV2 { private final List spanListeners; private final SegmentParserListenerManager listenerManager; private final SegmentCoreInfo segmentCoreInfo; + private final TraceServiceModuleConfig config; @Setter private SegmentStandardizationWorker standardizationWorker; private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY; private volatile static CounterMetric TRACE_BUFFER_FILE_OUT; private volatile static CounterMetric TRACE_PARSE_ERROR; - private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) { + private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; this.spanListeners = new LinkedList<>(); @@ -59,6 +61,7 @@ public class SegmentParseV2 { this.segmentCoreInfo.setStartTime(Long.MAX_VALUE); this.segmentCoreInfo.setEndTime(Long.MIN_VALUE); this.segmentCoreInfo.setV2(true); + this.config = config; if (TRACE_BUFFER_FILE_RETRY == null) { MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class); @@ -245,7 +248,7 @@ public class SegmentParseV2 { } private void createSpanListeners() { - listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager))); + listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config))); } public static class Producer implements DataStreamReader.CallBack { @@ -253,20 +256,22 @@ public class SegmentParseV2 { @Setter private SegmentStandardizationWorker standardizationWorker; private final ModuleManager moduleManager; private final SegmentParserListenerManager listenerManager; + private final TraceServiceModuleConfig config; - public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) { + public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; + this.config = config; } public void send(UpstreamSegment segment, SegmentSource source) { - SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); + SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); segmentParse.parse(new BufferData<>(segment), source); } @Override public boolean call(BufferData bufferData) { - SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager); + SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config); segmentParse.setStandardizationWorker(standardizationWorker); boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer); if (parseResult) { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java new file mode 100644 index 0000000000..0f9fef0ae5 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SpanTags.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser; + +public class SpanTags { + public static final String DB_STATEMENT = "db.statement"; + + public static final String DB_TYPE = "db.type"; +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java index fd8d09f213..9b4afd6aa2 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java @@ -19,10 +19,11 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; /** * @author peng-yongsheng */ public interface SpanListenerFactory { - SpanListener create(ModuleManager moduleManager); + SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java index 590d71ea3a..0330e775a9 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java @@ -18,28 +18,19 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint; -import java.util.LinkedList; -import java.util.List; +import java.util.*; +import org.apache.skywalking.apm.network.common.KeyStringValuePair; import org.apache.skywalking.apm.network.language.agent.SpanLayer; -import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; -import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; -import org.apache.skywalking.oap.server.core.source.DetectPoint; -import org.apache.skywalking.oap.server.core.source.EndpointRelation; -import org.apache.skywalking.oap.server.core.source.RequestType; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.*; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; +import org.apache.skywalking.oap.server.receiver.trace.provider.*; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; +import org.slf4j.*; import static java.util.Objects.nonNull; @@ -63,16 +54,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe private final List entrySourceBuilders; private final List exitSourceBuilders; + private final List slowDatabaseAccesses; + private final TraceServiceModuleConfig config; private SpanDecorator entrySpanDecorator; private long minuteTimeBucket; - private MultiScopesSpanListener(ModuleManager moduleManager) { + private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) { this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); this.entrySourceBuilders = new LinkedList<>(); this.exitSourceBuilders = new LinkedList<>(); + this.slowDatabaseAccesses = new ArrayList<>(10); this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class); this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); + this.config = config; } @Override public boolean containsPoint(Point point) { @@ -152,6 +147,34 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe sourceBuilder.setComponentId(spanDecorator.getComponentId()); setPublicAttrs(sourceBuilder, spanDecorator); exitSourceBuilders.add(sourceBuilder); + + if (sourceBuilder.getType().equals(RequestType.DATABASE)) { + boolean isSlowDBAccess = false; + + DatabaseSlowStatement statement = new DatabaseSlowStatement(); + statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId()); + statement.setDatabaseServiceId(sourceBuilder.getDestServiceId()); + statement.setLatency(sourceBuilder.getLatency()); + statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime())); + statement.setTraceId(segmentCoreInfo.getSegmentId()); + for (KeyStringValuePair tag : spanDecorator.getAllTags()) { + if (SpanTags.DB_STATEMENT.equals(tag.getKey())) { + statement.setStatement(tag.getValue()); + + } else if (SpanTags.DB_TYPE.equals(tag.getKey())) { + String dbType = tag.getValue(); + DBLatencyThresholds thresholds = config.getDbLatencyThresholds(); + int threshold = thresholds.getThreshold(dbType); + if (sourceBuilder.getLatency() > threshold) { + isSlowDBAccess = true; + } + } + } + + if (isSlowDBAccess) { + slowDatabaseAccesses.add(statement); + } + } } private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) { @@ -215,13 +238,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess()); } }); + + slowDatabaseAccesses.forEach(sourceReceiver::receive); } public static class Factory implements SpanListenerFactory { @Override - public SpanListener create(ModuleManager moduleManager) { - return new MultiScopesSpanListener(moduleManager); + public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) { + return new MultiScopesSpanListener(moduleManager, config); } } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java index 9add272442..644d2d20b8 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener; @@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener this.sampler = new TraceSegmentSampler(segmentSamplingRate); } - @Override public SpanListener create(ModuleManager moduleManager) { + @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) { return new SegmentSpanListener(moduleManager, sampler); } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java index 9e14954f94..338a1cd82a 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java @@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; import org.slf4j.*; @@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener { public static class Factory implements SpanListenerFactory { - @Override public SpanListener create(ModuleManager moduleManager) { + @Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) { return new ServiceMappingSpanListener(moduleManager); } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java index 344e0490f6..9496815521 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/ServiceBMock.java @@ -113,9 +113,11 @@ class ServiceBMock { span.setSpanLayer(SpanLayer.Database); span.setParentSpanId(0); span.setStartTime(startTimestamp + 550); - span.setEndTime(startTimestamp + 1000); + span.setEndTime(startTimestamp + 1500); span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId()); span.setIsError(true); + span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build()); + span.addTags(KeyWithStringValue.newBuilder().setKey("db.type").setValue("mongodb").build()); if (isPrepare) { span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]"); diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index 746de629b0..840777d66a 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -75,6 +75,7 @@ receiver-trace: bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false} sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default. + slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms. receiver-jvm: default: #service-mesh: diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index 5b53d73a06..28c7b97196 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -75,6 +75,7 @@ receiver-trace: bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false} sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default. + slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms. receiver-jvm: default: service-mesh: diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java index 8f31dbaa50..cf10758db1 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -80,6 +80,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider { this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient)); this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient)); } @Override diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java new file mode 100644 index 0000000000..d5f23d5d6e --- /dev/null +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/TopNRecordsQueryEsDAO.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query; + +import java.io.IOException; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.*; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +/** + * @author wusheng + */ +public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO { + public TopNRecordsQueryEsDAO(ElasticSearchClient client) { + super(client); + } + + @Override + public List getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId, + int topN, Order order) throws IOException { + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB)); + boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId)); + + sourceBuilder.query(boolQueryBuilder); + sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC); + SearchResponse response = getClient().search(metricName, sourceBuilder); + + List results = new ArrayList<>(); + + for (SearchHit searchHit : response.getHits().getHits()) { + TopNRecord record = new TopNRecord(); + record.setStatement((String)searchHit.getSourceAsMap().get(TopN.STATEMENT)); + record.setTraceId((String)searchHit.getSourceAsMap().get(TopN.TRACE_ID)); + record.setLatency(((Number)searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue()); + results.add(record); + } + + return results; + } +} diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java index a76fb49a7d..addd6a0ff3 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java @@ -86,6 +86,7 @@ public class H2StorageProvider extends ModuleProvider { this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client)); this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client)); + this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client)); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java new file mode 100644 index 0000000000..2ab22317d3 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopNRecordsQueryDAO.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; + +import java.io.IOException; +import java.sql.*; +import java.util.*; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.query.entity.*; +import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO; +import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; + +/** + * @author wusheng + */ +public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO { + private JDBCHikariCPClient h2Client; + + public H2TopNRecordsQueryDAO(JDBCHikariCPClient h2Client) { + this.h2Client = h2Client; + } + + @Override + public List getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId, + int topN, Order order) throws IOException { + StringBuilder sql = new StringBuilder("select * from " + metricName + " where "); + List parameters = new ArrayList<>(10); + + sql.append(" service_id = ? "); + parameters.add(serviceId); + + sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?"); + parameters.add(startSecondTB); + sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?"); + parameters.add(endSecondTB); + + sql.append(" order by ").append(TopN.LATENCY); + if (order.equals(Order.DES)) { + sql.append(" desc "); + } else { + sql.append(" asc "); + } + sql.append(" limit ").append(topN); + + List results = new ArrayList<>(); + try (Connection connection = h2Client.getConnection()) { + try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) { + while (resultSet.next()) { + TopNRecord record = new TopNRecord(); + record.setStatement(resultSet.getString(TopN.STATEMENT)); + record.setTraceId(resultSet.getString(TopN.TRACE_ID)); + record.setLatency(resultSet.getLong(TopN.LATENCY)); + results.add(record); + } + } + } catch (SQLException e) { + throw new IOException(e); + } + + return results; + } +} diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java index fda7d311f3..6f2ac9439f 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java @@ -92,6 +92,7 @@ public class MySQLStorageProvider extends ModuleProvider { this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient)); this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient)); this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient)); + this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient)); } @Override public void start() throws ServiceNotProvidedException, ModuleStartException { diff --git a/skywalking-ui b/skywalking-ui index d5e46c075c..ed64821de4 160000 --- a/skywalking-ui +++ b/skywalking-ui @@ -1 +1 @@ -Subproject commit d5e46c075cb30e3de42aeaaa9e839a758d3fc029 +Subproject commit ed64821de4fe1e524ec069660d18d7efb1b1061f -- GitLab