未验证 提交 5f769ca2 编写于 作者: G Gao Hongtao 提交者: GitHub

Add AvgLabeledFunction to ingest multiple labels (#5187)

上级 5d7da1ae
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@MeterFunction(functionName = "avgLabeled")
@ToString
public abstract class AvgLabeledFunction extends Metrics implements AcceptableValue<DataTable> {
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
protected static final String VALUE = "value";
@Setter
@Getter
@Column(columnName = ENTITY_ID, length = 512)
private String entityId;
/**
* Service ID is required for sort query.
*/
@Setter
@Getter
@Column(columnName = InstanceTraffic.SERVICE_ID)
private String serviceId;
@Getter
@Setter
@Column(columnName = SUMMATION, storageOnly = true)
protected DataTable summation = new DataTable(30);
@Getter
@Setter
@Column(columnName = COUNT, storageOnly = true)
protected DataTable count = new DataTable(30);
@Getter
@Setter
@Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
private DataTable value = new DataTable(30);
@Override
public final void combine(Metrics metrics) {
AvgLabeledFunction longAvgMetrics = (AvgLabeledFunction) metrics;
summation.append(longAvgMetrics.summation);
count.append(longAvgMetrics.count);
}
@Override
public final void calculate() {
Set<String> keys = count.keys();
for (String key : keys) {
Long s = summation.get(key);
if (Objects.isNull(s)) {
continue;
}
Long c = count.get(key);
if (Objects.isNull(c)) {
continue;
}
long result = s / c;
if (result == 0 && s > 0) {
result = 1;
}
value.put(key, result);
}
}
@Override
public Metrics toHour() {
AvgLabeledFunction metrics = (AvgLabeledFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
}
@Override
public Metrics toDay() {
AvgLabeledFunction metrics = (AvgLabeledFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
}
@Override
public int remoteHashCode() {
return entityId.hashCode();
}
@Override
public void deserialize(final RemoteData remoteData) {
this.setCount(new DataTable(remoteData.getDataObjectStrings(0)));
this.setSummation(new DataTable(remoteData.getDataObjectStrings(1)));
setTimeBucket(remoteData.getDataLongs(0));
this.entityId = remoteData.getDataStrings(0);
this.serviceId = remoteData.getDataStrings(1);
}
@Override
public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.addDataObjectStrings(count.toStorageData());
remoteBuilder.addDataObjectStrings(summation.toStorageData());
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
remoteBuilder.addDataStrings(serviceId);
return remoteBuilder;
}
@Override
public String id() {
return getTimeBucket() + Const.ID_CONNECTOR + entityId;
}
@Override
public void accept(final MeterEntity entity, final DataTable value) {
this.entityId = entity.id();
this.serviceId = entity.serviceId();
this.summation.append(value);
DataTable c = new DataTable();
value.keys().forEach(key -> c.put(key, 1L));
this.count.append(c);
}
@Override
public Class<? extends AvgLabeledStorageBuilder> builder() {
return AvgLabeledStorageBuilder.class;
}
public static class AvgLabeledStorageBuilder implements StorageBuilder<AvgLabeledFunction> {
@Override
public AvgLabeledFunction map2Data(final Map<String, Object> dbMap) {
AvgLabeledFunction metrics = new AvgLabeledFunction() {
@Override
public AcceptableValue<DataTable> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
metrics.setSummation(new DataTable((String) dbMap.get(SUMMATION)));
metrics.setValue(new DataTable((String) dbMap.get(VALUE)));
metrics.setCount(new DataTable((String) dbMap.get(COUNT)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
public Map<String, Object> data2Map(final AvgLabeledFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SUMMATION, storageData.getSummation());
map.put(VALUE, storageData.getValue());
map.put(COUNT, storageData.getCount());
map.put(TIME_BUCKET, storageData.getTimeBucket());
map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof AvgLabeledFunction))
return false;
AvgLabeledFunction function = (AvgLabeledFunction) o;
return Objects.equals(entityId, function.entityId) &&
getTimeBucket() == function.getTimeBucket();
}
@Override
public int hashCode() {
return Objects.hash(entityId, getTimeBucket());
}
}
......@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.ToString;
......@@ -94,6 +95,10 @@ public class DataTable implements StorageDataComplexObject<DataTable> {
return values;
}
public Set<String> keys() {
return data.keySet();
}
public boolean hasData() {
return !data.isEmpty();
}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.metric.promethues;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.vavr.Function1;
......@@ -25,6 +26,7 @@ import io.vavr.Tuple;
import io.vavr.Tuple3;
import io.vavr.control.Try;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
......@@ -45,6 +47,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.metric.promethues.counter.Window;
import org.apache.skywalking.oap.server.core.metric.promethues.operation.MetricSource;
......@@ -71,6 +74,8 @@ public class PrometheusMetricConverter {
private final static String AVG = "avg";
private final static String AVG_LABELED = "avgLabeled";
private final Window window = new Window();
private final List<MetricsRule> rules;
......@@ -132,6 +137,7 @@ public class PrometheusMetricConverter {
.timestamp(tuple._4.getTimestamp())
.scale(tuple._3.getScale())
.counterFunction(tuple._3.getCounterFunction())
.groupBy(tuple._3.getGroupBy())
.range(tuple._3.getRange());
switch (tuple._1.getScope()) {
case SERVICE:
......@@ -155,10 +161,23 @@ public class PrometheusMetricConverter {
case AVG:
sources.forEach((source, metrics) -> {
AcceptableValue<Long> value = service.buildMetrics(formatMetricName(operation.getMetricName()), Long.class);
Double sumDouble = sum(metrics).value();
sumDouble = window.get(source.getPromMetricName()).apply(source, sumDouble);
value.accept(source.getEntity(), BigDecimal.valueOf(Double.isNaN(sumDouble) ? 0D : sumDouble)
.multiply(BigDecimal.TEN.pow(source.getScale())).longValue());
value.accept(source.getEntity(), sum(metrics, source));
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(source.getTimestamp()));
log.debug("Input metric {}", value.getTimeBucket());
service.doStreamingCalculation(value);
generateTraffic(source.getEntity());
});
break;
case AVG_LABELED:
sources.forEach((source, metrics) -> {
Preconditions.checkArgument(Objects.nonNull(source.getGroupBy()));
DataTable dt = new DataTable();
metrics.stream()
.collect(groupingBy(m -> source.getGroupBy().stream().map(m.getLabels()::get).collect(Collectors.joining("-"))))
.forEach((group, mm) -> dt.put(group, sum(mm, source, ImmutableMap.of("group", group))));
AcceptableValue<DataTable> value = service.buildMetrics(formatMetricName(operation.getMetricName()), DataTable.class);
value.accept(source.getEntity(), dt);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(source.getTimestamp()));
log.debug("Input metric {}", value.getTimeBucket());
service.doStreamingCalculation(value);
......@@ -228,6 +247,17 @@ public class PrometheusMetricConverter {
return metrics.stream().reduce(Metric::sum).orElseThrow(IllegalArgumentException::new);
}
private long sum(List<Metric> metrics, MetricSource source) {
return sum(metrics, source, Collections.emptyMap());
}
private long sum(List<Metric> metrics, MetricSource source, Map<String, String> labels) {
Double sumDouble = sum(metrics).value();
sumDouble = window.get(source.getPromMetricName(), labels).apply(source, sumDouble);
return BigDecimal.valueOf(Double.isNaN(sumDouble) ? 0D : sumDouble)
.multiply(BigDecimal.TEN.pow(source.getScale())).longValue();
}
private void generateTraffic(MeterEntity entity) {
ServiceTraffic s = new ServiceTraffic();
s.setName(requireNonNull(entity.getServiceName()));
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.metric.promethues.operation;
import java.util.List;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
......@@ -41,4 +42,6 @@ public class MetricSource {
private final String range;
private final int scale;
private final List<String> groupBy;
}
......@@ -28,6 +28,7 @@ public class PrometheusMetric {
private CounterFunction counterFunction;
private String range;
private List<LabelMatchRule> labelFilter;
private List<String> groupBy;
private Relabel relabel;
private int scale = 0;
}
......@@ -44,6 +44,7 @@ public class Rules {
throw new ModuleStartException("Load fetcher rules failed", e);
}
return Arrays.stream(rules)
.filter(File::isFile)
.map(f -> {
try (Reader r = new FileReader(f)) {
Rule rule = new Yaml().loadAs(r, Rule.class);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.meter.function;
import io.vavr.collection.Stream;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.COUNT;
import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.SUMMATION;
import static org.apache.skywalking.oap.server.core.analysis.meter.function.AvgLabeledFunction.VALUE;
import static org.hamcrest.core.Is.is;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class AvgLabeledFunctionTest {
@Spy
private AvgLabeledFunction function;
@Test
public void testAccept() {
function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
assertResult(asList("200", "404"), asList(10L, 2L), asList(1L, 1L));
function.accept(MeterEntity.newService("request_count"), build(asList("200", "500"), asList(2L, 3L)));
assertResult(asList("200", "404", "500"), asList(12L, 2L, 3L), asList(2L, 1L, 1L));
}
@Test
public void testCalculate() {
function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
function.accept(MeterEntity.newService("request_count"), build(asList("200", "500"), asList(2L, 3L)));
function.calculate();
assertThat(function.getValue().sortedKeys(Comparator.naturalOrder()), is(asList("200", "404", "500")));
assertThat(function.getValue().sortedValues(Comparator.naturalOrder()), is(asList(6L, 2L, 3L)));
}
@Test
public void testSerialize() {
function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
AvgLabeledFunction function2 = Mockito.spy(AvgLabeledFunction.class);
function2.deserialize(function.serialize().build());
assertThat(function2.getEntityId(), is(function.getEntityId()));
assertThat(function2.getTimeBucket(), is(function.getTimeBucket()));
}
@Test
public void testBuilder() throws IllegalAccessException, InstantiationException {
function.accept(MeterEntity.newService("request_count"), build(asList("200", "404"), asList(10L, 2L)));
function.calculate();
StorageBuilder<AvgLabeledFunction> storageBuilder = function.builder().newInstance();
Map<String, Object> map = storageBuilder.data2Map(function);
map.put(SUMMATION, ((DataTable) map.get(SUMMATION)).toStorageData());
map.put(COUNT, ((DataTable) map.get(COUNT)).toStorageData());
map.put(VALUE, ((DataTable) map.get(VALUE)).toStorageData());
AvgLabeledFunction function2 = storageBuilder.map2Data(map);
assertThat(function2.getValue(), is(function.getValue()));
}
private DataTable build(List<String> keys, List<Long> values) {
DataTable result = new DataTable();
Stream.ofAll(keys).forEachWithIndex((key, i) -> result.put(key, values.get(i)));
return result;
}
private void assertResult(List<String> expectedKeys, List<Long> expectedValues, List<Long> expectedCount) {
assertSummation(expectedKeys, expectedValues);
assertCount(expectedKeys, expectedCount);
}
private void assertCount(List<String> expectedKeys, List<Long> expectedCount) {
List<String> keys = function.getCount().sortedKeys(Comparator.comparingInt(Integer::parseInt));
assertThat(keys, is(expectedKeys));
List<Long> values = function.getCount().sortedValues(Comparator.comparingLong(Long::parseLong));
assertThat(values, is(expectedCount));
}
private void assertSummation(List<String> expectedKeys, List<Long> expectedValues) {
List<String> keys = function.getSummation().sortedKeys(Comparator.comparingInt(Integer::parseInt));
assertThat(keys, is(expectedKeys));
List<Long> values = function.getSummation().sortedValues(Comparator.comparingLong(Long::parseLong));
assertThat(values, is(expectedValues));
}
}
\ No newline at end of file
......@@ -127,6 +127,9 @@ public class PrometheusFetcherProvider extends ModuleProvider {
while ((mf = p.parse(now)) != null) {
result.addAll(mf.getMetrics().stream()
.peek(metric -> {
if (Objects.isNull(sc.getLabels())) {
return;
}
Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
extraLabels.put("instance", url);
extraLabels.forEach((key, value) -> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册