未验证 提交 f93a98d4 编写于 作者: 叶梦飞 提交者: GitHub

Adapt otel exponential histogram data (#10449)

上级 4d4858b3
......@@ -104,6 +104,7 @@
* `Scope` in the Entity of Metrics query v1 protocol is not required and automatical correction. The scope is determined based on the metric itself.
* Add explicit `ReadTimeout` for ConsulConfigurationWatcher to avoid `IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms`.
* Fix `DurationUtils.getDurationPoints` exceed, when `startTimeBucket` equals `endTimeBucket`.
* Support process OpenTelemetry ExponentialHistogram metrics
#### UI
......
......@@ -24,12 +24,14 @@ import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vavr.Function1;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
......@@ -161,6 +163,57 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
return result;
}
/**
* ExponentialHistogram data points are an alternate representation to the Histogram data point in OpenTelemetry
* metric format(https://opentelemetry.io/docs/reference/specification/metrics/data-model/#exponentialhistogram).
* It uses scale, offset and bucket index to calculate the bound. Firstly, calculate the base using scale by
* formula: base = 2**(2**(-scale)). Then the upperBound of specific bucket can be calculated by formula:
* base**(offset+index+1). Above calculation way is about positive buckets. For the negative case, we just
* map them by their absolute value into the negative range using the same scale as the positive range. So the
* upperBound should be calculated as -base**(offset+index).
*
* Ignored the zero_count field temporarily,
* because the zero_threshold even could overlap the existing bucket scopes.
*
* @param positiveOffset corresponding to positive Buckets' offset in ExponentialHistogramDataPoint
* @param positiveBucketCounts corresponding to positive Buckets' bucket_counts in ExponentialHistogramDataPoint
* @param negativeOffset corresponding to negative Buckets' offset in ExponentialHistogramDataPoint
* @param negativeBucketCounts corresponding to negative Buckets' bucket_counts in ExponentialHistogramDataPoint
* @param scale corresponding to scale in ExponentialHistogramDataPoint
* @return The map is a bucket set for histogram, the key is specific bucket's upperBound, the value is item count
* in this bucket lower than or equals to key(upperBound)
*/
private static Map<Double, Long> buildBucketsFromExponentialHistogram(
int positiveOffset, final List<Long> positiveBucketCounts,
int negativeOffset, final List<Long> negativeBucketCounts, int scale) {
final Map<Double, Long> result = new HashMap<>();
double base = Math.pow(2.0, Math.pow(2.0, -scale));
if (base == Double.POSITIVE_INFINITY) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
return result;
}
double upperBound;
for (int i = 0; i < negativeBucketCounts.size(); i++) {
upperBound = -Math.pow(base, negativeOffset + i);
if (upperBound == Double.NEGATIVE_INFINITY) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
return new HashMap<>();
}
result.put(upperBound, negativeBucketCounts.get(i));
}
for (int i = 0; i < positiveBucketCounts.size() - 1; i++) {
upperBound = Math.pow(base, positiveOffset + i + 1);
if (upperBound == Double.POSITIVE_INFINITY) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
return new HashMap<>();
}
result.put(upperBound, positiveBucketCounts.get(i));
}
result.put(Double.POSITIVE_INFINITY, positiveBucketCounts.get(positiveBucketCounts.size() - 1));
return result;
}
// Adapt the OpenTelemetry metrics to SkyWalking metrics
private Stream<? extends Metric> adaptMetrics(
final Map<String, String> nodeLabels,
......@@ -187,16 +240,16 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
if (sum
.getAggregationTemporality() == AGGREGATION_TEMPORALITY_DELTA) {
return sum.getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
}
if (sum.getIsMonotonic()) {
return sum.getDataPointsList().stream()
......@@ -241,6 +294,26 @@ public class OpenTelemetryMetricRequestProcessor implements Service {
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasExponentialHistogram()) {
return metric.getExponentialHistogram().getDataPointsList().stream()
.map(point -> new Histogram(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.getCount(),
point.getSum(),
buildBucketsFromExponentialHistogram(
point.getPositive().getOffset(),
point.getPositive().getBucketCountsList(),
point.getNegative().getOffset(),
point.getNegative().getBucketCountsList(),
point.getScale()
),
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasSummary()) {
return metric.getSummary().getDataPointsList().stream()
.map(point -> new Summary(
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.otel.otlp;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class OpenTelemetryMetricRequestProcessorTest {
private OtelMetricReceiverConfig config;
private ModuleManager manager;
private OpenTelemetryMetricRequestProcessor metricRequestProcessor;
private Map<String, String> nodeLabels;
@BeforeEach
public void setUp() {
manager = new ModuleManager();
config = new OtelMetricReceiverConfig();
metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(manager, config);
nodeLabels = new HashMap<>();
}
@Test
public void testAdaptExponentialHistogram() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<OpenTelemetryMetricRequestProcessor> clazz = OpenTelemetryMetricRequestProcessor.class;
Method adaptMetricsMethod = clazz.getDeclaredMethod("adaptMetrics", Map.class, Metric.class);
adaptMetricsMethod.setAccessible(true);
// number is 4; 7, 7.5; 8.5, 8.7, 9.4
var positiveBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder()
.setOffset(10)
.addBucketCounts(
1) // (0, 6.72]
.addBucketCounts(
2
) // (6.72, 8]
.addBucketCounts(
3
) // (8, 9.51]
.build();
// number is -14, -14.5, -15; -18; -21, -26
var negativeBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder()
.setOffset(15)
.addBucketCounts(
3
) // (-16, -13.45]
.addBucketCounts(
1
) // (-19.02, -16]
.addBucketCounts(
2
) // (-INFINITY, -19.02]
.build();
var dataPoint = ExponentialHistogramDataPoint.newBuilder()
.setCount(12)
.setSum(-63.4)
.setScale(2)
.setPositive(positiveBuckets)
.setNegative(negativeBuckets)
.setTimeUnixNano(1000000)
.build();
ExponentialHistogram exponentialHistogram = ExponentialHistogram.newBuilder()
.addDataPoints(dataPoint)
.build();
Metric metric = Metric.newBuilder()
.setName("test_metric")
.setExponentialHistogram(exponentialHistogram)
.build();
Stream<Histogram> stream = (Stream<Histogram>) adaptMetricsMethod.invoke(
metricRequestProcessor, nodeLabels, metric);
List<Histogram> list = stream.collect(Collectors.toList());
Histogram histogramMetric = list.get(0);
assertEquals("test_metric", histogramMetric.getName());
assertEquals(1, histogramMetric.getTimestamp());
assertEquals(12, histogramMetric.getSampleCount());
assertEquals(-63.4, histogramMetric.getSampleSum());
// validate the key and value of bucket
double base = Math.pow(2, Math.pow(2, -2));
assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 11)));
assertEquals(1, histogramMetric.getBuckets().get(Math.pow(base, 11)));
assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 12)));
assertEquals(2, histogramMetric.getBuckets().get(Math.pow(base, 12)));
assertTrue(histogramMetric.getBuckets().containsKey(Double.POSITIVE_INFINITY));
assertEquals(3, histogramMetric.getBuckets().get(Double.POSITIVE_INFINITY));
assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 15)));
assertEquals(3, histogramMetric.getBuckets().get(-Math.pow(base, 15)));
assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 16)));
assertEquals(1, histogramMetric.getBuckets().get(-Math.pow(base, 16)));
assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 17)));
assertEquals(2, histogramMetric.getBuckets().get(-Math.pow(base, 17)));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册