未验证 提交 68eba154 编写于 作者: G Gao Hongtao 提交者: GitHub

Add latest function (#5321)

Signed-off-by: NGao Hongtao <hanahmily@gmail.com>
上级 0c1c390f
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MeterFunction(functionName = "latest")
@ToString
public abstract class LatestFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
protected static final String VALUE = "value";
@Setter
@Getter
@Column(columnName = ENTITY_ID, length = 512)
private String entityId;
/**
* Service ID is required for sort query.
*/
@Setter
@Getter
@Column(columnName = InstanceTraffic.SERVICE_ID)
private String serviceId;
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Latest)
private long value;
@Override public void accept(MeterEntity entity, Long value) {
this.entityId = entity.id();
this.serviceId = entity.serviceId();
this.value = value;
}
@Entrance public final void combine(@SourceFrom long value) {
this.value = value;
}
@Override public final void combine(Metrics metrics) {
LatestFunction latestFunction = (LatestFunction) metrics;
combine(latestFunction.value);
}
@Override public void calculate() {
}
@Override
public Metrics toHour() {
LatestFunction metrics = (LatestFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
metrics.setValue(getValue());
return metrics;
}
@Override
public Metrics toDay() {
LatestFunction metrics = (LatestFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
metrics.setValue(getValue());
return metrics;
}
@Override
public int remoteHashCode() {
return entityId.hashCode();
}
@Override
public void deserialize(final RemoteData remoteData) {
this.value = remoteData.getDataLongs(0);
setTimeBucket(remoteData.getDataLongs(1));
this.entityId = remoteData.getDataStrings(0);
this.serviceId = remoteData.getDataStrings(1);
}
@Override
public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataLongs(value);
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
remoteBuilder.addDataStrings(serviceId);
return remoteBuilder;
}
@Override
public String id() {
return getTimeBucket() + Const.ID_CONNECTOR + entityId;
}
@Override
public Class<? extends LastestStorageBuilder> builder() {
return LatestFunction.LastestStorageBuilder.class;
}
public static class LastestStorageBuilder implements StorageBuilder<LatestFunction> {
@Override
public LatestFunction map2Data(final Map<String, Object> dbMap) {
LatestFunction metrics = new LatestFunction() {
@Override
public AcceptableValue<Long> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> data2Map(final LatestFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(VALUE, storageData.getValue());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof LatestFunction))
return false;
LatestFunction function = (LatestFunction) o;
return Objects.equals(entityId, function.entityId) &&
getTimeBucket() == function.getTimeBucket();
}
@Override
public int hashCode() {
return Objects.hash(entityId, getTimeBucket());
}
}
......@@ -76,6 +76,8 @@ public class PrometheusMetricConverter {
private final static String AVG_LABELED = "avgLabeled";
private final static String LATEST = "latest";
private final Window window = new Window();
private final List<MetricsRule> rules;
......@@ -124,7 +126,7 @@ public class PrometheusMetricConverter {
}
return rule._3.getLabelFilter().stream()
.allMatch(matchRule -> matchRule.getOptions().stream()
.anyMatch(metric.getLabels().get(matchRule.getKey())::matches));
.anyMatch(option -> matchLabel(option, metric.getLabels().get(matchRule.getKey()))));
})
.map(rule -> Tuple.of(rule._1, rule._2, rule._3, metric))
)
......@@ -158,6 +160,7 @@ public class PrometheusMetricConverter {
log.debug("Building metrics {} -> {}", operation, sources);
Try.run(() -> {
switch (operation.getName()) {
case LATEST:
case AVG:
sources.forEach((source, metrics) -> {
AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
......@@ -287,4 +290,19 @@ public class PrometheusMetricConverter {
.onFailure(e -> log.debug(debugMessage + " failed", e))
.toJavaStream();
}
private boolean matchLabel(String option, String labelValue) {
if (option.startsWith("!")) {
return !matchLabelRule(option.substring(1), labelValue);
}
log.debug("{} {}", option, labelValue);
return matchLabelRule(option, labelValue);
}
private boolean matchLabelRule(String rule, String labelValue) {
if (Strings.isNullOrEmpty(rule)) {
return Strings.isNullOrEmpty(labelValue);
}
return labelValue.matches(rule);
}
}
......@@ -50,7 +50,7 @@ public class MetricsQueryService implements Service {
/**
* Read metrics single value in the duration of required metrics
*/
public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
return getMetricQueryDAO().readMetricsValue(
condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
}
......
......@@ -19,5 +19,5 @@
package org.apache.skywalking.oap.server.core.query.sql;
public enum Function {
None, Avg, Sum
None, Avg, Sum, Latest
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.query.type;
import io.vavr.collection.Stream;
import java.util.ArrayList;
import java.util.List;
......@@ -36,4 +37,8 @@ public class IntValues {
}
return defaultValue;
}
public long latestValue(int defaultValue) {
return Stream.ofAll(values).map(KVInt::getValue).findLast(v -> v != defaultValue).getOrElse((long) defaultValue);
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.query;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
......@@ -44,7 +45,7 @@ import static java.util.stream.Collectors.toList;
* @since 8.0.0
*/
public interface IMetricsQueryDAO extends DAO {
int readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException;
MetricsValues readMetricsValues(MetricsCondition condition,
String valueColumnName,
......@@ -95,7 +96,7 @@ public interface IMetricsQueryDAO extends DAO {
final List<String> ids,
final Map<String, DataTable> idMap) {
List<String> allLabels;
if (Objects.isNull(labels) || labels.size() < 1) {
if (Objects.isNull(labels) || labels.size() < 1 || labels.stream().allMatch(Strings::isNullOrEmpty)) {
allLabels = idMap.values().stream()
.flatMap(dataTable -> dataTable.keys().stream())
.distinct().collect(Collectors.toList());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class LatestFunctionTest {
@Spy
private LatestFunction function;
@Test
public void testAccept() {
long time = 1597113318673L;
function.accept(MeterEntity.newService("latest_sync_time"), time);
assertThat(function.getValue(), is(time));
time = 1597113447737L;
function.accept(MeterEntity.newService("latest_sync_time"), time);
assertThat(function.getValue(), is(time));
}
@Test
public void testCalculate() {
long time1 = 1597113318673L;
long time2 = 1597113447737L;
function.accept(MeterEntity.newService("latest_sync_time"), time1);
function.accept(MeterEntity.newService("latest_sync_time"), time2);
function.calculate();
assertThat(function.getValue(), is(time2));
}
@Test
public void testSerialize() {
long time = 1597113447737L;
function.accept(MeterEntity.newService("latest_sync_time"), time);
LatestFunction function2 = Mockito.spy(LatestFunction.class);
function2.deserialize(function.serialize().build());
assertThat(function2.getEntityId(), is(function.getEntityId()));
assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
}
@Test
public void testBuilder() throws IllegalAccessException, InstantiationException {
long time = 1597113447737L;
function.accept(MeterEntity.newService("latest_sync_time"), time);
function.calculate();
StorageBuilder<LatestFunction> storageBuilder = function.builder().newInstance();
Map<String, Object> map = storageBuilder.data2Map(function);
map.put(LatestFunction.VALUE, map.get(LatestFunction.VALUE));
LatestFunction function2 = storageBuilder.map2Data(map);
assertThat(function2.getValue(), is(function.getValue()));
}
}
\ No newline at end of file
......@@ -102,7 +102,7 @@ public class MetricsQuery implements GraphQLQueryResolver {
/**
* Read metrics single value in the duration of required metrics
*/
public int readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
public long readMetricsValue(MetricsCondition condition, Duration duration) throws IOException {
if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
return 0;
}
......
Subproject commit f38def1d502327856c1cae7ceb233f3c0c8c8e2a
Subproject commit 5ecefdf2c16ca16d4973806bdd26f9eafa3faf1d
......@@ -57,16 +57,20 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
}
@Override
public int readMetricsValue(final MetricsCondition condition,
public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
}
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.size(1);
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
functionAggregation(function, entityIdAggregation, valueColumnName);
sourceBuilder.aggregation(entityIdAggregation);
......@@ -78,16 +82,16 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
switch (function) {
case Sum:
Sum sum = idBucket.getAggregations().get(valueColumnName);
return (int) sum.getValue();
return (long) sum.getValue();
case Avg:
Avg avg = idBucket.getAggregations().get(valueColumnName);
return (int) avg.getValue();
return (long) avg.getValue();
default:
avg = idBucket.getAggregations().get(valueColumnName);
return (int) avg.getValue();
return (long) avg.getValue();
}
}
return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
return defaultValue;
}
@Override
......
......@@ -46,16 +46,20 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
}
@Override
public int readMetricsValue(final MetricsCondition condition,
public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
buildQuery(sourceBuilder, condition, duration);
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.size(1);
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
}
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.size(1);
functionAggregation(function, entityIdAggregation, valueColumnName);
sourceBuilder.aggregation(entityIdAggregation);
......@@ -67,16 +71,16 @@ public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
switch (function) {
case Sum:
Sum sum = idBucket.getAggregations().get(valueColumnName);
return (int) sum.getValue();
return (long) sum.getValue();
case Avg:
Avg avg = idBucket.getAggregations().get(valueColumnName);
return (int) avg.getValue();
return (long) avg.getValue();
default:
avg = idBucket.getAggregations().get(valueColumnName);
return (int) avg.getValue();
return (long) avg.getValue();
}
}
return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
return defaultValue;
}
protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
......
......@@ -60,10 +60,14 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
@Override
public int readMetricsValue(final MetricsCondition condition,
public long readMetricsValue(final MetricsCondition condition,
final String valueColumnName,
final Duration duration) throws IOException {
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
}
final String measurement = condition.getName();
SelectionQueryImpl query = select();
......@@ -93,11 +97,11 @@ public class MetricsQuery implements IMetricsQueryDAO {
if (CollectionUtils.isNotEmpty(seriesList)) {
for (QueryResult.Series series : seriesList) {
Number value = (Number) series.getValues().get(0).get(1);
return value.intValue();
return value.longValue();
}
}
return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
return defaultValue;
}
@Override
......
......@@ -49,10 +49,14 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
}
@Override
public int readMetricsValue(final MetricsCondition condition,
public long readMetricsValue(final MetricsCondition condition,
String valueColumnName,
final Duration duration) throws IOException {
int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
if (function == Function.Latest) {
return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
}
String op;
switch (function) {
case Avg:
......@@ -80,13 +84,13 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
parameters.toArray(new Object[0])
)) {
while (resultSet.next()) {
return resultSet.getInt("value");
return resultSet.getLong("value");
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
return defaultValue;
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册