diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 8183dd871667fb42bfa360a2d9c787065ff9fb15..2e71039940ca898d93b7212dbc4600b077930091 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -104,6 +104,7 @@ * `Scope` in the Entity of Metrics query v1 protocol is not required and automatical correction. The scope is determined based on the metric itself. * Add explicit `ReadTimeout` for ConsulConfigurationWatcher to avoid `IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms`. * Fix `DurationUtils.getDurationPoints` exceed, when `startTimeBucket` equals `endTimeBucket`. +* Support process OpenTelemetry ExponentialHistogram metrics #### UI diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java index e5302472b9df68d14ce510ff5dcab76fdb09466d..4d3adbbe0787584bc9f8ff45ff325c77e9a00a43 100644 --- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java +++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java @@ -24,12 +24,14 @@ import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.metrics.v1.Sum; import io.opentelemetry.proto.metrics.v1.SummaryDataPoint; import io.vavr.Function1; + import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.meter.analyzer.MetricConvert; @@ -161,6 +163,57 @@ public class OpenTelemetryMetricRequestProcessor implements Service { return result; } + /** + * ExponentialHistogram data points are an alternate representation to the Histogram data point in OpenTelemetry + * metric format(https://opentelemetry.io/docs/reference/specification/metrics/data-model/#exponentialhistogram). + * It uses scale, offset and bucket index to calculate the bound. Firstly, calculate the base using scale by + * formula: base = 2**(2**(-scale)). Then the upperBound of specific bucket can be calculated by formula: + * base**(offset+index+1). Above calculation way is about positive buckets. For the negative case, we just + * map them by their absolute value into the negative range using the same scale as the positive range. So the + * upperBound should be calculated as -base**(offset+index). + * + * Ignored the zero_count field temporarily, + * because the zero_threshold even could overlap the existing bucket scopes. + * + * @param positiveOffset corresponding to positive Buckets' offset in ExponentialHistogramDataPoint + * @param positiveBucketCounts corresponding to positive Buckets' bucket_counts in ExponentialHistogramDataPoint + * @param negativeOffset corresponding to negative Buckets' offset in ExponentialHistogramDataPoint + * @param negativeBucketCounts corresponding to negative Buckets' bucket_counts in ExponentialHistogramDataPoint + * @param scale corresponding to scale in ExponentialHistogramDataPoint + * @return The map is a bucket set for histogram, the key is specific bucket's upperBound, the value is item count + * in this bucket lower than or equals to key(upperBound) + */ + private static Map buildBucketsFromExponentialHistogram( + int positiveOffset, final List positiveBucketCounts, + int negativeOffset, final List negativeBucketCounts, int scale) { + + final Map result = new HashMap<>(); + double base = Math.pow(2.0, Math.pow(2.0, -scale)); + if (base == Double.POSITIVE_INFINITY) { + log.warn("Receive and reject out-of-range ExponentialHistogram data"); + return result; + } + double upperBound; + for (int i = 0; i < negativeBucketCounts.size(); i++) { + upperBound = -Math.pow(base, negativeOffset + i); + if (upperBound == Double.NEGATIVE_INFINITY) { + log.warn("Receive and reject out-of-range ExponentialHistogram data"); + return new HashMap<>(); + } + result.put(upperBound, negativeBucketCounts.get(i)); + } + for (int i = 0; i < positiveBucketCounts.size() - 1; i++) { + upperBound = Math.pow(base, positiveOffset + i + 1); + if (upperBound == Double.POSITIVE_INFINITY) { + log.warn("Receive and reject out-of-range ExponentialHistogram data"); + return new HashMap<>(); + } + result.put(upperBound, positiveBucketCounts.get(i)); + } + result.put(Double.POSITIVE_INFINITY, positiveBucketCounts.get(positiveBucketCounts.size() - 1)); + return result; + } + // Adapt the OpenTelemetry metrics to SkyWalking metrics private Stream adaptMetrics( final Map nodeLabels, @@ -187,16 +240,16 @@ public class OpenTelemetryMetricRequestProcessor implements Service { if (sum .getAggregationTemporality() == AGGREGATION_TEMPORALITY_DELTA) { return sum.getDataPointsList().stream() - .map(point -> new Gauge( - metric.getName(), - mergeLabels( - nodeLabels, - buildLabels(point.getAttributesList()) - ), - point.hasAsDouble() ? point.getAsDouble() - : point.getAsInt(), - point.getTimeUnixNano() / 1000000 - )); + .map(point -> new Gauge( + metric.getName(), + mergeLabels( + nodeLabels, + buildLabels(point.getAttributesList()) + ), + point.hasAsDouble() ? point.getAsDouble() + : point.getAsInt(), + point.getTimeUnixNano() / 1000000 + )); } if (sum.getIsMonotonic()) { return sum.getDataPointsList().stream() @@ -241,6 +294,26 @@ public class OpenTelemetryMetricRequestProcessor implements Service { point.getTimeUnixNano() / 1000000 )); } + if (metric.hasExponentialHistogram()) { + return metric.getExponentialHistogram().getDataPointsList().stream() + .map(point -> new Histogram( + metric.getName(), + mergeLabels( + nodeLabels, + buildLabels(point.getAttributesList()) + ), + point.getCount(), + point.getSum(), + buildBucketsFromExponentialHistogram( + point.getPositive().getOffset(), + point.getPositive().getBucketCountsList(), + point.getNegative().getOffset(), + point.getNegative().getBucketCountsList(), + point.getScale() + ), + point.getTimeUnixNano() / 1000000 + )); + } if (metric.hasSummary()) { return metric.getSummary().getDataPointsList().stream() .map(point -> new Summary( diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0a40e3bd21a8093afa08a60255240f9167fc62db --- /dev/null +++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessorTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.receiver.otel.otlp; + +import io.opentelemetry.proto.metrics.v1.ExponentialHistogram; +import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint; +import io.opentelemetry.proto.metrics.v1.Metric; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram; +import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class OpenTelemetryMetricRequestProcessorTest { + + private OtelMetricReceiverConfig config; + + private ModuleManager manager; + + private OpenTelemetryMetricRequestProcessor metricRequestProcessor; + + private Map nodeLabels; + + @BeforeEach + public void setUp() { + manager = new ModuleManager(); + config = new OtelMetricReceiverConfig(); + metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(manager, config); + nodeLabels = new HashMap<>(); + } + + @Test + public void testAdaptExponentialHistogram() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class clazz = OpenTelemetryMetricRequestProcessor.class; + Method adaptMetricsMethod = clazz.getDeclaredMethod("adaptMetrics", Map.class, Metric.class); + adaptMetricsMethod.setAccessible(true); + + // number is 4; 7, 7.5; 8.5, 8.7, 9.4 + var positiveBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder() + .setOffset(10) + .addBucketCounts( + 1) // (0, 6.72] + .addBucketCounts( + 2 + ) // (6.72, 8] + .addBucketCounts( + 3 + ) // (8, 9.51] + .build(); + // number is -14, -14.5, -15; -18; -21, -26 + var negativeBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder() + .setOffset(15) + .addBucketCounts( + 3 + ) // (-16, -13.45] + .addBucketCounts( + 1 + ) // (-19.02, -16] + .addBucketCounts( + 2 + ) // (-INFINITY, -19.02] + .build(); + var dataPoint = ExponentialHistogramDataPoint.newBuilder() + .setCount(12) + .setSum(-63.4) + .setScale(2) + .setPositive(positiveBuckets) + .setNegative(negativeBuckets) + .setTimeUnixNano(1000000) + .build(); + ExponentialHistogram exponentialHistogram = ExponentialHistogram.newBuilder() + .addDataPoints(dataPoint) + .build(); + Metric metric = Metric.newBuilder() + .setName("test_metric") + .setExponentialHistogram(exponentialHistogram) + .build(); + + Stream stream = (Stream) adaptMetricsMethod.invoke( + metricRequestProcessor, nodeLabels, metric); + List list = stream.collect(Collectors.toList()); + Histogram histogramMetric = list.get(0); + assertEquals("test_metric", histogramMetric.getName()); + assertEquals(1, histogramMetric.getTimestamp()); + assertEquals(12, histogramMetric.getSampleCount()); + assertEquals(-63.4, histogramMetric.getSampleSum()); + + // validate the key and value of bucket + double base = Math.pow(2, Math.pow(2, -2)); + + assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 11))); + assertEquals(1, histogramMetric.getBuckets().get(Math.pow(base, 11))); + + assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 12))); + assertEquals(2, histogramMetric.getBuckets().get(Math.pow(base, 12))); + + assertTrue(histogramMetric.getBuckets().containsKey(Double.POSITIVE_INFINITY)); + assertEquals(3, histogramMetric.getBuckets().get(Double.POSITIVE_INFINITY)); + + assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 15))); + assertEquals(3, histogramMetric.getBuckets().get(-Math.pow(base, 15))); + + assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 16))); + assertEquals(1, histogramMetric.getBuckets().get(-Math.pow(base, 16))); + + assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 17))); + assertEquals(2, histogramMetric.getBuckets().get(-Math.pow(base, 17))); + } +}