未验证 提交 7c6016b1 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Make Profile entities could be disable by OAL (#4354)

* Make Profile entities could be disable by OAL
上级 7ff9a0cf
......@@ -44,7 +44,8 @@ SRC_SERVICE_INSTANCE_CLR_GC: 'ServiceInstanceCLRGC';
SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread';
SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric';
//hard code sources, only used when need to be deactived.
//hard code entities, only used when need to be deactived.
//Disable is targeting @Stream#name
SRC_SEGMENT: 'segment';
SRC_TOP_N_DB_STATEMENT: 'top_n_database_statement';
SRC_ENDPOINT_RELATION_SERVER_SIDE: 'endpoint_relation_server_side';
......@@ -55,6 +56,9 @@ SRC_HTTP_ACCESS_LOG: 'http_access_log';
SRC_ZIPKIN_SPAN: 'zipkin_span';
SRC_JAEGER_SPAN: 'jaeger_span';
SRC_PROFILE_TASK: 'profile_task';
SRC_PROFILE_TASK_LOG: 'profile_task_log';
SRC_PROFILE_THREAD_SHANPSHOT: 'profile_task_segment_snapshot';
// Literals
......
......@@ -59,7 +59,8 @@ source
disableSource
: SRC_SEGMENT | SRC_TOP_N_DB_STATEMENT | SRC_ENDPOINT_RELATION_SERVER_SIDE | SRC_SERVICE_RELATION_SERVER_SIDE |
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD | SRC_HTTP_ACCESS_LOG | SRC_ZIPKIN_SPAN | SRC_JAEGER_SPAN
SRC_SERVICE_RELATION_CLIENT_SIDE | SRC_ALARM_RECORD | SRC_HTTP_ACCESS_LOG | SRC_ZIPKIN_SPAN | SRC_JAEGER_SPAN |
SRC_PROFILE_TASK | SRC_PROFILE_TASK_LOG | SRC_PROFILE_THREAD_SHANPSHOT
;
sourceAttribute
......
......@@ -102,3 +102,6 @@ envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricNam
// disable(top_n_database_statement);
// disable(zipkin_span);
// disable(jaeger_span);
// disable(profile_task);
// disable(profile_task_log);
// disable(profile_task_segment_snapshot);
......@@ -64,23 +64,30 @@ public class ProfileTaskMutationService implements Service {
* @param maxSamplingCount max trace count on sniffer
* @return task create result
*/
public ProfileTaskCreationResult createTask(final int serviceId, final String endpointName,
final long monitorStartTime, final int monitorDuration, final int minDurationThreshold, final int dumpPeriod,
final int maxSamplingCount) throws IOException {
public ProfileTaskCreationResult createTask(final int serviceId,
final String endpointName,
final long monitorStartTime,
final int monitorDuration,
final int minDurationThreshold,
final int dumpPeriod,
final int maxSamplingCount) throws IOException {
// calculate task execute range
long taskStartTime = monitorStartTime > 0 ? monitorStartTime : System.currentTimeMillis();
long taskEndTime = taskStartTime + TimeUnit.MINUTES.toMillis(monitorDuration);
// check data
final String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod, maxSamplingCount);
final String errorMessage = checkDataSuccess(
serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,
maxSamplingCount
);
if (errorMessage != null) {
return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();
}
// create task
final long createTime = System.currentTimeMillis();
final ProfileTaskNoneStream task = new ProfileTaskNoneStream();
final ProfileTaskRecord task = new ProfileTaskRecord();
task.setServiceId(serviceId);
task.setEndpointName(endpointName.trim());
task.setStartTime(taskStartTime);
......@@ -95,9 +102,14 @@ public class ProfileTaskMutationService implements Service {
return ProfileTaskCreationResult.builder().id(task.id()).build();
}
private String checkDataSuccess(final Integer serviceId, final String endpointName, final long monitorStartTime,
final long monitorEndTime, final int monitorDuration, final int minDurationThreshold, final int dumpPeriod,
final int maxSamplingCount) throws IOException {
private String checkDataSuccess(final Integer serviceId,
final String endpointName,
final long monitorStartTime,
final long monitorEndTime,
final int monitorDuration,
final int minDurationThreshold,
final int dumpPeriod,
final int maxSamplingCount) throws IOException {
// basic check
if (serviceId == null) {
return "service cannot be null";
......@@ -131,7 +143,8 @@ public class ProfileTaskMutationService implements Service {
// Each service can monitor up to 1 endpoints during the execution of tasks
long startTimeBucket = TimeBucket.getTimeBucket(monitorStartTime, Downsampling.Second);
long endTimeBucket = TimeBucket.getTimeBucket(monitorEndTime, Downsampling.Second);
final List<ProfileTask> alreadyHaveTaskList = getProfileTaskDAO().getTaskList(serviceId, null, startTimeBucket, endTimeBucket, 1);
final List<ProfileTask> alreadyHaveTaskList = getProfileTaskDAO().getTaskList(
serviceId, null, startTimeBucket, endTimeBucket, 1);
if (CollectionUtils.isNotEmpty(alreadyHaveTaskList)) {
// if any task time bucket in this range, means already have task, because time bucket is base on task end time
return "current service already has monitor task execute at this time";
......
......@@ -38,8 +38,8 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
@Getter
@Setter
@ScopeDeclaration(id = PROFILE_TASK, name = "ProfileTask")
@Stream(name = ProfileTaskNoneStream.INDEX_NAME, scopeId = PROFILE_TASK, builder = ProfileTaskNoneStream.Builder.class, processor = NoneStreamingProcessor.class)
public class ProfileTaskNoneStream extends NoneStream {
@Stream(name = ProfileTaskRecord.INDEX_NAME, scopeId = PROFILE_TASK, builder = ProfileTaskRecord.Builder.class, processor = NoneStreamingProcessor.class)
public class ProfileTaskRecord extends NoneStream {
public static final String INDEX_NAME = "profile_task";
public static final String SERVICE_ID = "service_id";
......@@ -73,11 +73,11 @@ public class ProfileTaskNoneStream extends NoneStream {
@Column(columnName = MAX_SAMPLING_COUNT)
private int maxSamplingCount;
public static class Builder implements StorageBuilder<ProfileTaskNoneStream> {
public static class Builder implements StorageBuilder<ProfileTaskRecord> {
@Override
public ProfileTaskNoneStream map2Data(Map<String, Object> dbMap) {
final ProfileTaskNoneStream record = new ProfileTaskNoneStream();
public ProfileTaskRecord map2Data(Map<String, Object> dbMap) {
final ProfileTaskRecord record = new ProfileTaskRecord();
record.setServiceId(((Number) dbMap.get(SERVICE_ID)).intValue());
record.setEndpointName((String) dbMap.get(ENDPOINT_NAME));
record.setStartTime(((Number) dbMap.get(START_TIME)).longValue());
......@@ -91,7 +91,7 @@ public class ProfileTaskNoneStream extends NoneStream {
}
@Override
public Map<String, Object> data2Map(ProfileTaskNoneStream storageData) {
public Map<String, Object> data2Map(ProfileTaskRecord storageData) {
final HashMap<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, storageData.getServiceId());
map.put(ENDPOINT_NAME, storageData.getEndpointName());
......
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -31,10 +34,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO {
private final int queryMaxSize;
......@@ -46,27 +45,27 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
@Override
public List<ProfileTask> getTaskList(Integer serviceId, String endpointName, Long startTimeBucket,
Long endTimeBucket, Integer limit) throws IOException {
Long endTimeBucket, Integer limit) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
final BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
if (serviceId != null) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ProfileTaskNoneStream.SERVICE_ID, serviceId));
boolQueryBuilder.must().add(QueryBuilders.termQuery(ProfileTaskRecord.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(endpointName)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ProfileTaskNoneStream.ENDPOINT_NAME, endpointName));
boolQueryBuilder.must().add(QueryBuilders.termQuery(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
}
if (startTimeBucket != null) {
boolQueryBuilder.must()
.add(QueryBuilders.rangeQuery(ProfileTaskNoneStream.TIME_BUCKET).gte(startTimeBucket));
.add(QueryBuilders.rangeQuery(ProfileTaskRecord.TIME_BUCKET).gte(startTimeBucket));
}
if (endTimeBucket != null) {
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(ProfileTaskNoneStream.TIME_BUCKET).lte(endTimeBucket));
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(ProfileTaskRecord.TIME_BUCKET).lte(endTimeBucket));
}
if (limit != null) {
......@@ -75,9 +74,9 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
sourceBuilder.size(queryMaxSize);
}
sourceBuilder.sort(ProfileTaskNoneStream.START_TIME, SortOrder.DESC);
sourceBuilder.sort(ProfileTaskRecord.START_TIME, SortOrder.DESC);
final SearchResponse response = getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient().search(ProfileTaskRecord.INDEX_NAME, sourceBuilder);
final LinkedList<ProfileTask> tasks = new LinkedList<>();
for (SearchHit searchHit : response.getHits().getHits()) {
......@@ -97,7 +96,7 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
sourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
sourceBuilder.size(1);
final SearchResponse response = getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder);
final SearchResponse response = getClient().search(ProfileTaskRecord.INDEX_NAME, sourceBuilder);
if (response.getHits().getHits().length > 0) {
return parseTask(response.getHits().getHits()[0]);
......@@ -109,18 +108,19 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements IProfileTaskQueryDAO
private ProfileTask parseTask(SearchHit data) {
return ProfileTask.builder()
.id(data.getId())
.serviceId(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.SERVICE_ID)).intValue())
.endpointName((String) data.getSourceAsMap().get(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.START_TIME)).longValue())
.serviceId(((Number) data.getSourceAsMap().get(ProfileTaskRecord.SERVICE_ID)).intValue())
.endpointName((String) data.getSourceAsMap().get(ProfileTaskRecord.ENDPOINT_NAME))
.startTime(((Number) data.getSourceAsMap().get(ProfileTaskRecord.START_TIME)).longValue())
.createTime(((Number) data.getSourceAsMap()
.get(ProfileTaskNoneStream.CREATE_TIME)).longValue())
.duration(((Number) data.getSourceAsMap().get(ProfileTaskNoneStream.DURATION)).intValue())
.get(ProfileTaskRecord.CREATE_TIME)).longValue())
.duration(((Number) data.getSourceAsMap().get(ProfileTaskRecord.DURATION)).intValue())
.minDurationThreshold(((Number) data.getSourceAsMap()
.get(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD)).intValue())
.get(
ProfileTaskRecord.MIN_DURATION_THRESHOLD)).intValue())
.dumpPeriod(((Number) data.getSourceAsMap()
.get(ProfileTaskNoneStream.DUMP_PERIOD)).intValue())
.get(ProfileTaskRecord.DUMP_PERIOD)).intValue())
.maxSamplingCount(((Number) data.getSourceAsMap()
.get(ProfileTaskNoneStream.MAX_SAMPLING_COUNT)).intValue())
.get(ProfileTaskRecord.MAX_SAMPLING_COUNT)).intValue())
.build();
}
}
......@@ -18,13 +18,6 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
......@@ -32,6 +25,12 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
private JDBCHikariCPClient h2Client;
......@@ -42,39 +41,40 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
@Override
public List<ProfileTask> getTaskList(Integer serviceId, String endpointName, Long startTimeBucket,
Long endTimeBucket, Integer limit) throws IOException {
Long endTimeBucket, Integer limit) throws IOException {
final StringBuilder sql = new StringBuilder();
final ArrayList<Object> condition = new ArrayList<>(4);
sql.append("select * from ").append(ProfileTaskNoneStream.INDEX_NAME).append(" where 1=1 ");
sql.append("select * from ").append(ProfileTaskRecord.INDEX_NAME).append(" where 1=1 ");
if (startTimeBucket != null) {
sql.append(" and ").append(ProfileTaskNoneStream.TIME_BUCKET).append(" >= ? ");
sql.append(" and ").append(ProfileTaskRecord.TIME_BUCKET).append(" >= ? ");
condition.add(startTimeBucket);
}
if (endTimeBucket != null) {
sql.append(" and ").append(ProfileTaskNoneStream.TIME_BUCKET).append(" <= ? ");
sql.append(" and ").append(ProfileTaskRecord.TIME_BUCKET).append(" <= ? ");
condition.add(endTimeBucket);
}
if (serviceId != null) {
sql.append(" and ").append(ProfileTaskNoneStream.SERVICE_ID).append("=? ");
sql.append(" and ").append(ProfileTaskRecord.SERVICE_ID).append("=? ");
condition.add(serviceId);
}
if (StringUtil.isNotEmpty(endpointName)) {
sql.append(" and ").append(ProfileTaskNoneStream.ENDPOINT_NAME).append("=?");
sql.append(" and ").append(ProfileTaskRecord.ENDPOINT_NAME).append("=?");
condition.add(endpointName);
}
sql.append(" ORDER BY ").append(ProfileTaskNoneStream.START_TIME).append(" DESC ");
sql.append(" ORDER BY ").append(ProfileTaskRecord.START_TIME).append(" DESC ");
if (limit != null) {
sql.append(" LIMIT ").append(limit);
}
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
try (ResultSet resultSet = h2Client.executeQuery(
connection, sql.toString(), condition.toArray(new Object[0]))) {
final LinkedList<ProfileTask> tasks = new LinkedList<>();
while (resultSet.next()) {
tasks.add(parseTask(resultSet));
......@@ -94,11 +94,12 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
final StringBuilder sql = new StringBuilder();
final ArrayList<Object> condition = new ArrayList<>(1);
sql.append("select * from ").append(ProfileTaskNoneStream.INDEX_NAME).append(" where id=? LIMIT 1");
sql.append("select * from ").append(ProfileTaskRecord.INDEX_NAME).append(" where id=? LIMIT 1");
condition.add(id);
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
try (ResultSet resultSet = h2Client.executeQuery(
connection, sql.toString(), condition.toArray(new Object[0]))) {
if (resultSet.next()) {
return parseTask(resultSet);
}
......@@ -115,14 +116,14 @@ public class H2ProfileTaskQueryDAO implements IProfileTaskQueryDAO {
private ProfileTask parseTask(ResultSet data) throws SQLException {
return ProfileTask.builder()
.id(data.getString("id"))
.serviceId(data.getInt(ProfileTaskNoneStream.SERVICE_ID))
.endpointName(data.getString(ProfileTaskNoneStream.ENDPOINT_NAME))
.startTime(data.getLong(ProfileTaskNoneStream.START_TIME))
.createTime(data.getLong(ProfileTaskNoneStream.CREATE_TIME))
.duration(data.getInt(ProfileTaskNoneStream.DURATION))
.minDurationThreshold(data.getInt(ProfileTaskNoneStream.MIN_DURATION_THRESHOLD))
.dumpPeriod(data.getInt(ProfileTaskNoneStream.DUMP_PERIOD))
.maxSamplingCount(data.getInt(ProfileTaskNoneStream.MAX_SAMPLING_COUNT))
.serviceId(data.getInt(ProfileTaskRecord.SERVICE_ID))
.endpointName(data.getString(ProfileTaskRecord.ENDPOINT_NAME))
.startTime(data.getLong(ProfileTaskRecord.START_TIME))
.createTime(data.getLong(ProfileTaskRecord.CREATE_TIME))
.duration(data.getInt(ProfileTaskRecord.DURATION))
.minDurationThreshold(data.getInt(ProfileTaskRecord.MIN_DURATION_THRESHOLD))
.dumpPeriod(data.getInt(ProfileTaskRecord.DUMP_PERIOD))
.maxSamplingCount(data.getInt(ProfileTaskRecord.MAX_SAMPLING_COUNT))
.build();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册