未验证 提交 ce3bff89 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #853 from peng-yongsheng/feature/ServiceQuery

"searchService","getServiceResponseTimeTrend","getServiceTPSTrend","getServiceSLATrend","getServiceTopology"
......@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.collector.storage.dao.ui;
import java.util.List;
import org.apache.skywalking.apm.collector.storage.base.dao.DAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.ui.common.Call;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
/**
......@@ -29,15 +28,66 @@ import org.apache.skywalking.apm.collector.storage.ui.common.Step;
*/
public interface IServiceReferenceMetricUIDAO extends DAO {
List<Call> getFrontServices(Step step, long startTime, long endTime,
List<ServiceReferenceMetric> getFrontServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource, int behindServiceId);
List<Call> getBehindServices(Step step, long startTime, long endTime,
List<ServiceReferenceMetric> getBehindServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource, int frontServiceId);
List<Call> getFrontServices(Step step, long startTime, long endTime,
MetricSource metricSource, List<Integer> behindServiceIds);
class ServiceReferenceMetric {
private int source;
private int target;
private long calls;
private long errorCalls;
private long durations;
private long errorDurations;
List<Call> getBehindServices(Step step, long startTime, long endTime,
MetricSource metricSource, List<Integer> frontServiceIds);
public int getSource() {
return source;
}
public void setSource(int source) {
this.source = source;
}
public int getTarget() {
return target;
}
public void setTarget(int target) {
this.target = target;
}
public long getCalls() {
return calls;
}
public void setCalls(long calls) {
this.calls = calls;
}
public long getErrorCalls() {
return errorCalls;
}
public void setErrorCalls(long errorCalls) {
this.errorCalls = errorCalls;
}
public long getDurations() {
return durations;
}
public void setDurations(long durations) {
this.durations = durations;
}
public long getErrorDurations() {
return errorDurations;
}
public void setErrorDurations(long errorDurations) {
this.errorDurations = errorDurations;
}
}
}
......@@ -29,6 +29,7 @@ import org.apache.skywalking.apm.network.proto.SpanType;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
......@@ -58,7 +59,11 @@ public class ServiceNameServiceEsUIDAO extends EsDAO implements IServiceNameServ
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(topN);
searchRequestBuilder.setQuery(QueryBuilders.matchQuery(ServiceNameTable.COLUMN_SERVICE_NAME, keyword));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.matchQuery(ServiceNameTable.COLUMN_SERVICE_NAME, keyword));
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SRC_SPAN_TYPE, SpanType.Entry_VALUE));
searchRequestBuilder.setQuery(boolQuery);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] searchHits = searchResponse.getHits().getHits();
......
......@@ -25,7 +25,6 @@ import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetri
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.apache.skywalking.apm.collector.storage.ui.common.Call;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.utils.TimePyramidTableNameBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
......@@ -47,7 +46,8 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
super(client);
}
@Override public List<Call> getFrontServices(Step step, long startTime, long endTime, MetricSource metricSource,
@Override public List<ServiceReferenceMetric> getFrontServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource,
int behindServiceId) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceReferenceMetricTable.TABLE);
......@@ -56,7 +56,7 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, behindServiceId));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
......@@ -65,19 +65,22 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID).field(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID).size(100);
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<Call> calls = new LinkedList<>();
List<ServiceReferenceMetric> referenceMetrics = new LinkedList<>();
Terms frontServiceIdTerms = searchResponse.getAggregations().get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID);
buildNodeByBehindServiceId(calls, frontServiceIdTerms, behindServiceId);
buildNodeByBehindServiceId(referenceMetrics, frontServiceIdTerms, behindServiceId);
return calls;
return referenceMetrics;
}
@Override public List<Call> getBehindServices(Step step, long startTime, long endTime, MetricSource metricSource,
@Override public List<ServiceReferenceMetric> getBehindServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource,
int frontServiceId) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceReferenceMetricTable.TABLE);
......@@ -86,7 +89,7 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, frontServiceId));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
......@@ -95,106 +98,59 @@ public class ServiceReferenceEsMetricUIDAO extends EsDAO implements IServiceRefe
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID).field(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID).size(100);
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<Call> calls = new LinkedList<>();
List<ServiceReferenceMetric> referenceMetrics = new LinkedList<>();
Terms behindServiceIdTerms = searchResponse.getAggregations().get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID);
buildNodeByFrontServiceId(calls, behindServiceIdTerms, frontServiceId);
buildNodeByFrontServiceId(referenceMetrics, behindServiceIdTerms, frontServiceId);
return calls;
return referenceMetrics;
}
@Override public List<Call> getFrontServices(Step step, long startTime, long endTime, MetricSource metricSource,
List<Integer> behindServiceIds) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceReferenceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
searchRequestBuilder.setTypes(ServiceReferenceMetricTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.termsQuery(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, behindServiceIds));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
return executeAggregation(searchRequestBuilder);
}
@Override public List<Call> getBehindServices(Step step, long startTime, long endTime, MetricSource metricSource,
List<Integer> frontServiceIds) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceReferenceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
searchRequestBuilder.setTypes(ServiceReferenceMetricTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime));
boolQuery.must().add(QueryBuilders.termsQuery(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, frontServiceIds));
boolQuery.must().add(QueryBuilders.termQuery(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
return executeAggregation(searchRequestBuilder);
}
private List<Call> executeAggregation(SearchRequestBuilder searchRequestBuilder) {
TermsAggregationBuilder frontAggregationBuilder = AggregationBuilders.terms(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID).field(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID).size(100);
TermsAggregationBuilder behindAggregationBuilder = AggregationBuilders.terms(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID).field(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID).size(100);
behindAggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS));
behindAggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
frontAggregationBuilder.subAggregation(behindAggregationBuilder);
searchRequestBuilder.addAggregation(frontAggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<Call> nodes = new LinkedList<>();
Terms frontServiceIdTerms = searchResponse.getAggregations().get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID);
frontServiceIdTerms.getBuckets().forEach(frontServiceIdBucket -> {
int frontServiceId = frontServiceIdBucket.getKeyAsNumber().intValue();
Terms behindServiceIdTerms = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID);
buildNodeByFrontServiceId(nodes, behindServiceIdTerms, frontServiceId);
});
return nodes;
}
private void buildNodeByFrontServiceId(List<Call> calls, Terms behindServiceIdTerms, int frontServiceId) {
private void buildNodeByFrontServiceId(List<ServiceReferenceMetric> referenceMetrics, Terms behindServiceIdTerms,
int frontServiceId) {
behindServiceIdTerms.getBuckets().forEach(behindServiceIdBucket -> {
int behindServiceId = behindServiceIdBucket.getKeyAsNumber().intValue();
Sum callsSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS);
Sum responseTimes = behindServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
Call call = new Call();
call.setSource(frontServiceId);
call.setTarget(behindServiceId);
// call.setCalls((int)callsSum.getValue());
// call.setResponseTimes((int)responseTimes.getValue());
calls.add(call);
Sum errorCallsSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
Sum durationSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
Sum errorDurationSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM);
ServiceReferenceMetric referenceMetric = new ServiceReferenceMetric();
referenceMetric.setSource(frontServiceId);
referenceMetric.setTarget(behindServiceId);
referenceMetric.setCalls((long)callsSum.getValue());
referenceMetric.setErrorCalls((long)errorCallsSum.getValue());
referenceMetric.setDurations((long)durationSum.getValue());
referenceMetric.setErrorDurations((long)errorDurationSum.getValue());
referenceMetrics.add(referenceMetric);
});
}
private void buildNodeByBehindServiceId(List<Call> calls, Terms frontServiceIdTerms, int behindServiceId) {
private void buildNodeByBehindServiceId(List<ServiceReferenceMetric> referenceMetrics, Terms frontServiceIdTerms,
int behindServiceId) {
frontServiceIdTerms.getBuckets().forEach(frontServiceIdBucket -> {
int frontServiceId = frontServiceIdBucket.getKeyAsNumber().intValue();
Sum callsSum = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS);
Sum responseTimes = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
Call call = new Call();
call.setTarget(behindServiceId);
call.setSource(frontServiceId);
// call.setCalls((int)callsSum.getValue());
// call.setResponseTimes((int)responseTimes.getValue());
calls.add(call);
Sum errorCallsSum = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
Sum durationSum = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM);
Sum errorDurationSum = frontServiceIdBucket.getAggregations().get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM);
ServiceReferenceMetric referenceMetric = new ServiceReferenceMetric();
referenceMetric.setTarget(behindServiceId);
referenceMetric.setSource(frontServiceId);
referenceMetric.setCalls((long)callsSum.getValue());
referenceMetric.setErrorCalls((long)errorCallsSum.getValue());
referenceMetric.setDurations((long)durationSum.getValue());
referenceMetric.setErrorDurations((long)errorDurationSum.getValue());
referenceMetrics.add(referenceMetric);
});
}
}
......@@ -60,9 +60,9 @@ public class ServiceNameServiceH2UIDAO extends H2DAO implements IServiceNameServ
}
@Override public List<ServiceInfo> searchService(String keyword, int topN) {
String dynamicSql = "select {0},{1} from {2} where {3} like ? limit ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.COLUMN_SERVICE_ID, ServiceNameTable.COLUMN_SERVICE_NAME, ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_NAME);
Object[] params = new Object[] {keyword, topN};
String dynamicSql = "select {0},{1} from {2} where {3} like ? and {4} = ? limit ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.COLUMN_SERVICE_ID, ServiceNameTable.COLUMN_SERVICE_NAME, ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_NAME, ServiceNameTable.COLUMN_SRC_SPAN_TYPE);
Object[] params = new Object[] {keyword, SpanType.Entry_VALUE, topN};
List<ServiceInfo> serviceInfos = new LinkedList<>();
try (ResultSet rs = getClient().executeQuery(sql, params)) {
......
......@@ -23,7 +23,6 @@ import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.ui.common.Call;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
/**
......@@ -35,23 +34,15 @@ public class ServiceReferenceH2MetricUIDAO extends H2DAO implements IServiceRefe
super(client);
}
@Override public List<Call> getFrontServices(Step step, long startTime, long endTime, MetricSource metricSource,
@Override public List<ServiceReferenceMetric> getFrontServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource,
int behindServiceId) {
return null;
}
@Override public List<Call> getBehindServices(Step step, long startTime, long endTime, MetricSource metricSource,
@Override public List<ServiceReferenceMetric> getBehindServices(Step step, long startTimeBucket, long endTimeBucket,
MetricSource metricSource,
int frontServiceId) {
return null;
}
@Override public List<Call> getFrontServices(Step step, long startTime, long endTime, MetricSource metricSource,
List<Integer> behindServiceIds) {
return null;
}
@Override public List<Call> getBehindServices(Step step, long startTime, long endTime, MetricSource metricSource,
List<Integer> frontServiceIds) {
return null;
}
}
......@@ -65,27 +65,34 @@ public class ServiceQuery implements Query {
}
public ResponseTimeTrend getServiceResponseTimeTrend(int serviceId, Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getServiceResponseTimeTrend(serviceId, duration.getStep(), start, end);
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getServiceResponseTimeTrend(serviceId, duration.getStep(), startTimeBucket, endTimeBucket);
}
public ThroughputTrend getServiceTPSTrend(int serviceId, Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getServiceTPSTrend(serviceId, duration.getStep(), start, end);
long startSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long endSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());
return getServiceNameService().getServiceTPSTrend(serviceId, duration.getStep(), startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
}
public SLATrend getServiceSLATrend(int serviceId, Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getServiceSLATrend(serviceId, duration.getStep(), start, end);
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getServiceSLATrend(serviceId, duration.getStep(), startTimeBucket, endTimeBucket);
}
public Topology getServiceTopology(int serviceId, Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceTopologyService().getServiceTopology(duration.getStep(), serviceId, start, end);
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
long startSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long endSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());
return getServiceTopologyService().getServiceTopology(duration.getStep(), serviceId, startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
}
}
......@@ -66,28 +66,30 @@ public class ServiceNameService {
return serviceNameServiceUIDAO.searchService(keyword, topN);
}
public ThroughputTrend getServiceTPSTrend(int serviceId, Step step, long start, long end) throws ParseException {
public ThroughputTrend getServiceTPSTrend(int serviceId, Step step, long startTimeBucket,
long endTimeBucket, long startSecondTimeBucket, long endSecondTimeBucket) throws ParseException {
ThroughputTrend throughputTrend = new ThroughputTrend();
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, start, end);
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, startTimeBucket, endTimeBucket);
List<Integer> callsTrends = serviceMetricUIDAO.getServiceTPSTrend(serviceId, step, durationPoints);
//TODO
callsTrends.forEach(calls -> throughputTrend.getTrendList().add(calls / 1000));
ServiceName serviceName = serviceNameCacheService.get(serviceId);
int secondBetween = secondBetweenService.calculate(serviceName.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket);
callsTrends.forEach(calls -> throughputTrend.getTrendList().add(calls / secondBetween));
return throughputTrend;
}
public ResponseTimeTrend getServiceResponseTimeTrend(int serviceId, Step step, long start,
long end) throws ParseException {
public ResponseTimeTrend getServiceResponseTimeTrend(int serviceId, Step step, long startTimeBucket,
long endTimeBucket) throws ParseException {
ResponseTimeTrend responseTimeTrend = new ResponseTimeTrend();
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, start, end);
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, startTimeBucket, endTimeBucket);
responseTimeTrend.setTrendList(serviceMetricUIDAO.getServiceResponseTimeTrend(serviceId, step, durationPoints));
return responseTimeTrend;
}
public SLATrend getServiceSLATrend(int serviceId, Step step, long start, long end) throws ParseException {
public SLATrend getServiceSLATrend(int serviceId, Step step, long startTimeBucket, long endTimeBucket) throws ParseException {
SLATrend slaTrend = new SLATrend();
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, start, end);
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, startTimeBucket, endTimeBucket);
slaTrend.setTrendList(serviceMetricUIDAO.getServiceSLATrend(serviceId, step, durationPoints));
return slaTrend;
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.ui.service;
import java.text.ParseException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -38,6 +39,7 @@ import org.apache.skywalking.apm.collector.storage.ui.common.Call;
import org.apache.skywalking.apm.collector.storage.ui.common.Node;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.common.Topology;
import org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -53,40 +55,77 @@ public class ServiceTopologyService {
private final IServiceMetricUIDAO serviceMetricUIDAO;
private final IServiceReferenceMetricUIDAO serviceReferenceMetricUIDAO;
private final ServiceNameCacheService serviceNameCacheService;
private final SecondBetweenService secondBetweenService;
public ServiceTopologyService(ModuleManager moduleManager) {
this.serviceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceMetricUIDAO.class);
this.serviceReferenceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMetricUIDAO.class);
this.applicationComponentUIDAO = moduleManager.find(StorageModule.NAME).getService(IApplicationComponentUIDAO.class);
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
this.secondBetweenService = new SecondBetweenService(moduleManager);
}
public Topology getServiceTopology(Step step, int serviceId, long start, long end) throws ParseException {
logger.debug("start: {}, end: {}", start, end);
List<IApplicationComponentUIDAO.ApplicationComponent> applicationComponents = applicationComponentUIDAO.load(step, start, end);
public Topology getServiceTopology(Step step, int serviceId, long startTimeBucket,
long endTimeBucket, long startSecondTimeBucket, long endSecondTimeBucket) throws ParseException {
logger.debug("startTimeBucket: {}, endTimeBucket: {}", startTimeBucket, endTimeBucket);
List<IApplicationComponentUIDAO.ApplicationComponent> applicationComponents = applicationComponentUIDAO.load(step, startTimeBucket, endTimeBucket);
Map<Integer, String> components = new HashMap<>();
applicationComponents.forEach(component -> components.put(component.getApplicationId(), ComponentsDefine.getInstance().getComponentName(component.getComponentId())));
List<Call> calleeCalls = serviceReferenceMetricUIDAO.getFrontServices(step, start, end, MetricSource.Callee, serviceId);
calleeCalls.addAll(serviceReferenceMetricUIDAO.getBehindServices(step, start, end, MetricSource.Caller, serviceId));
List<IServiceReferenceMetricUIDAO.ServiceReferenceMetric> referenceMetrics = serviceReferenceMetricUIDAO.getFrontServices(step, startTimeBucket, endTimeBucket, MetricSource.Callee, serviceId);
referenceMetrics.addAll(serviceReferenceMetricUIDAO.getBehindServices(step, startTimeBucket, endTimeBucket, MetricSource.Caller, serviceId));
Set<Integer> nodeIds = new HashSet<>();
calleeCalls.forEach(call -> {
call.setCallType(Const.EMPTY_STRING);
nodeIds.add(call.getSource());
nodeIds.add(call.getTarget());
List<Call> calls = new LinkedList<>();
referenceMetrics.forEach(referenceMetric -> {
nodeIds.add(referenceMetric.getSource());
nodeIds.add(referenceMetric.getTarget());
Call call = new Call();
call.setSource(referenceMetric.getSource());
call.setTarget(referenceMetric.getTarget());
call.setAvgResponseTime((referenceMetric.getDurations() - referenceMetric.getErrorDurations()) / (referenceMetric.getCalls() - referenceMetric.getErrorCalls()));
call.setCallType(components.getOrDefault(serviceNameCacheService.get(referenceMetric.getTarget()).getApplicationId(), Const.UNKNOWN));
try {
int applicationId = serviceNameCacheService.get(referenceMetric.getTarget()).getApplicationId();
call.setCallsPerSec(referenceMetric.getCalls() / secondBetweenService.calculate(applicationId, startSecondTimeBucket, endSecondTimeBucket));
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
calls.add(call);
});
List<Node> serviceNodes = serviceMetricUIDAO.getServicesMetric(step, start, end, MetricSource.Callee, nodeIds);
List<Node> serviceNodes = serviceMetricUIDAO.getServicesMetric(step, startTimeBucket, endTimeBucket, MetricSource.Callee, nodeIds);
Set<Integer> gotNodes = new HashSet<>();
serviceNodes.forEach(serviceNode -> gotNodes.add(serviceNode.getId()));
Set<Integer> callerNodeIds = new HashSet<>();
nodeIds.forEach(nodeId -> {
if (!gotNodes.contains(nodeId)) {
callerNodeIds.add(nodeId);
}
});
serviceNodes.addAll(serviceMetricUIDAO.getServicesMetric(step, startTimeBucket, endTimeBucket, MetricSource.Caller, callerNodeIds));
serviceNodes.forEach(serviceNode -> {
ServiceName serviceName = serviceNameCacheService.get(serviceNode.getId());
serviceNode.setName(serviceName.getServiceName());
});
if (callerNodeIds.contains(Const.NONE_SERVICE_ID)) {
VisualUserNode userNode = new VisualUserNode();
userNode.setId(Const.NONE_SERVICE_ID);
userNode.setName(Const.USER_CODE);
userNode.setType(Const.USER_CODE.toUpperCase());
serviceNodes.add(userNode);
}
Topology topology = new Topology();
topology.setCalls(calleeCalls);
topology.setCalls(calls);
topology.setNodes(serviceNodes);
return topology;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册