提交 7e149ab3 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Metric query. (#1677)

上级 366609a4
......@@ -30,6 +30,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
public abstract class Indicator extends StreamData implements StorageData {
public static final String TIME_BUCKET = "time_bucket";
public static final String ENTITY_ID = "entity_id";
@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
......
......@@ -18,12 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.ConstOne;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorOperator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.SourceFrom;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
......@@ -38,7 +35,7 @@ public abstract class LongAvgIndicator extends Indicator implements LongValueHol
@Getter @Setter @Column(columnName = SUMMATION) private long summation;
@Getter @Setter @Column(columnName = COUNT) private int count;
@Getter @Setter @Column(columnName = VALUE) private long value;
@Getter @Setter @Column(columnName = VALUE, isValue = true, function = Function.Avg) private long value;
@Entrance
public final void combine(@SourceFrom long summation, @ConstOne int count) {
......
......@@ -18,16 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Arg;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorOperator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.SourceFrom;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
......@@ -44,7 +37,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
public abstract class ThermodynamicIndicator extends Indicator {
protected static final String DETAIL_GROUP = "detail_group";
protected static final String STEP = "step";
protected static final String NUM_OF_STEPS = "num_of_steps";
public static final String NUM_OF_STEPS = "num_of_steps";
@Getter @Setter @Column(columnName = STEP) private int step = 0;
@Getter @Setter @Column(columnName = NUM_OF_STEPS) private int numOfSteps = 0;
......
......@@ -16,19 +16,31 @@
*
*/
package org.apache.skywalking.oap.query.graphql.util;
import org.apache.skywalking.oap.server.core.Const;
package org.apache.skywalking.oap.server.core.query;
/**
* @author peng-yongsheng
*/
public enum DurationUtils {
INSTANCE;
public class DurationPoint {
private long point;
private long secondsBetween;
private long minutesBetween;
public DurationPoint(long point, long secondsBetween, long minutesBetween) {
this.point = point;
this.secondsBetween = secondsBetween;
this.minutesBetween = minutesBetween;
}
public long getPoint() {
return point;
}
public long getSecondsBetween() {
return secondsBetween;
}
public long exchangeToTimeBucket(String dateStr) {
dateStr = dateStr.replaceAll("-", Const.EMPTY_STRING);
dateStr = dateStr.replaceAll(" ", Const.EMPTY_STRING);
return Long.valueOf(dateStr);
public long getMinutesBetween() {
return minutesBetween;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.text.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.joda.time.*;
/**
* @author peng-yongsheng
*/
public enum DurationUtils {
INSTANCE;
public long exchangeToTimeBucket(String dateStr) {
dateStr = dateStr.replaceAll("-", Const.EMPTY_STRING);
dateStr = dateStr.replaceAll(" ", Const.EMPTY_STRING);
return Long.valueOf(dateStr);
}
public long startTimeDurationToSecondTimeBucket(Step step, String dateStr) {
long secondTimeBucket = 0;
switch (step) {
case MONTH:
secondTimeBucket = exchangeToTimeBucket(dateStr) * 100 * 100 * 100 * 100;
break;
case DAY:
secondTimeBucket = exchangeToTimeBucket(dateStr) * 100 * 100 * 100;
break;
case HOUR:
secondTimeBucket = exchangeToTimeBucket(dateStr) * 100 * 100;
break;
case MINUTE:
secondTimeBucket = exchangeToTimeBucket(dateStr) * 100;
break;
case SECOND:
secondTimeBucket = exchangeToTimeBucket(dateStr);
break;
}
return secondTimeBucket;
}
public long endTimeDurationToSecondTimeBucket(Step step, String dateStr) {
long secondTimeBucket = 0;
switch (step) {
case MONTH:
secondTimeBucket = (((exchangeToTimeBucket(dateStr) * 100 + 99) * 100 + 99) * 100 + 99) * 100 + 99;
break;
case DAY:
secondTimeBucket = ((exchangeToTimeBucket(dateStr) * 100 + 99) * 100 + 99) * 100 + 99;
break;
case HOUR:
secondTimeBucket = (exchangeToTimeBucket(dateStr) * 100 + 99) * 100 + 99;
break;
case MINUTE:
secondTimeBucket = exchangeToTimeBucket(dateStr) * 100 + 99;
break;
case SECOND:
secondTimeBucket = exchangeToTimeBucket(dateStr);
break;
}
return secondTimeBucket;
}
public int minutesBetween(Step step, long startTimeBucket, long endTimeBucket) throws ParseException {
Date startDate = formatDate(step, startTimeBucket);
Date endDate = formatDate(step, endTimeBucket);
return Minutes.minutesBetween(new DateTime(startDate), new DateTime(endDate)).getMinutes();
}
public int minutesBetween(Step step, DateTime dateTime) {
switch (step) {
case MONTH:
return dateTime.dayOfMonth().getMaximumValue() * 24 * 60;
case DAY:
return 24 * 60;
case HOUR:
return 60;
case MINUTE:
return 1;
case SECOND:
return 1;
default:
return 1;
}
}
public int secondsBetween(Step step, long startTimeBucket, long endTimeBucket) throws ParseException {
Date startDate = formatDate(step, startTimeBucket);
Date endDate = formatDate(step, endTimeBucket);
return Seconds.secondsBetween(new DateTime(startDate), new DateTime(endDate)).getSeconds();
}
public int secondsBetween(Step step, DateTime dateTime) {
switch (step) {
case MONTH:
return dateTime.dayOfMonth().getMaximumValue() * 24 * 60 * 60;
case DAY:
return 24 * 60 * 60;
case HOUR:
return 60 * 60;
case MINUTE:
return 60;
case SECOND:
return 1;
default:
return 1;
}
}
private Date formatDate(Step step, long timeBucket) throws ParseException {
Date date = null;
switch (step) {
case MONTH:
date = new SimpleDateFormat("yyyyMM").parse(String.valueOf(timeBucket));
break;
case DAY:
date = new SimpleDateFormat("yyyyMMdd").parse(String.valueOf(timeBucket));
break;
case HOUR:
date = new SimpleDateFormat("yyyyMMddHH").parse(String.valueOf(timeBucket));
break;
case MINUTE:
date = new SimpleDateFormat("yyyyMMddHHmm").parse(String.valueOf(timeBucket));
break;
case SECOND:
date = new SimpleDateFormat("yyyyMMddHHmmss").parse(String.valueOf(timeBucket));
break;
}
return date;
}
public DateTime parseToDateTime(Step step, long time) throws ParseException {
DateTime dateTime = null;
switch (step) {
case MONTH:
Date date = new SimpleDateFormat("yyyyMM").parse(String.valueOf(time));
dateTime = new DateTime(date);
break;
case DAY:
date = new SimpleDateFormat("yyyyMMdd").parse(String.valueOf(time));
dateTime = new DateTime(date);
break;
case HOUR:
date = new SimpleDateFormat("yyyyMMddHH").parse(String.valueOf(time));
dateTime = new DateTime(date);
break;
case MINUTE:
date = new SimpleDateFormat("yyyyMMddHHmm").parse(String.valueOf(time));
dateTime = new DateTime(date);
break;
case SECOND:
date = new SimpleDateFormat("yyyyMMddHHmmss").parse(String.valueOf(time));
dateTime = new DateTime(date);
break;
}
return dateTime;
}
public List<DurationPoint> getDurationPoints(Step step, long startTimeBucket,
long endTimeBucket) throws ParseException {
DateTime dateTime = parseToDateTime(step, startTimeBucket);
List<DurationPoint> durations = new LinkedList<>();
durations.add(new DurationPoint(startTimeBucket, secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
int i = 0;
do {
switch (step) {
case MONTH:
dateTime = dateTime.plusMonths(1);
String timeBucket = new SimpleDateFormat("yyyyMM").format(dateTime.toDate());
durations.add(new DurationPoint(Long.valueOf(timeBucket), secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
break;
case DAY:
dateTime = dateTime.plusDays(1);
timeBucket = new SimpleDateFormat("yyyyMMdd").format(dateTime.toDate());
durations.add(new DurationPoint(Long.valueOf(timeBucket), secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
break;
case HOUR:
dateTime = dateTime.plusHours(1);
timeBucket = new SimpleDateFormat("yyyyMMddHH").format(dateTime.toDate());
durations.add(new DurationPoint(Long.valueOf(timeBucket), secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
break;
case MINUTE:
dateTime = dateTime.plusMinutes(1);
timeBucket = new SimpleDateFormat("yyyyMMddHHmm").format(dateTime.toDate());
durations.add(new DurationPoint(Long.valueOf(timeBucket), secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
break;
case SECOND:
dateTime = dateTime.plusSeconds(1);
timeBucket = new SimpleDateFormat("yyyyMMddHHmmss").format(dateTime.toDate());
durations.add(new DurationPoint(Long.valueOf(timeBucket), secondsBetween(step, dateTime), minutesBetween(step, dateTime)));
break;
}
i++;
if (i > 500) {
throw new UnexpectedException("Duration data error, step: " + step.name() + ", start: " + startTimeBucket + ", end: " + endTimeBucket);
}
}
while (endTimeBucket != durations.get(durations.size() - 1).getPoint());
return durations;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnIds;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class MetricQueryService implements Service {
private static final Logger logger = LoggerFactory.getLogger(MetricQueryService.class);
private final ModuleManager moduleManager;
private IMetricQueryDAO metricQueryDAO;
public MetricQueryService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private IMetricQueryDAO getMetricQueryDAO() {
if (metricQueryDAO == null) {
metricQueryDAO = moduleManager.find(StorageModule.NAME).getService(IMetricQueryDAO.class);
}
return metricQueryDAO;
}
public IntValues getValues(final String indName, final List<String> ids, final Step step, final long startTB,
final long endTB) throws IOException {
Where where = new Where();
KeyValues intKeyValues = new KeyValues();
intKeyValues.setKey(Indicator.ENTITY_ID);
where.getKeyValues().add(intKeyValues);
ids.forEach(intKeyValues.getValues()::add);
return getMetricQueryDAO().getValues(indName, step, startTB, endTB, where, ValueColumnIds.INSTANCE.getValueCName(indName), ValueColumnIds.INSTANCE.getValueFunction(indName));
}
public IntValues getLinearIntValues(final String indName, final String id, final Step step, final long startTB,
final long endTB) throws IOException, ParseException {
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, startTB, endTB);
List<String> ids = new ArrayList<>();
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
return getMetricQueryDAO().getLinearIntValues(indName, step, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
}
public Thermodynamic getThermodynamic(final String indName, final String id, final Step step, final long startTB,
final long endTB) throws IOException, ParseException {
List<DurationPoint> durationPoints = DurationUtils.INSTANCE.getDurationPoints(step, startTB, endTB);
List<String> ids = new ArrayList<>();
durationPoints.forEach(durationPoint -> ids.add(durationPoint.getPoint() + Const.ID_SPLIT + id));
return getMetricQueryDAO().getThermodynamic(indName, step, ids, ValueColumnIds.INSTANCE.getValueCName(indName));
}
}
......@@ -16,11 +16,14 @@
*
*/
package org.apache.skywalking.oap.query.graphql.type;
package org.apache.skywalking.oap.server.core.query.entity;
import java.util.List;
import java.util.*;
import lombok.*;
@Getter
@Setter
public class Thermodynamic {
private List<List<Long>> nodes;
private List<List<Long>> nodes = new ArrayList<>();
private int axisYStep;
}
......@@ -22,5 +22,5 @@ package org.apache.skywalking.oap.server.core.query.sql;
* @author peng-yongsheng
*/
public enum Function {
Avg, Sum
None, Avg, Sum
}
......@@ -24,7 +24,7 @@ import lombok.*;
/**
* @author peng-yongsheng
*/
public class IntKeyValues {
public class KeyValues {
@Getter @Setter private String key;
@Getter private List<Integer> values = new LinkedList<>();
@Getter private List<String> values = new LinkedList<>();
}
......@@ -26,5 +26,5 @@ import lombok.Getter;
*/
@Getter
public class Where {
private List<IntKeyValues> keyValues = new LinkedList<>();
private List<KeyValues> keyValues = new LinkedList<>();
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.query.sql.Function;
/**
* @author peng-yongsheng
......@@ -28,6 +29,10 @@ import java.lang.annotation.*;
public @interface Column {
String columnName();
boolean isValue() default false;
Function function() default Function.None;
boolean matchQuery() default false;
boolean termQuery() default true;
......
......@@ -46,14 +46,15 @@ public class StorageAnnotationListener implements AnnotationListener, IModelGett
@Override public void notify(Class aClass) {
logger.info("The owner class of storage annotation, class name: {}", aClass.getName());
String modelName = StorageEntityAnnotationUtils.getModelName(aClass);
List<ModelColumn> modelColumns = new LinkedList<>();
retrieval(aClass, modelColumns);
retrieval(aClass, modelName, modelColumns);
String modelName = StorageEntityAnnotationUtils.getModelName(aClass);
models.add(new Model(modelName, modelColumns));
}
private void retrieval(Class clazz, List<ModelColumn> modelColumns) {
private void retrieval(Class clazz, String modelName, List<ModelColumn> modelColumns) {
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
......@@ -63,11 +64,14 @@ public class StorageAnnotationListener implements AnnotationListener, IModelGett
if (logger.isDebugEnabled()) {
logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
if (column.isValue()) {
ValueColumnIds.INSTANCE.putIfAbsent(modelName, column.columnName(), column.function());
}
}
}
if (Objects.nonNull(clazz.getSuperclass())) {
retrieval(clazz.getSuperclass(), modelColumns);
retrieval(clazz.getSuperclass(), modelName, modelColumns);
}
}
}
......@@ -16,19 +16,38 @@
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.DAO;
import java.util.*;
import org.apache.skywalking.oap.server.core.query.sql.Function;
/**
* @author peng-yongsheng
*/
public interface IUniqueQueryDAO extends DAO {
public enum ValueColumnIds {
INSTANCE;
List<TwoIdGroup> aggregation(String indName, Step step, long startTB,
long endTB, Where where, String idCName1, String idCName2) throws IOException;
private Map<String, ValueColumn> mapping = new HashMap<>();
public void putIfAbsent(String indName, String valueCName, Function function) {
mapping.putIfAbsent(indName, new ValueColumn(valueCName, function));
}
public String getValueCName(String indName) {
return mapping.get(indName).valueCName;
}
public Function getValueFunction(String indName) {
return mapping.get(indName).function;
}
class ValueColumn {
private final String valueCName;
private final Function function;
private ValueColumn(String valueCName, Function function) {
this.valueCName = valueCName;
this.function = function;
}
}
}
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.storage.query;
import java.io.IOException;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.storage.DAO;
......@@ -29,10 +29,10 @@ import org.apache.skywalking.oap.server.core.storage.DAO;
*/
public interface IMetricQueryDAO extends DAO {
List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
long endTB, Where where, String idCName, String valueCName, Function function) throws IOException;
IntValues getValues(String indName, Step step, long startTB,
long endTB, Where where, String valueCName, Function function) throws IOException;
List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
long endTB, Where where, String idCName1, String idCName2, String valueCName,
Function function) throws IOException;
IntValues getLinearIntValues(String indName, Step step, List<String> ids, String valueCName) throws IOException;
Thermodynamic getThermodynamic(String indName, Step step, List<String> ids, String valueCName) throws IOException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import lombok.*;
/**
* @author peng-yongsheng
*/
@Getter
@Setter
public class OneIdGroupValue {
private int id;
private Number value;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import lombok.*;
/**
* @author peng-yongsheng
*/
@Getter
@Setter
public class TwoIdGroup {
private int id1;
private int id2;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.query;
import lombok.*;
/**
* @author peng-yongsheng
*/
@Getter
@Setter
public class TwoIdGroupValue {
private int id1;
private int id2;
private Number value;
}
......@@ -128,6 +128,13 @@ public class ElasticSearchClient implements Client {
return client.get(request);
}
public MultiGetResponse multiGet(String indexName, List<String> ids) throws IOException {
final String newIndexName = formatIndexName(indexName);
MultiGetRequest request = new MultiGetRequest();
ids.forEach(id -> request.add(newIndexName, TYPE, id));
return client.multiGet(request);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
......
......@@ -57,7 +57,7 @@ public class GraphQLQueryProvider extends ModuleProvider {
.file("query-protocol/metadata.graphqls")
.resolvers(new MetadataQuery())
.file("query-protocol/metric.graphqls")
.resolvers(new MetricQuery())
.resolvers(new MetricQuery(getManager()))
.file("query-protocol/topology.graphqls")
.resolvers(new TopologyQuery(getManager()))
.file("query-protocol/trace.graphqls")
......
......@@ -19,20 +19,50 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import java.text.ParseException;
import org.apache.skywalking.oap.query.graphql.type.*;
import org.apache.skywalking.oap.server.core.query.entity.IntValues;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class MetricQuery implements GraphQLQueryResolver {
public IntValues getValues(final BatchMetricConditions metric, final Duration duration) {
return new IntValues();
private final ModuleManager moduleManager;
private MetricQueryService metricQueryService;
public MetricQuery(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private MetricQueryService getMetricQueryService() {
if (metricQueryService == null) {
this.metricQueryService = moduleManager.find(CoreModule.NAME).getService(MetricQueryService.class);
}
return metricQueryService;
}
public IntValues getValues(final BatchMetricConditions metric, final Duration duration) throws IOException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getMetricQueryService().getValues(metric.getName(), metric.getIds(), duration.getStep(), startTimeBucket, endTimeBucket);
}
public IntValues getLinearIntValues(final MetricCondition metric, final Duration duration) {
return new IntValues();
public IntValues getLinearIntValues(final MetricCondition metric,
final Duration duration) throws IOException, ParseException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getMetricQueryService().getLinearIntValues(metric.getName(), metric.getId(), duration.getStep(), startTimeBucket, endTimeBucket);
}
public Thermodynamic getThermodynamic(final MetricCondition metric, final Duration duration) {
return new Thermodynamic();
public Thermodynamic getThermodynamic(final MetricCondition metric,
final Duration duration) throws IOException, ParseException {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
return getMetricQueryService().getThermodynamic(metric.getName(), metric.getId(), duration.getStep(), startTimeBucket, endTimeBucket);
}
}
......@@ -21,9 +21,8 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
import java.io.IOException;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.query.graphql.util.DurationUtils;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.query.entity.Topology;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......
......@@ -17,11 +17,14 @@
*/
package org.apache.skywalking.oap.query.graphql.type;
import java.util.List;
import java.util.*;
import lombok.Getter;
/**
* @author liu-xinyuan
* @author liu-xinyuan
**/
@Getter
public class BatchMetricConditions {
private List<MetricCondition> metricConditions;
private String name;
private List<String> ids = new ArrayList<>();
}
......@@ -18,6 +18,10 @@
package org.apache.skywalking.oap.query.graphql.type;
import lombok.*;
@Getter
@Setter
public class MetricCondition {
private String id;
private String name;
......
......@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.query.sql.*;
import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.*;
......@@ -40,73 +42,32 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
super(client);
}
public List<OneIdGroupValue> aggregation(String indName, Step step, long startTB,
long endTB, Where where, String idCName, String valueCName, Function function) throws IOException {
public IntValues getValues(String indName, Step step, long startTB, long endTB, Where where, String valueCName,
Function function) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
queryBuild(sourceBuilder, where, startTB, endTB);
TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName).field(idCName).size(1000);
functionAggregation(function, aggIdCName1, valueCName);
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Indicator.ENTITY_ID).field(Indicator.ENTITY_ID).size(1000);
functionAggregation(function, entityIdAggregation, valueCName);
sourceBuilder.aggregation(aggIdCName1);
sourceBuilder.aggregation(entityIdAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<OneIdGroupValue> values = new ArrayList<>();
Terms idTerms = response.getAggregations().get(idCName);
IntValues intValues = new IntValues();
Terms idTerms = response.getAggregations().get(Indicator.ENTITY_ID);
for (Terms.Bucket idBucket : idTerms.getBuckets()) {
Terms valueTerms = idBucket.getAggregations().get(valueCName);
for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
OneIdGroupValue value = new OneIdGroupValue();
value.setId(idBucket.getKeyAsNumber().intValue());
value.setValue(valueBucket.getKeyAsNumber());
values.add(value);
KVInt value = new KVInt();
value.setId(idBucket.getKeyAsString());
value.setValue(valueBucket.getKeyAsNumber().intValue());
intValues.getValues().add(value);
}
}
return values;
}
public List<TwoIdGroupValue> aggregation(String indName, Step step, long startTB,
long endTB, Where where, String idCName1, String idCName2, String valueCName,
Function function) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
queryBuild(sourceBuilder, where, startTB, endTB);
sourceBuilder.aggregation(
AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
.subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)
.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName)))
);
TermsAggregationBuilder aggIdCName1 = AggregationBuilders.terms(idCName1).field(idCName1).size(1000);
TermsAggregationBuilder aggIdCName2 = AggregationBuilders.terms(idCName2).field(idCName2).size(1000);
aggIdCName1.subAggregation(aggIdCName2);
functionAggregation(function, aggIdCName2, valueCName);
sourceBuilder.aggregation(aggIdCName1);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<TwoIdGroupValue> values = new ArrayList<>();
Terms id1Terms = response.getAggregations().get(idCName1);
for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
Terms valueTerms = id1Bucket.getAggregations().get(valueCName);
for (Terms.Bucket valueBucket : valueTerms.getBuckets()) {
TwoIdGroupValue value = new TwoIdGroupValue();
value.setId1(id1Bucket.getKeyAsNumber().intValue());
value.setId1(id2Bucket.getKeyAsNumber().intValue());
value.setValue(valueBucket.getKeyAsNumber());
values.add(value);
}
}
}
return values;
return intValues;
}
private void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
......@@ -119,4 +80,36 @@ public class MetricQueryEsDAO extends EsDAO implements IMetricQueryDAO {
break;
}
}
@Override public IntValues getLinearIntValues(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, indName);
MultiGetResponse response = getClient().multiGet(indexName, ids);
IntValues intValues = new IntValues();
for (MultiGetItemResponse itemResponse : response.getResponses()) {
int value = ((Number)itemResponse.getResponse().getSource().getOrDefault(valueCName, 0)).intValue();
KVInt kvInt = new KVInt();
kvInt.setId(itemResponse.getId());
kvInt.setValue(value);
intValues.getValues().add(kvInt);
}
return intValues;
}
@Override public Thermodynamic getThermodynamic(String indName, Step step, List<String> ids,
String valueCName) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, indName);
MultiGetResponse response = getClient().multiGet(indexName, ids);
Thermodynamic thermodynamic = new Thermodynamic();
for (MultiGetItemResponse itemResponse : response.getResponses()) {
List<Long> axisYValues = new ArrayList<>();
thermodynamic.getNodes().add(axisYValues);
}
return thermodynamic;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.io.IOException;
import java.util.*;
import org.apache.skywalking.oap.server.core.query.entity.Step;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.TimePyramidTableNameBuilder;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
* @author peng-yongsheng
*/
public class UniqueQueryEsDAO extends EsDAO implements IUniqueQueryDAO {
public UniqueQueryEsDAO(ElasticSearchClient client) {
super(client);
}
@Override public List<TwoIdGroup> aggregation(String indName, Step step, long startTB, long endTB, Where where,
String idCName1, String idCName2) throws IOException {
String indexName = TimePyramidTableNameBuilder.build(step, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
queryBuild(sourceBuilder, where, startTB, endTB);
sourceBuilder.aggregation(AggregationBuilders.terms(idCName1).field(idCName1).size(1000)
.subAggregation(AggregationBuilders.terms(idCName2).field(idCName2).size(1000)));
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<TwoIdGroup> values = new ArrayList<>();
Terms id1Terms = response.getAggregations().get(idCName1);
for (Terms.Bucket id1Bucket : id1Terms.getBuckets()) {
Terms id2Terms = id1Bucket.getAggregations().get(idCName2);
for (Terms.Bucket id2Bucket : id2Terms.getBuckets()) {
TwoIdGroup value = new TwoIdGroup();
value.setId1(id1Bucket.getKeyAsNumber().intValue());
value.setId2(id2Bucket.getKeyAsNumber().intValue());
values.add(value);
}
}
return values;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册