diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java index b9dba1565079c7f37fd0a3829bdd667d1a5cf5ce..c3c7213f039e3731a996db1540e3ed21faad6885 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java @@ -18,10 +18,8 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; @@ -35,13 +33,10 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; /** * @author peng-yongsheng @@ -52,8 +47,6 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri super(client); } - private static final String AVG_TPS = "avg_tps"; - @Override public List getTopNApplicationThroughput(Step step, long startTimeBucket, long endTimeBucket, int betweenSecond, int topN, MetricSource metricSource) { @@ -70,36 +63,37 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setSize(0); - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(topN); + TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.COLUMN_APPLICATION_ID).field(ApplicationMetricTable.COLUMN_APPLICATION_ID).size(2000); aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - - String idOrCode = "(params." + ApplicationMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ApplicationMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")" - + " / " - + "(" + betweenSecond + ")"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script)); - searchRequestBuilder.addAggregation(aggregationBuilder); + SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); List applicationTPSs = new LinkedList<>(); Terms applicationIdTerms = searchResponse.getAggregations().get(ApplicationMetricTable.COLUMN_APPLICATION_ID); applicationIdTerms.getBuckets().forEach(applicationIdTerm -> { int applicationId = applicationIdTerm.getKeyAsNumber().intValue(); + Sum callSum = applicationIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); + long calls = (long)callSum.getValue(); + int callsPerSec = (int)(betweenSecond == 0 ? 0 : calls / betweenSecond); ApplicationTPS applicationTPS = new ApplicationTPS(); - InternalSimpleValue simpleValue = applicationIdTerm.getAggregations().get(AVG_TPS); - applicationTPS.setApplicationId(applicationId); - applicationTPS.setCallsPerSec((int)simpleValue.getValue()); + applicationTPS.setCallsPerSec(callsPerSec); applicationTPSs.add(applicationTPS); }); - return applicationTPSs; + + applicationTPSs.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1); + + if (applicationTPSs.size() <= topN) { + return applicationTPSs; + } else { + List newCollection = new LinkedList<>(); + for (int i = 0; i < topN; i++) { + newCollection.add(applicationTPSs.get(i)); + } + return newCollection; + } } @Override diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java index a52eba8f26b0f338cdff71674a2c20324bb6f86c..701550b9685b9e8de3190cc152e91d3c9e40e00a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java @@ -18,15 +18,14 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO; import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.apache.skywalking.apm.collector.storage.table.MetricSource; +import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable; import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable; import org.apache.skywalking.apm.collector.storage.ui.common.Step; import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo; @@ -40,26 +39,22 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; /** * @author peng-yongsheng */ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO { - private static final String AVG_TPS = "avg_tps"; - public InstanceMetricEsUIDAO(ElasticSearchClient client) { super(client); } - @Override public List getServerThroughput(int applicationId, Step step, long startTimeBucket, long endTimeBucket, - int secondBetween, int topN, MetricSource metricSource) { + @Override public List getServerThroughput(int applicationId, Step step, long startTimeBucket, + long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) { String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName); @@ -76,36 +71,36 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setSize(0); - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(topN); + TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(2000); aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - - String idOrCode = "(params." + InstanceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")" - + " / " - + "( " + secondBetween + " )"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script)); searchRequestBuilder.addAggregation(aggregationBuilder); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); List appServerInfos = new LinkedList<>(); - Terms serviceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID); - serviceIdTerms.getBuckets().forEach(serviceIdTerm -> { - int instanceId = serviceIdTerm.getKeyAsNumber().intValue(); + Terms instanceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID); + instanceIdTerms.getBuckets().forEach(instanceIdTerm -> { + int instanceId = instanceIdTerm.getKeyAsNumber().intValue(); + Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS); + long calls = (long)callSum.getValue(); + int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween); AppServerInfo appServerInfo = new AppServerInfo(); - InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_TPS); - appServerInfo.setId(instanceId); - appServerInfo.setCallsPerSec((int)simpleValue.getValue()); + appServerInfo.setCallsPerSec(callsPerSec); appServerInfos.add(appServerInfo); }); - return appServerInfos; + + appServerInfos.sort((first, second) -> first.getCallsPerSec() > second.getCallsPerSec() ? -1 : 1); + if (appServerInfos.size() <= topN) { + return appServerInfos; + } else { + List newCollection = new LinkedList<>(); + for (int i = 0; i < topN; i++) { + newCollection.add(appServerInfos.get(i)); + } + return newCollection; + } } @Override public List getServerTPSTrend(int instanceId, Step step, List durationPoints) { diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java index 111355240466a8f16409e4f090212cccf8f630ac..677a47cef53b19d435ecc8848e05120777344661 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java @@ -19,10 +19,10 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui; import java.util.Collection; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; +import java.util.Set; import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.apm.collector.core.util.Const; import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO; @@ -43,21 +43,19 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.script.Script; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; /** * @author peng-yongsheng */ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { - private static final String AVG_DURATION = "avg_duration"; - public ServiceMetricEsUIDAO(ElasticSearchClient client) { super(client); } @@ -177,8 +175,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { @Override public List getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket, - Integer topN, - MetricSource metricSource) { + Integer topN, MetricSource metricSource) { String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName); @@ -193,43 +190,30 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO { boolQuery.must().add(QueryBuilders.termQuery(ServiceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue())); searchRequestBuilder.setQuery(boolQuery); - searchRequestBuilder.setSize(0); - - TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(topN); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)); - aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)); - - Map bucketsPathsMap = new HashMap<>(); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_CALLS); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM); - bucketsPathsMap.put(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM); - - String idOrCode = "(params." + ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM + ")" - + " / " - + "(params." + ServiceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"; - Script script = new Script(idOrCode); - aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_DURATION, bucketsPathsMap, script)); - - searchRequestBuilder.addAggregation(aggregationBuilder); + searchRequestBuilder.setSize(topN * 60); + searchRequestBuilder.addSort(SortBuilders.fieldSort(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION).order(SortOrder.DESC)); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); - List serviceMetrics = new LinkedList<>(); - Terms serviceIdTerms = searchResponse.getAggregations().get(ServiceMetricTable.COLUMN_SERVICE_ID); - serviceIdTerms.getBuckets().forEach(serviceIdTerm -> { - int serviceId = serviceIdTerm.getKeyAsNumber().intValue(); + SearchHit[] searchHits = searchResponse.getHits().getHits(); - ServiceMetric serviceMetric = new ServiceMetric(); - InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_DURATION); - Sum calls = serviceIdTerm.getAggregations().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS); + Set serviceIds = new HashSet<>(); + List serviceMetrics = new LinkedList<>(); + for (SearchHit searchHit : searchHits) { + int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue(); + if (!serviceIds.contains(serviceId)) { + ServiceMetric serviceMetric = new ServiceMetric(); + serviceMetric.setId(serviceId); + serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); + serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue()); + serviceMetrics.add(serviceMetric); + + serviceIds.add(serviceId); + } + if (topN == serviceIds.size()) { + break; + } + } - serviceMetric.setCalls((long)calls.getValue()); - serviceMetric.setId(serviceId); - serviceMetric.setAvgResponseTime((int)simpleValue.getValue()); - serviceMetrics.add(serviceMetric); - }); return serviceMetrics; } }