From 285ad0a353f2d48db5920adfa5be76463a827006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Thu, 8 Mar 2018 21:09:57 +0800 Subject: [PATCH] 1. Fixed the OverViewLayerQuery#getTopNSlowService method bug which is not order by average response time. (#913) 2. Fixed the OverViewLayerQuery#getTopNApplicationThroughput method bug which is not order by TPS. 3. Fixed the ApplicationQuery#getSlowService method bug which is not order by average response time. 4. Fixed the ApplicationQuery#getServerThroughput method bug which is not order by TPS. #907 --- .../es/dao/ui/ApplicationMetricEsUIDAO.java | 42 +++++------- .../es/dao/ui/InstanceMetricEsUIDAO.java | 51 +++++++------- .../es/dao/ui/ServiceMetricEsUIDAO.java | 68 +++++++------------ 3 files changed, 67 insertions(+), 94 deletions(-) diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java index b9dba15650..c3c7213f03 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java @@ -18,10 +18,8 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; @@ -35,13 +33,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; /** * @author peng-yongsheng @@ -52,8 +47,6 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri super(client); } - private static final String AVG_TPS = "avg_tps"; - @Override public List getTopNApplicationThroughput(Step step, long startTimeBucket, long endTimeBucket, int betweenSecond, int topN, MetricSource metricSource) { @@ -70,36 +63,37 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setSize(0); - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(topN); + TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(2000); aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - - String idOrCode = "(params." + ApplicationMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")" - + " / " - + "(" + betweenSecond + ")"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script)); - searchRequestBuilder.addAggregation(aggregationBuilder); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); List applicationTPSs = new LinkedList<>(); Terms applicationIdTerms = searchResponse.getAggregations().get(ApplicationMetricTable.COLUMN_APPLICATION_ID); applicationIdTerms.getBuckets().forEach(applicationIdTerm -> { int applicationId = applicationIdTerm.getKeyAsNumber().intValue(); + Sum callSum = applicationIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); + long calls = (long)callSum.getValue(); + int callsPerSec = (int)(betweenSecond == 0 ? 0 : calls / betweenSecond); ApplicationTPS applicationTPS = new ApplicationTPS(); - InternalSimpleValue simpleValue = applicationIdTerm.getAggregations().get(AVG_TPS); - applicationTPS.setApplicationId(applicationId); - applicationTPS.setCallsPerSec((int)simpleValue.getValue()); + applicationTPS.setCallsPerSec(callsPerSec); applicationTPSs.add(applicationTPS); }); - return applicationTPSs; + + applicationTPSs.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1); + + if (applicationTPSs.size() <= topN) { + return applicationTPSs; + } else { + List newCollection = new LinkedList<>(); + for (int i = 0; i < topN; i++) { + newCollection.add(applicationTPSs.get(i)); + } + return newCollection; + } } @Override diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java index a52eba8f26..701550b968 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java @@ -18,15 +18,14 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.apache.skywalking.apm.collector.storage.table.MetricSource; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; import org.apache.skywalking.apm.collector.storage.ui.common.Step; import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo; @@ -40,26 +39,22 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; /** * @author peng-yongsheng */ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO { - private static final String AVG_TPS = "avg_tps"; - public InstanceMetricEsUIDAO(ElasticSearchClient client) { super(client); } - @Override public List getServerThroughput(int applicationId, Step step, long startTimeBucket, long endTimeBucket, - int secondBetween, int topN, MetricSource metricSource) { + @Override public List getServerThroughput(int applicationId, Step step, long startTimeBucket, + long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) { String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName); @@ -76,36 +71,36 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setSize(0); - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(topN); + TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(2000); aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - - String idOrCode = "(params." + InstanceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")" - + " / " - + "( " + secondBetween + " )"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script)); searchRequestBuilder.addAggregation(aggregationBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); List appServerInfos = new LinkedList<>(); - Terms serviceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID); - serviceIdTerms.getBuckets().forEach(serviceIdTerm -> { - int instanceId = serviceIdTerm.getKeyAsNumber().intValue(); + Terms instanceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID); + instanceIdTerms.getBuckets().forEach(instanceIdTerm -> { + int instanceId = instanceIdTerm.getKeyAsNumber().intValue(); + Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); + long calls = (long)callSum.getValue(); + int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween); AppServerInfo appServerInfo = new AppServerInfo(); - InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_TPS); - appServerInfo.setId(instanceId); - appServerInfo.setCallsPerSec((int)simpleValue.getValue()); + appServerInfo.setCallsPerSec(callsPerSec); appServerInfos.add(appServerInfo); }); - return appServerInfos; + + appServerInfos.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1); + if (appServerInfos.size() <= topN) { + return appServerInfos; + } else { + List newCollection = new LinkedList<>(); + for (int i = 0; i < topN; i++) { + newCollection.add(appServerInfos.get(i)); + } + return newCollection; + } } @Override public List getServerTPSTrend(int instanceId, Step step, List durationPoints) { diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java index 1113552404..677a47cef5 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java @@ -19,10 +19,10 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; import java.util.Collection; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.Set; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO; @@ -43,21 +43,19 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; /** * @author peng-yongsheng */ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { - private static final String AVG_DURATION = "avg_duration"; - public ServiceMetricEsUIDAO(ElasticSearchClient client) { super(client); } @@ -177,8 +175,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { @Override public List getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket, - Integer topN, - MetricSource metricSource) { + Integer topN, MetricSource metricSource) { String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName); @@ -193,43 +190,30 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { boolQuery.must().add(QueryBuilders.termQuery(ServiceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue())); searchRequestBuilder.setQuery(boolQuery); - searchRequestBuilder.setSize(0); - - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(topN); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM); - - String idOrCode = "(params." + ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM + ")" - + " / " - + "(params." + ServiceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_DURATION, bucketsPathsMap, script)); - - searchRequestBuilder.addAggregation(aggregationBuilder); + searchRequestBuilder.setSize(topN * 60); + searchRequestBuilder.addSort(SortBuilders.fieldSort(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION).order(SortOrder.DESC)); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); - List serviceMetrics = new LinkedList<>(); - Terms serviceIdTerms = searchResponse.getAggregations().get(ServiceMetricTable.COLUMN_SERVICE_ID); - serviceIdTerms.getBuckets().forEach(serviceIdTerm -> { - int serviceId = serviceIdTerm.getKeyAsNumber().intValue(); + SearchHit[] searchHits = searchResponse.getHits().getHits(); - ServiceMetric serviceMetric = new ServiceMetric(); - InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_DURATION); - Sum calls = serviceIdTerm.getAggregations().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS); + Set serviceIds = new HashSet<>(); + List serviceMetrics = new LinkedList<>(); + for (SearchHit searchHit : searchHits) { + int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue(); + if (!serviceIds.contains(serviceId)) { + ServiceMetric serviceMetric = new ServiceMetric(); + serviceMetric.setId(serviceId); + serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); + serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue()); + serviceMetrics.add(serviceMetric); + + serviceIds.add(serviceId); + } + if (topN == serviceIds.size()) { + break; + } + } - serviceMetric.setCalls((long)calls.getValue()); - serviceMetric.setId(serviceId); - serviceMetric.setAvgResponseTime((int)simpleValue.getValue()); - serviceMetrics.add(serviceMetric); - }); return serviceMetrics; } } -- GitLab