提交 4babd6ff 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Support Top sql (#2239)

* The prototype of topN sql worker.

* Add scope and new manual dispatcher.

* no message

* Finish topN persistence codes. Not test yet. And query have not added.

* Finish the top n database statement persistent.

* Support different slow thresholds for different db types.

* Fix wrong db default threshold

* Finish new query protocol binding.

* Provide query empty implementation and sync ui.

* Finish all codes, hope it works :P
I will run the tests tonight.

* Fix

* Fix

* Fix a startup issue.

* Add time bucket to persistent.

* Fix wrong query result column name.

* Fix Database query.

* Fix checkstyle.

* Fix wrong order logic, and add a test case to verify, to fix https://github.com/apache/incubator-skywalking/pull/2239#discussion_r255948249
上级 4b15c713
......@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh:
......
......@@ -65,6 +65,7 @@ public class CoreModule extends ModuleDefine {
classes.add(MetadataQueryService.class);
classes.add(AggregationQueryService.class);
classes.add(AlarmQueryService.class);
classes.add(TopNRecordsQueryService.class);
}
private void addServerInterface(List<Class> classes) {
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
......@@ -130,12 +131,14 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(new RecordTypeListener(getManager()));
annotationScan.registerListener(new TopNTypeListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
......
......@@ -47,6 +47,10 @@ public class DispatcherManager {
}
public void forward(Source source) {
if (source == null) {
return;
}
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.storage.*;
public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
private final int limitSize;
public LimitedSizeDataCache(int limitSize) {
super(false);
this.limitSize = limitSize;
init();
}
@Override public SWCollection<STORAGE_DATA> collectionInstance() {
return new LimitedSizeDataCollection<>(limitSize);
}
public void add(STORAGE_DATA data) {
limitedSizeDataCollection.put(data);
}
@Override public void writing() {
limitedSizeDataCollection = getCurrentAndWriting();
}
@Override public void finishWriting() {
limitedSizeDataCollection.finishWriting();
limitedSizeDataCollection = null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.*;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
private final int limitedSize;
private volatile boolean writing;
private volatile boolean reading;
LimitedSizeDataCollection(int limitedSize) {
this.data = new HashMap<>();
this.writing = false;
this.reading = false;
this.limitedSize = limitedSize;
}
public void finishWriting() {
writing = false;
}
@Override public void writing() {
writing = true;
}
@Override public boolean isWriting() {
return writing;
}
@Override public void finishReading() {
reading = false;
}
@Override public void reading() {
reading = true;
}
@Override public boolean isReading() {
return reading;
}
@Override public int size() {
return data.size();
}
@Override public void clear() {
data.clear();
}
@Override public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
}
@Override public void put(STORAGE_DATA value) {
LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
if (storageDataList == null) {
storageDataList = new LinkedList<>();
data.put(value, storageDataList);
}
if (storageDataList.size() < limitedSize) {
storageDataList.add(value);
return;
}
for (int i = 0; i < storageDataList.size(); i++) {
STORAGE_DATA storageData = storageDataList.get(i);
if (value.compareTo(storageData) <= 0) {
if (i == 0) {
// input value is less than the smallest in top N list, ignore
} else {
// Remove the smallest in top N list
// add the current value into the right position
storageDataList.add(i, value);
storageDataList.removeFirst();
}
return;
}
}
// Add the value as biggest in top N list
storageDataList.addLast(value);
storageDataList.removeFirst();
}
@Override public Collection<STORAGE_DATA> collection() {
List<STORAGE_DATA> collection = new ArrayList<>();
data.values().forEach(e -> e.forEach(collection::add));
return collection;
}
}
......@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
}
@Override public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("Close merge data collection not support get operation.");
throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
}
@Override public void put(STORAGE_DATA value) {
......
......@@ -33,6 +33,16 @@ public abstract class Window<DATA> {
private SWCollection<DATA> windowDataB;
Window() {
this(true);
}
Window(boolean autoInit) {
if (autoInit) {
init();
}
}
protected void init() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.database;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
/**
* @author wusheng
*/
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
@Override public void dispatch(DatabaseSlowStatement source) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setId(source.getId());
statement.setServiceId(source.getDatabaseServiceId());
statement.setLatency(source.getLatency());
statement.setStatement(source.getStatement());
statement.setTimeBucket(source.getTimeBucket());
statement.setTraceId(source.getTraceId());
TopNProcess.INSTANCE.in(statement);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.database;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.analysis.topn.annotation.TopNType;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* Database TopN statement, including Database SQL statement, mongoDB and Redis commands.
*
* @author wusheng
*/
@TopNType
@StorageEntity(name = TopNDatabaseStatement.INDEX_NAME, builder = TopNDatabaseStatement.Builder.class, source = Scope.DatabaseSlowStatement)
public class TopNDatabaseStatement extends TopN {
public static final String INDEX_NAME = "top_n_database_statement";
@Setter private String id;
@Override public String id() {
return id;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
return getServiceId() == statement.getServiceId();
}
@Override public int hashCode() {
return Objects.hash(getServiceId());
}
public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
@Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setStatement((String)dbMap.get(STATEMENT));
statement.setTraceId((String)dbMap.get(TRACE_ID));
statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return statement;
}
@Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
Map<String, Object> map = new HashMap<>();
map.put(STATEMENT, storageData.getStatement());
map.put(TRACE_ID, storageData.getTraceId());
map.put(LATENCY, storageData.getLatency());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.topn;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* TopN data.
*
* @author wusheng
*/
public abstract class TopN extends Record implements ComparableStorageData {
public static final String STATEMENT = "statement";
public static final String LATENCY = "latency";
public static final String TRACE_ID = "trace_id";
public static final String SERVICE_ID = "service_id";
@Getter @Setter @Column(columnName = STATEMENT) private String statement;
@Getter @Setter @Column(columnName = LATENCY) private long latency;
@Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
@Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;
@Override public int compareTo(Object o) {
TopN target = (TopN)o;
return (int)(latency - target.latency);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface TopNType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.topn.annotation;
import java.lang.annotation.Annotation;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNProcess;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author wusheng
*/
public class TopNTypeListener implements AnnotationListener {
private final ModuleManager moduleManager;
public TopNTypeListener(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public Class<? extends Annotation> annotation() {
return TopNType.class;
}
@Override public void notify(Class aClass) {
TopNProcess.INSTANCE.create(moduleManager, aClass);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.database.TopNDatabaseStatement;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* TopN is a special process, which hold a certain size of windows,
* and cache all top N records, save to the persistence in low frequence.
*
* @author wusheng
*/
public enum TopNProcess {
INSTANCE;
@Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
public void create(ModuleManager moduleManager, Class<? extends TopN> topNClass) {
String modelName = StorageEntityAnnotationUtils.getModelName(topNClass);
Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(topNClass);
StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("");
}
TopNWorker persistentWorker = new TopNWorker(WorkerIdGenerator.INSTANCES.generate(), modelName, moduleManager,
50, recordDAO);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
public void in(TopNDatabaseStatement statement) {
workers.get(statement.getClass()).in(statement);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* Top N worker is a persistence worker, but no
*
* @author wusheng
*/
public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO;
private final String modelName;
private final DataCarrier<TopN> dataCarrier;
private long reportCycle;
private volatile long lastReportTimestamp;
public TopNWorker(int workerId, String modelName, ModuleManager moduleManager,
int topNSize,
IRecordDAO recordDAO) {
super(moduleManager, workerId, -1);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.modelName = modelName;
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis();
// Top N persistent only works per 10 minutes.
this.reportCycle = 10 * 60 * 1000L;
}
@Override void onWork(TopN data) {
limitedSizeDataCache.writing();
try {
limitedSizeDataCache.add(data);
} finally {
limitedSizeDataCache.finishWriting();
}
}
/**
* TopN is not following the batch size trigger mode. The memory cost of this worker is limited always.
*
* `onWork` method has been override, so this method would never be executed. No need to implement this method,
*/
@Override public void cacheData(TopN data) {
}
@Override public LimitedSizeDataCache<TopN> getCache() {
return limitedSizeDataCache;
}
/**
* The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute
* time windows.
*
* Switch and persistent attempt happens based on reportCycle.
*
* @return
*/
@Override public boolean flushAndSwitch() {
long now = System.currentTimeMillis();
if (now - lastReportTimestamp <= reportCycle) {
return false;
}
lastReportTimestamp = now;
return super.flushAndSwitch();
}
@Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
return batchCollection;
}
@Override public void in(TopN n) {
dataCarrier.produce(n);
}
private class TopNConsumer implements IConsumer<TopN> {
@Override public void init() {
}
@Override public void consume(List<TopN> data) {
/**
* TopN is not following the batch size trigger mode.
* No need to implement this method, the memory size is limited always.
*/
data.forEach(row -> onWork(row));
}
@Override public void onError(List<TopN> data, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author wusheng
*/
public class TopNRecordsQueryService implements Service {
private final ModuleManager moduleManager;
private ITopNRecordsQueryDAO topNRecordsQueryDAO;
public TopNRecordsQueryService(ModuleManager manager) {
this.moduleManager = manager;
}
private ITopNRecordsQueryDAO getTopNRecordsQueryDAO() {
if (topNRecordsQueryDAO == null) {
this.topNRecordsQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(ITopNRecordsQueryDAO.class);
}
return topNRecordsQueryDAO;
}
public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
int topN, Order order) throws IOException {
return getTopNRecordsQueryDAO().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
/**
* @author wusheng
*/
@Setter
@Getter
public class TopNRecord {
private String statement;
private long latency;
private String traceId;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
/**
* @author wusheng
*/
@SourceType
public class DatabaseSlowStatement extends Source {
@Getter @Setter private String id;
@Getter @Setter private int databaseServiceId;
@Getter @Setter private String statement;
@Getter @Setter private long latency;
@Getter @Setter private String traceId;
@Override public Scope scope() {
return Scope.DatabaseSlowStatement;
}
@Override public String getEntityId() {
return Const.EMPTY_STRING;
}
}
......@@ -24,7 +24,7 @@ package org.apache.skywalking.oap.server.core.source;
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess;
Segment, Alarm, ServiceInventory, ServiceInstanceInventory, EndpointInventory, DatabaseAccess, DatabaseSlowStatement;
public static Scope valueOf(int ordinal) {
if (ordinal < 0 || ordinal >= values().length) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
/**
* Storage data with comparable capability.
*
* @author wusheng
*/
public interface ComparableStorageData extends StorageData, Comparable {
}
......@@ -84,6 +84,7 @@ public enum PersistenceTimer {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.addAll(TopNProcess.INSTANCE.getPersistentWorkers());
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
......
......@@ -39,6 +39,7 @@ public class StorageModule extends ModuleDefine {
IHistoryDeleteDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class};
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class, IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.Service;
public interface ITopNRecordsQueryDAO extends Service {
List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
int topN, Order order) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import org.junit.*;
/**
* @author wusheng
*/
public class LimitedSizeDataCollectionTest {
@Test
public void testPut() {
LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
collection.put(new MockStorageData(1));
collection.put(new MockStorageData(3));
collection.put(new MockStorageData(5));
collection.put(new MockStorageData(7));
collection.put(new MockStorageData(9));
MockStorageData income = new MockStorageData(4);
collection.put(income);
int[] expected = new int[] {3, 4, 5, 7, 9};
int i = 0;
for (MockStorageData data : collection.collection()) {
Assert.assertEquals(expected[i++], data.latency);
}
}
private class MockStorageData implements ComparableStorageData {
private long latency;
public MockStorageData(long latency) {
this.latency = latency;
}
@Override public int compareTo(Object o) {
MockStorageData target = (MockStorageData)o;
return (int)(latency - target.latency);
}
@Override public String id() {
return null;
}
@Override public boolean equals(Object o) {
return true;
}
@Override public int hashCode() {
return Objects.hash(1);
}
}
}
......@@ -66,6 +66,8 @@ public class GraphQLQueryProvider extends ModuleProvider {
.resolvers(new AggregationQuery(getManager()))
.file("query-protocol/alarm.graphqls")
.resolvers(new AlarmQuery(getManager()))
.file("query-protocol/top-n-records.graphqls")
.resolvers(new TopNRecordsQuery(getManager()))
.build()
.makeExecutableSchema();
this.graphQL = GraphQL.newGraphQL(schema).build();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.query.graphql.type.TopNRecordsCondition;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author wusheng
*/
public class TopNRecordsQuery implements GraphQLQueryResolver {
private final ModuleManager moduleManager;
private TopNRecordsQueryService topNRecordsQueryService;
public TopNRecordsQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private TopNRecordsQueryService getTopNRecordsQueryService() {
if (topNRecordsQueryService == null) {
this.topNRecordsQueryService = moduleManager.find(CoreModule.NAME).provider().getService(TopNRecordsQueryService.class);
}
return topNRecordsQueryService;
}
public List<TopNRecord> getTopNRecords(TopNRecordsCondition condition) throws IOException {
long startSecondTB = DurationUtils.INSTANCE.startTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getStart());
long endSecondTB = DurationUtils.INSTANCE.endTimeDurationToSecondTimeBucket(condition.getDuration().getStep(), condition.getDuration().getEnd());
String metricName = condition.getMetricName();
Order order = condition.getOrder();
int topN = condition.getTopN();
int serviceId = condition.getServiceId();
return getTopNRecordsQueryService().getTopNRecords(startSecondTB, endSecondTB, metricName, serviceId, topN, order);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.type;
import lombok.*;
import org.apache.skywalking.oap.server.core.query.entity.Order;
@Getter
@Setter
public class TopNRecordsCondition {
private int serviceId;
private String metricName;
private int topN;
private Order order;
private Duration duration;
}
Subproject commit 85b81e2e34efb0b670d039154feca336c9203700
Subproject commit 6f11e3b829bba4d3532477e968291cf657f0ac0b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.util.*;
/**
* @author wusheng
*/
public class DBLatencyThresholds {
private Map<String, Integer> thresholds;
DBLatencyThresholds(String config) {
thresholds = new HashMap<>();
String[] settings = config.split(",");
for (String setting : settings) {
String[] typeValue = setting.split(":");
if (typeValue.length == 2) {
thresholds.put(typeValue[0].toLowerCase(), Integer.parseInt(typeValue[1]));
}
}
if (!thresholds.containsKey("default")) {
thresholds.put("default", 10000);
}
}
public int getThreshold(String type) {
type = type.toLowerCase();
if (thresholds.containsKey(type)) {
return thresholds.get(type);
} else {
return thresholds.get("default");
}
}
}
......@@ -20,22 +20,13 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
......@@ -68,19 +59,21 @@ public class TraceModuleProvider extends ModuleProvider {
}
@Override public void prepare() throws ServiceNotProvidedException {
moduleConfig.setDbLatencyThresholds(new DBLatencyThresholds(moduleConfig.getSlowDBAccessThreshold()));
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager);
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}
......
......@@ -30,8 +30,13 @@ public class TraceServiceModuleConfig extends ModuleConfig {
@Setter @Getter private int bufferDataMaxFileSize;
@Setter @Getter private boolean bufferFileCleanWhenRestart;
/**
* The sample rate precision is 1/10000.
* 10000 means 100% sample in default.
* The sample rate precision is 1/10000. 10000 means 100% sample in default.
*/
@Setter @Getter private int sampleRate = 10000;
/**
* The threshold used to check the slow database access. Unit, millisecond.
*/
@Setter @Getter private String slowDBAccessThreshold = "default:200";
@Setter @Getter private DBLatencyThresholds dbLatencyThresholds;
}
......@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
......@@ -43,12 +44,14 @@ public class SegmentParse {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
......@@ -56,6 +59,7 @@ public class SegmentParse {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(false);
this.config = config;
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
TRACE_BUFFER_FILE_RETRY = metricCreator.createCounter("v5_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.",
......@@ -239,7 +243,7 @@ public class SegmentParse {
}
private void createSpanListeners() {
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public enum Source {
......@@ -251,20 +255,23 @@ public class SegmentParse {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
private final TraceServiceModuleConfig config;
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.config = config;
}
public void send(UpstreamSegment segment, Source source) {
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, Source.Buffer);
if (parseResult) {
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.*;
......@@ -46,12 +47,13 @@ public class SegmentParseV2 {
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
private final TraceServiceModuleConfig config;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetric TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetric TRACE_BUFFER_FILE_OUT;
private volatile static CounterMetric TRACE_PARSE_ERROR;
private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
......@@ -59,6 +61,7 @@ public class SegmentParseV2 {
this.segmentCoreInfo.setStartTime(Long.MAX_VALUE);
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
this.segmentCoreInfo.setV2(true);
this.config = config;
if (TRACE_BUFFER_FILE_RETRY == null) {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
......@@ -245,7 +248,7 @@ public class SegmentParseV2 {
}
private void createSpanListeners() {
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
......@@ -253,20 +256,22 @@ public class SegmentParseV2 {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
private final TraceServiceModuleConfig config;
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.config = config;
}
public void send(UpstreamSegment segment, SegmentSource source) {
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(new BufferData<>(segment), source);
}
@Override public boolean call(BufferData<UpstreamSegment> bufferData) {
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager);
SegmentParseV2 segmentParse = new SegmentParseV2(moduleManager, listenerManager, config);
segmentParse.setStandardizationWorker(standardizationWorker);
boolean parseResult = segmentParse.parse(bufferData, SegmentSource.Buffer);
if (parseResult) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
public class SpanTags {
public static final String DB_STATEMENT = "db.statement";
public static final String DB_TYPE = "db.type";
}
......@@ -19,10 +19,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
/**
* @author peng-yongsheng
*/
public interface SpanListenerFactory {
SpanListener create(ModuleManager moduleManager);
SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
}
......@@ -18,28 +18,19 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.EndpointRelation;
import org.apache.skywalking.oap.server.core.source.RequestType;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
import static java.util.Objects.nonNull;
......@@ -63,16 +54,20 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
private final List<SourceBuilder> entrySourceBuilders;
private final List<SourceBuilder> exitSourceBuilders;
private final List<DatabaseSlowStatement> slowDatabaseAccesses;
private final TraceServiceModuleConfig config;
private SpanDecorator entrySpanDecorator;
private long minuteTimeBucket;
private MultiScopesSpanListener(ModuleManager moduleManager) {
private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.entrySourceBuilders = new LinkedList<>();
this.exitSourceBuilders = new LinkedList<>();
this.slowDatabaseAccesses = new ArrayList<>(10);
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.config = config;
}
@Override public boolean containsPoint(Point point) {
......@@ -152,6 +147,34 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
exitSourceBuilders.add(sourceBuilder);
if (sourceBuilder.getType().equals(RequestType.DATABASE)) {
boolean isSlowDBAccess = false;
DatabaseSlowStatement statement = new DatabaseSlowStatement();
statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
statement.setLatency(sourceBuilder.getLatency());
statement.setTimeBucket(TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime()));
statement.setTraceId(segmentCoreInfo.getSegmentId());
for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
statement.setStatement(tag.getValue());
} else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
String dbType = tag.getValue();
DBLatencyThresholds thresholds = config.getDbLatencyThresholds();
int threshold = thresholds.getThreshold(dbType);
if (sourceBuilder.getLatency() > threshold) {
isSlowDBAccess = true;
}
}
}
if (isSlowDBAccess) {
slowDatabaseAccesses.add(statement);
}
}
}
private void setPublicAttrs(SourceBuilder sourceBuilder, SpanDecorator spanDecorator) {
......@@ -215,13 +238,15 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess());
}
});
slowDatabaseAccesses.forEach(sourceReceiver::receive);
}
public static class Factory implements SpanListenerFactory {
@Override
public SpanListener create(ModuleManager moduleManager) {
return new MultiScopesSpanListener(moduleManager);
public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new MultiScopesSpanListener(moduleManager, config);
}
}
}
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
......@@ -144,7 +145,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
this.sampler = new TraceSegmentSampler(segmentSamplingRate);
}
@Override public SpanListener create(ModuleManager moduleManager) {
@Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new SegmentSpanListener(moduleManager, sampler);
}
}
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
......@@ -81,7 +82,7 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager) {
@Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new ServiceMappingSpanListener(moduleManager);
}
}
......
......@@ -113,9 +113,11 @@ class ServiceBMock {
span.setSpanLayer(SpanLayer.Database);
span.setParentSpanId(0);
span.setStartTime(startTimestamp + 550);
span.setEndTime(startTimestamp + 1000);
span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
span.setIsError(true);
span.addTags(KeyWithStringValue.newBuilder().setKey("db.statement").setValue("select * from database where complex = 1;").build());
span.addTags(KeyWithStringValue.newBuilder().setKey("db.type").setValue("mongodb").build());
if (isPrepare) {
span.setOperationName("mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
......
......@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
#service-mesh:
......
......@@ -75,6 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh:
......
......@@ -80,6 +80,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
/**
* @author wusheng
*/
public class TopNRecordsQueryEsDAO extends EsDAO implements ITopNRecordsQueryDAO {
public TopNRecordsQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
int topN, Order order) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(TopN.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
boolQueryBuilder.must().add(QueryBuilders.termQuery(TopN.SERVICE_ID, serviceId));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(topN).sort(TopN.LATENCY, order.equals(Order.DES) ? SortOrder.DESC : SortOrder.ASC);
SearchResponse response = getClient().search(metricName, sourceBuilder);
List<TopNRecord> results = new ArrayList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
TopNRecord record = new TopNRecord();
record.setStatement((String)searchHit.getSourceAsMap().get(TopN.STATEMENT));
record.setTraceId((String)searchHit.getSourceAsMap().get(TopN.TRACE_ID));
record.setLatency(((Number)searchHit.getSourceAsMap().get(TopN.LATENCY)).longValue());
results.add(record);
}
return results;
}
}
......@@ -86,6 +86,7 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2TopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
private JDBCHikariCPClient h2Client;
public H2TopNRecordsQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override
public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName, int serviceId,
int topN, Order order) throws IOException {
StringBuilder sql = new StringBuilder("select * from " + metricName + " where ");
List<Object> parameters = new ArrayList<>(10);
sql.append(" service_id = ? ");
parameters.add(serviceId);
sql.append(" and ").append(TopN.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
sql.append(" and ").append(TopN.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
sql.append(" order by ").append(TopN.LATENCY);
if (order.equals(Order.DES)) {
sql.append(" desc ");
} else {
sql.append(" asc ");
}
sql.append(" limit ").append(topN);
List<TopNRecord> results = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
TopNRecord record = new TopNRecord();
record.setStatement(resultSet.getString(TopN.STATEMENT));
record.setTraceId(resultSet.getString(TopN.TRACE_ID));
record.setLatency(resultSet.getLong(TopN.LATENCY));
results.add(record);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return results;
}
}
......@@ -92,6 +92,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IAggregationQueryDAO.class, new MySQLAggregationQueryDAO(mysqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
......
Subproject commit d5e46c075cb30e3de42aeaaa9e839a758d3fc029
Subproject commit ed64821de4fe1e524ec069660d18d7efb1b1061f
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册