提交 1bc0d845 编写于 作者: P peng-yongsheng

Provide the getTopNServerThroughput query.

上级 87d246d7
......@@ -20,7 +20,9 @@ package org.apache.skywalking.apm.collector.storage.dao.ui;
import java.util.List;
import org.apache.skywalking.apm.collector.storage.base.dao.DAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo;
import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
/**
......@@ -28,6 +30,10 @@ import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
*/
public interface IInstanceMetricUIDAO extends DAO {
List<AppServerInfo> getTopNServerThroughput(int applicationId, Step step, long start, long end, long secondBetween,
int topN,
MetricSource metricSource);
List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints);
List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints);
......
......@@ -25,6 +25,8 @@ import java.util.List;
*/
public class AppServerInfo {
private int id;
private int applicationId;
private String applicationCode;
private String osInfo;
private String name;
private int tps;
......@@ -40,6 +42,22 @@ public class AppServerInfo {
this.id = id;
}
public int getApplicationId() {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
public String getApplicationCode() {
return applicationCode;
}
public void setApplicationCode(String applicationCode) {
this.applicationCode = applicationCode;
}
public String getOsInfo() {
return osInfo;
}
......
......@@ -18,8 +18,10 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
......@@ -27,21 +29,85 @@ import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo;
import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
import org.apache.skywalking.apm.collector.storage.utils.TimePyramidTableNameBuilder;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
/**
* @author peng-yongsheng
*/
public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO {
private static final String AVG_TPS = "avg_tps";
public InstanceMetricEsUIDAO(ElasticSearchClient client) {
super(client);
}
@Override public List<AppServerInfo> getTopNServerThroughput(int applicationId, Step step, long start, long end,
long secondBetween, int topN, MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
searchRequestBuilder.setTypes(InstanceMetricTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(InstanceMetricTable.COLUMN_TIME_BUCKET).gte(start).lte(end));
if (applicationId != 0) {
boolQuery.must().add(QueryBuilders.termQuery(InstanceMetricTable.COLUMN_APPLICATION_ID, applicationId));
}
boolQuery.must().add(QueryBuilders.termQuery(InstanceMetricTable.COLUMN_SOURCE_VALUE, metricSource.getValue()));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(InstanceMetricTable.COLUMN_INSTANCE_ID).field(InstanceMetricTable.COLUMN_INSTANCE_ID).size(topN);
aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
Map<String, String> bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_CALLS);
bucketsPathsMap.put(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS);
String idOrCode = "(params." + InstanceMetricTable.COLUMN_TRANSACTION_CALLS + " - params." + InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS + ")"
+ " / "
+ "( " + secondBetween + " )";
Script script = new Script(idOrCode);
aggregationBuilder.subAggregation(PipelineAggregatorBuilders.bucketScript(AVG_TPS, bucketsPathsMap, script));
searchRequestBuilder.addAggregation(aggregationBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<AppServerInfo> appServerInfos = new LinkedList<>();
Terms serviceIdTerms = searchResponse.getAggregations().get(InstanceMetricTable.COLUMN_INSTANCE_ID);
serviceIdTerms.getBuckets().forEach(serviceIdTerm -> {
int instanceId = serviceIdTerm.getKeyAsNumber().intValue();
AppServerInfo appServerInfo = new AppServerInfo();
InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_TPS);
appServerInfo.setId(instanceId);
appServerInfo.setTps((int)simpleValue.getValue());
appServerInfos.add(appServerInfo);
});
return appServerInfos;
}
@Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
......
......@@ -31,6 +31,7 @@ import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.server.AppServerInfo;
import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
import org.apache.skywalking.apm.collector.storage.utils.TimePyramidTableNameBuilder;
import org.slf4j.Logger;
......@@ -48,6 +49,11 @@ public class InstanceMetricH2UIDAO extends H2DAO implements IInstanceMetricUIDAO
super(client);
}
@Override public List<AppServerInfo> getTopNServerThroughput(int applicationId, Step step, long start, long end,
long secondBetween, int topN, MetricSource metricSource) {
return null;
}
@Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
H2Client client = getClient();
String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
......
......@@ -34,6 +34,7 @@ import org.apache.skywalking.apm.collector.ui.service.AlarmService;
import org.apache.skywalking.apm.collector.ui.service.ApplicationService;
import org.apache.skywalking.apm.collector.ui.service.ClusterTopologyService;
import org.apache.skywalking.apm.collector.ui.service.NetworkAddressService;
import org.apache.skywalking.apm.collector.ui.service.ServerService;
import org.apache.skywalking.apm.collector.ui.service.ServiceNameService;
import org.apache.skywalking.apm.collector.ui.utils.DurationUtils;
......@@ -47,6 +48,7 @@ public class OverViewLayerQuery implements Query {
private ApplicationService applicationService;
private NetworkAddressService networkAddressService;
private ServiceNameService serviceNameService;
private ServerService serverService;
private AlarmService alarmService;
public OverViewLayerQuery(ModuleManager moduleManager) {
......@@ -88,6 +90,13 @@ public class OverViewLayerQuery implements Query {
return alarmService;
}
private ServerService getServerService() {
if (ObjectUtils.isEmpty(serverService)) {
this.serverService = new ServerService(moduleManager);
}
return serverService;
}
public Topology getClusterTopology(Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long end = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());
......@@ -111,6 +120,7 @@ public class OverViewLayerQuery implements Query {
public AlarmTrend getAlarmTrend(Duration duration) throws ParseException {
long start = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long end = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());
return getAlarmService().getApplicationAlarmTrend(duration.getStep(), start, end);
}
......@@ -121,10 +131,15 @@ public class OverViewLayerQuery implements Query {
public List<ServiceMetric> getTopNSlowService(Duration duration, int topN) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServiceNameService().getSlowService(duration.getStep(), start, end, topN);
}
public List<AppServerInfo> getTopNServerThroughput(Duration duration, int topN) {
return null;
public List<AppServerInfo> getTopNServerThroughput(int applicationId, Duration duration,
int topN) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getServerService().getTopNServerThroughput(applicationId, duration.getStep(), start, end, topN);
}
}
......@@ -24,6 +24,9 @@ import com.google.gson.JsonObject;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
......@@ -33,6 +36,8 @@ import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.ui.common.ResponseTimeTrend;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.storage.ui.common.ThroughputTrend;
......@@ -54,6 +59,8 @@ public class ServerService {
private final ICpuMetricUIDAO cpuMetricUIDAO;
private final IGCMetricUIDAO gcMetricUIDAO;
private final IMemoryMetricUIDAO memoryMetricUIDAO;
private final InstanceCacheService instanceCacheService;
private final ApplicationCacheService applicationCacheService;
public ServerService(ModuleManager moduleManager) {
this.instanceUIDAO = moduleManager.find(StorageModule.NAME).getService(IInstanceUIDAO.class);
......@@ -61,6 +68,20 @@ public class ServerService {
this.cpuMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(ICpuMetricUIDAO.class);
this.gcMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IGCMetricUIDAO.class);
this.memoryMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IMemoryMetricUIDAO.class);
this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
}
public List<AppServerInfo> getTopNServerThroughput(int applicationId, Step step, long start, long end, int topN) {
//TODO
List<AppServerInfo> appServerInfos = instanceMetricUIDAO.getTopNServerThroughput(applicationId, step, start, end, 1000, topN, MetricSource.Callee);
appServerInfos.forEach(appServerInfo -> {
Instance instance = instanceUIDAO.getInstance(appServerInfo.getId());
appServerInfo.setOsInfo(instance.getOsInfo());
});
buildAppServerInfo(appServerInfos);
return appServerInfos;
}
public List<AppServerInfo> searchServer(String keyword, long start, long end) {
......@@ -132,6 +153,9 @@ public class ServerService {
private void buildAppServerInfo(List<AppServerInfo> serverInfos) {
serverInfos.forEach(serverInfo -> {
int applicationId = instanceCacheService.getApplicationId(serverInfo.getId());
serverInfo.setApplicationId(applicationId);
serverInfo.setApplicationCode(applicationCacheService.getApplicationById(applicationId).getApplicationCode());
if (StringUtils.isNotEmpty(serverInfo.getOsInfo())) {
JsonObject osInfoJson = gson.fromJson(serverInfo.getOsInfo(), JsonObject.class);
if (osInfoJson.has("osName")) {
......
......@@ -33,5 +33,5 @@ extend type Query {
getAlarmTrend(duration: Duration!): AlarmTrend
getConjecturalApps(duration: Duration!): ConjecturalAppBrief
getTopNSlowService(duration: Duration!, topN: Int!): [ServiceMetric!]!
getTopNServerThroughput(duration: Duration!, topN: Int!): [AppServerInfo!]!
getTopNServerThroughput(applicationId: Int, duration: Duration!, topN: Int!): [AppServerInfo!]!
}
......@@ -6,6 +6,8 @@
type AppServerInfo {
id: ID!
name: String!
applicationId: Int!
applicationName: String
tps: Int!
host: String
pid: Int
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册