提交 53b9e7f8 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Implementation of aggregation query. (#1702)

上级 e83e175a
......@@ -61,6 +61,7 @@ public class CoreModule extends ModuleDefine {
classes.add(MetricQueryService.class);
classes.add(TraceQueryService.class);
classes.add(MetadataQueryService.class);
classes.add(AggregationQueryService.class);
}
private void addServerInterface(List<Class> classes) {
......
......@@ -114,6 +114,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public class AggregationQueryService implements Service {
private final ModuleManager moduleManager;
private IAggregationQueryDAO aggregationQueryDAO;
public AggregationQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IAggregationQueryDAO getAggregationQueryDAO() {
if (aggregationQueryDAO == null) {
aggregationQueryDAO = moduleManager.find(StorageModule.NAME).getService(IAggregationQueryDAO.class);
}
return aggregationQueryDAO;
}
public List<TopNEntity> getServiceTopN(final String name, final int topN, final Step step, final long startTB,
final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceTopN(name, topN, step, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInventory inventory = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class).get(Integer.valueOf(entity.getId()));
if (inventory != null) {
entity.setName(inventory.getName());
}
}
return topNEntities;
}
public List<TopNEntity> getAllServiceInstanceTopN(final String name, final int topN, final Step step,
final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllServiceInstanceTopN(name, topN, step, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class).get(Integer.valueOf(entity.getId()));
if (inventory != null) {
entity.setName(inventory.getName());
}
}
return topNEntities;
}
public List<TopNEntity> getServiceInstanceTopN(final int serviceId, final String name, final int topN,
final Step step, final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getServiceInstanceTopN(serviceId, name, topN, step, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
ServiceInstanceInventory inventory = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class).get(Integer.valueOf(entity.getId()));
if (inventory != null) {
entity.setName(inventory.getName());
}
}
return topNEntities;
}
public List<TopNEntity> getAllEndpointTopN(final String name, final int topN, final Step step,
final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getAllEndpointTopN(name, topN, step, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class).get(Integer.valueOf(entity.getId()));
if (inventory != null) {
entity.setName(inventory.getName());
}
}
return topNEntities;
}
public List<TopNEntity> getEndpointTopN(final int serviceId, final String name, final int topN,
final Step step, final long startTB, final long endTB, final Order order) throws IOException {
List<TopNEntity> topNEntities = getAggregationQueryDAO().getEndpointTopN(serviceId, name, topN, step, startTB, endTB, order);
for (TopNEntity entity : topNEntities) {
EndpointInventory inventory = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class).get(Integer.valueOf(entity.getId()));
if (inventory != null) {
entity.setName(inventory.getName());
}
}
return topNEntities;
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
public enum Order {
ASC,
......
......@@ -16,8 +16,12 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import lombok.*;
@Getter
@Setter
public class TopNEntity {
private String name;
private String id;
......
......@@ -38,6 +38,6 @@ public class StorageModule extends ModuleDefine {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class};
ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class, IAggregationQueryDAO.class};
}
}
......@@ -40,9 +40,6 @@ public class TimePyramidTableNameBuilder {
case HOUR:
tableName = tableName + Const.ID_SPLIT + TimePyramid.Hour.getName();
break;
case MINUTE:
tableName = tableName + Const.ID_SPLIT + TimePyramid.Minute.getName();
break;
}
return tableName;
}
......
......@@ -16,15 +16,30 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.storage.query;
import lombok.Getter;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.DAO;
@Getter
public class TopNCondition {
private String name;
private int topN;
private Order order;
private Scope filterScope;
private int filterId;
/**
* @author peng-yongsheng
*/
public interface IAggregationQueryDAO extends DAO {
List<TopNEntity> getServiceTopN(final String name, final int topN, final Step step, final long startTB,
final long endTB, final Order order) throws IOException;
List<TopNEntity> getAllServiceInstanceTopN(final String name, final int topN, final Step step,
final long startTB, final long endTB, final Order order) throws IOException;
List<TopNEntity> getServiceInstanceTopN(final int serviceId, final String name, final int topN,
final Step step, final long startTB, final long endTB, final Order order) throws IOException;
List<TopNEntity> getAllEndpointTopN(final String name, final int topN, final Step step,
final long startTB, final long endTB, final Order order) throws IOException;
List<TopNEntity> getEndpointTopN(final int serviceId, final String name, final int topN,
final Step step, final long startTB, final long endTB, final Order order) throws IOException;
}
......@@ -63,7 +63,7 @@ public class GraphQLQueryProvider extends ModuleProvider {
.file("query-protocol/trace.graphqls")
.resolvers(new TraceQuery(getManager()))
.file("query-protocol/aggregation.graphqls")
.resolvers(new AggregationQuery())
.resolvers(new AggregationQuery(getManager()))
.file("query-protocol/alarm.graphqls")
.resolvers(new AlarmQuery())
.build()
......
......@@ -19,11 +19,67 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.util.*;
import org.apache.skywalking.oap.query.graphql.type.*;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class AggregationQuery implements GraphQLQueryResolver {
public List<TopNEntity> getTopN(final TopNCondition condition, final Duration duration) {
return Collections.emptyList();
private final ModuleManager moduleManager;
private AggregationQueryService queryService;
public AggregationQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private AggregationQueryService getQueryService() {
if (queryService == null) {
this.queryService = moduleManager.find(CoreModule.NAME).getService(AggregationQueryService.class);
}
return queryService;
}
public List<TopNEntity> getServiceTopN(final String name, final int topN, final Duration duration,
final Order order) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getServiceTopN(name, topN, duration.getStep(), startTimeBucket, endTimeBucket, order);
}
public List<TopNEntity> getAllServiceInstanceTopN(final String name, final int topN, final Duration duration,
final Order order) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getAllServiceInstanceTopN(name, topN, duration.getStep(), startTimeBucket, endTimeBucket, order);
}
public List<TopNEntity> getServiceInstanceTopN(final int serviceId, final String name, final int topN,
final Duration duration, final Order order) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getServiceInstanceTopN(serviceId, name, topN, duration.getStep(), startTimeBucket, endTimeBucket, order);
}
public List<TopNEntity> getAllEndpointTopN(final String name, final int topN,
final Duration duration, final Order order) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getAllEndpointTopN(name, topN, duration.getStep(), startTimeBucket, endTimeBucket, order);
}
public List<TopNEntity> getEndpointTopN(final int serviceId, final String name, final int topN,
final Duration duration, final Order order) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getQueryService().getEndpointTopN(serviceId, name, topN, duration.getStep(), startTimeBucket, endTimeBucket, order);
}
}
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.query.graphql.type.*;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
import org.apache.skywalking.oap.server.core.source.Scope;
public class AlarmQuery implements GraphQLQueryResolver {
public AlarmTrend getAlarmTrend(final Duration duration) {
......
......@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.query.graphql.type;
import org.apache.skywalking.oap.server.core.source.Scope;
public class AlarmMessage {
private Scope scope;
private String id;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.type;
public enum Scope {
SERVICE,
SERVICE_INSTANCE,
ENDPOINT,
SERVICE_RELATION,
SERVICE_INSTANCE_RELATION,
ENDPOINT_RELATION
}
Subproject commit f2e54c2cd3f7fdb2cdc975cf791e1bb1d9aab96e
Subproject commit d71d3f183a1e498aafad8c59c8d3373408140ac9
......@@ -180,9 +180,6 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
exitSourceBuilder.setSourceEndpointName(endpointInventoryCache.get(exitSourceBuilder.getSourceEndpointId()).getName());
exitSourceBuilder.setTimeBucket(minuteTimeBucket);
sourceReceiver.receive(exitSourceBuilder.toService());
sourceReceiver.receive(exitSourceBuilder.toServiceInstance());
sourceReceiver.receive(exitSourceBuilder.toEndpoint());
sourceReceiver.receive(exitSourceBuilder.toServiceRelation());
sourceReceiver.receive(exitSourceBuilder.toServiceInstanceRelation());
sourceReceiver.receive(exitSourceBuilder.toEndpointRelation());
......
......@@ -79,6 +79,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
* @author peng-yongsheng
*/
public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO {
public AggregationQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override
public List<TopNEntity> getServiceTopN(String name, int topN, Step step, long startTB,
long endTB, Order order) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, name);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
return aggregation(indexName, sourceBuilder, topN, order);
}
@Override public List<TopNEntity> getAllServiceInstanceTopN(String name, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, name);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
return aggregation(indexName, sourceBuilder, topN, order);
}
@Override public List<TopNEntity> getServiceInstanceTopN(int serviceId, String name, int topN,
Step step, long startTB, long endTB, Order order) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, name);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInstanceInventory.SERVICE_ID, serviceId));
return aggregation(indexName, sourceBuilder, topN, order);
}
@Override
public List<TopNEntity> getAllEndpointTopN(String name, int topN, Step step, long startTB,
long endTB, Order order) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, name);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
return aggregation(indexName, sourceBuilder, topN, order);
}
@Override
public List<TopNEntity> getEndpointTopN(int serviceId, String name, int topN, Step step,
long startTB, long endTB, Order order) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, name);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(Indicator.TIME_BUCKET).lte(endTB).gte(startTB));
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.SERVICE_ID, serviceId));
return aggregation(indexName, sourceBuilder, topN, order);
}
private List<TopNEntity> aggregation(String indexName, SearchSourceBuilder sourceBuilder, int topN,
Order order) throws IOException {
boolean asc = false;
if (order.equals(Order.ASC)) {
asc = true;
}
TermsAggregationBuilder aggregationBuilder = AggregationBuilders
.terms(Indicator.ENTITY_ID)
.field(Indicator.ENTITY_ID)
.order(BucketOrder.aggregation("value", asc))
.size(topN)
.subAggregation(
AggregationBuilders.avg("value").field("value")
);
sourceBuilder.aggregation(aggregationBuilder);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<TopNEntity> topNEntities = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Indicator.ENTITY_ID);
for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
TopNEntity topNEntity = new TopNEntity();
topNEntity.setId(termsBucket.getKeyAsString());
Avg value = termsBucket.getAggregations().get("value");
topNEntity.setValue((int)value.getValue());
topNEntities.add(topNEntity);
}
return topNEntities;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册