Unverified commit b991cfd1, authored by pg.yang, committed by GitHub

Polish aws-firehose-receiver to adapt existing OTEL proto (#10343)

Parent b8e4a025
......@@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license.
proto files from opencensus: https://github.com/census-instrumentation/opencensus-proto/tree/master/gen-go Apache 2.0
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto Apache 2.0
proto files from opentelemetry: https://github.com/open-telemetry/opentelemetry-proto/tree/v0.7.0 Apache 2.0
flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0
mvnw files from https://github.com/apache/maven-wrapper Apache 2.0
svg files from skywalking-ui/src/assets/icons: https://github.com/google/material-design-icons Apache 2.0
......
......@@ -5,7 +5,7 @@ You could leverage the receiver to collect [AWS CloudWatch metrics](https://docs
## Setup (S3 example)
1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
2. Stream CloudWatch metrics to AWS Kinesis Data Firehose delivery stream by [CloudWatch metrics stream](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html)
3. Specify AWS Kinesis Data Firehose delivery stream HTTP Endpoint (refer to [Choose HTTP Endpoint for Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http))
......
......@@ -38,4 +38,35 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.42.1:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -23,7 +23,7 @@ import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.Post;
import com.linecorp.armeria.server.annotation.ProducesJson;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import java.io.ByteArrayInputStream;
import java.util.Base64;
import lombok.AllArgsConstructor;
......@@ -33,7 +33,6 @@ import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRe
@Slf4j
@AllArgsConstructor
public class FirehoseHTTPHandler {
private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
@Post("/aws/firehose/metrics")
......@@ -46,7 +45,8 @@ public class FirehoseHTTPHandler {
Base64.getDecoder().decode(record.getData()));
ExportMetricsServiceRequest request;
while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(byteArrayInputStream)) != null) {
openTelemetryMetricRequestProcessor.processMetricsRequest(request);
openTelemetryMetricRequestProcessor.processMetricsRequest(
OtelMetricsConvertor.convertExportMetricsRequest(request));
}
}
} catch (InvalidProtocolBufferException e) {
......
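For reference, here is a minimal standalone sketch (not the receiver's actual class; it assumes the generated OTEL 0.7 message classes and the OtelMetricsConvertor introduced below) of how one Firehose record's base64 payload is decoded and parsed as length-delimited ExportMetricsServiceRequest messages before being handed to the OTEL processing pipeline:

// Sketch only: decode one Firehose record and parse the length-delimited 0.7 requests it contains.
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import org.apache.skywalking.oap.server.receiver.aws.firehose.OtelMetricsConvertor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Base64;

public class FirehoseRecordSketch {
    static void handleRecordData(String base64Data) throws IOException {
        final ByteArrayInputStream in =
            new ByteArrayInputStream(Base64.getDecoder().decode(base64Data));
        ExportMetricsServiceRequest request;
        // parseDelimitedFrom returns null once the stream is exhausted.
        while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(in)) != null) {
            // Convert the 0.7 request to the v1 proto before passing it to
            // OpenTelemetryMetricRequestProcessor#processMetricsRequest.
            final io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest converted =
                OtelMetricsConvertor.convertExportMetricsRequest(request);
            // ... process `converted` here
        }
    }
}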
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.firehose.v0_7.ArrayValue;
import io.opentelemetry.proto.common.firehose.v0_7.KeyValue;
import io.opentelemetry.proto.common.firehose.v0_7.KeyValueList;
import io.opentelemetry.proto.common.firehose.v0_7.StringKeyValue;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleGauge;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogram;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleHistogramDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSum;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummary;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummaryDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.firehose.v0_7.IntDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.IntGauge;
import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogram;
import io.opentelemetry.proto.metrics.firehose.v0_7.IntHistogramDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.IntSum;
import io.opentelemetry.proto.metrics.firehose.v0_7.Metric;
import io.opentelemetry.proto.metrics.firehose.v0_7.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.DataPointFlags;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;
public class OtelMetricsConvertor {
public static io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertExportMetricsRequest(
ExportMetricsServiceRequest sourceRequest) {
io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.Builder targetRequestBuilder = io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest.newBuilder();
for (ResourceMetrics resourceMetrics : sourceRequest.getResourceMetricsList()) {
targetRequestBuilder.addResourceMetrics(convertResourceMetrics(resourceMetrics));
}
return targetRequestBuilder.build();
}
private static io.opentelemetry.proto.metrics.v1.ResourceMetrics convertResourceMetrics(final ResourceMetrics resourceMetrics) {
io.opentelemetry.proto.metrics.v1.ResourceMetrics.Builder targetResourceMetricsBuilder = io.opentelemetry.proto.metrics.v1.ResourceMetrics.newBuilder();
targetResourceMetricsBuilder.setResource(convertResource(resourceMetrics.getResource()));
for (InstrumentationLibraryMetrics instrumentationLibraryMetrics : resourceMetrics.getInstrumentationLibraryMetricsList()) {
targetResourceMetricsBuilder.addScopeMetrics(convertScopeMetrics(instrumentationLibraryMetrics));
}
return targetResourceMetricsBuilder.build();
}
private static ScopeMetrics convertScopeMetrics(final InstrumentationLibraryMetrics instrumentationLibraryMetrics) {
final ScopeMetrics.Builder builder = ScopeMetrics.newBuilder();
for (Metric metric : instrumentationLibraryMetrics.getMetricsList()) {
builder.addMetrics(convertMetrics(metric));
}
return builder.build();
}
private static io.opentelemetry.proto.metrics.v1.Metric convertMetrics(final Metric metric) {
final io.opentelemetry.proto.metrics.v1.Metric.Builder builder = io.opentelemetry.proto.metrics.v1.Metric.newBuilder();
builder.setDescription(metric.getDescription());
builder.setDescriptionBytes(metric.getDescriptionBytes());
builder.setName(metric.getName());
builder.setNameBytes(metric.getNameBytes());
builder.setUnit(metric.getUnit());
builder.setUnitBytes(metric.getUnitBytes());
        // The 0.7 Metric stores its data in a oneof; plain getters return a default
        // instance even when the field is unset, so check the hasXxx accessors instead
        // of wrapping the getters in Optional.
        if (metric.hasDoubleGauge()) {
            builder.setGauge(convertDoubleGauge(metric.getDoubleGauge()));
        }
        if (metric.hasIntGauge()) {
            builder.setGauge(convertIntGauge(metric.getIntGauge()));
        }
        if (metric.hasDoubleHistogram()) {
            builder.setHistogram(convertDoubleHistogram(metric.getDoubleHistogram()));
        }
        if (metric.hasIntHistogram()) {
            builder.setHistogram(convertIntHistogram(metric.getIntHistogram()));
        }
        if (metric.hasDoubleSum()) {
            builder.setSum(convertDoubleSum(metric.getDoubleSum()));
        }
        if (metric.hasIntSum()) {
            builder.setSum(convertIntSum(metric.getIntSum()));
        }
        if (metric.hasDoubleSummary()) {
            builder.setSummary(convertDoubleSummary(metric.getDoubleSummary()));
        }
return builder.build();
}
private static Summary convertDoubleSummary(final DoubleSummary doubleSummary) {
final Summary.Builder builder = Summary.newBuilder();
doubleSummary.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertDoubleSummaryDatapoint)
.forEach(builder::addDataPoints);
return builder.build();
}
private static SummaryDataPoint convertDoubleSummaryDatapoint(final DoubleSummaryDataPoint doubleSummaryDataPoint) {
final SummaryDataPoint.Builder builder = SummaryDataPoint.newBuilder();
doubleSummaryDataPoint.getLabelsList()
.stream()
.map(OtelMetricsConvertor::convertStringKV)
.forEach(builder::addAttributes);
builder.setCount(doubleSummaryDataPoint.getCount());
builder.setSum(doubleSummaryDataPoint.getSum());
builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
doubleSummaryDataPoint.getQuantileValuesList()
.stream()
.map(OtelMetricsConvertor::convertValueAtQuantile)
.forEach(builder::addQuantileValues);
builder.setStartTimeUnixNano(doubleSummaryDataPoint.getStartTimeUnixNano());
builder.setTimeUnixNano(doubleSummaryDataPoint.getTimeUnixNano());
return builder.build();
}
private static SummaryDataPoint.ValueAtQuantile convertValueAtQuantile(final DoubleSummaryDataPoint.ValueAtQuantile valueAtQuantile) {
final SummaryDataPoint.ValueAtQuantile.Builder builder = SummaryDataPoint.ValueAtQuantile.newBuilder();
builder.setValue(valueAtQuantile.getValue());
builder.setQuantile(valueAtQuantile.getQuantile());
return builder.build();
}
private static Sum convertIntSum(final IntSum intSum) {
final Sum.Builder builder = Sum.newBuilder();
intSum.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertIntDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
private static Sum convertDoubleSum(final DoubleSum doubleSum) {
final Sum.Builder builder = Sum.newBuilder();
doubleSum.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertDoubleDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
    /**
     * Convert a DoubleDataPoint in OTEL 0.7 to a NumberDataPoint.
     * Note that this method ignores the Exemplar field of the data point.
     */
private static NumberDataPoint convertDoubleDataPoint(final DoubleDataPoint doubleDataPoint) {
final NumberDataPoint.Builder builder = NumberDataPoint.newBuilder();
doubleDataPoint.getLabelsList()
.stream()
.map(OtelMetricsConvertor::convertStringKV)
.forEach(builder::addAttributes);
builder.setTimeUnixNano(doubleDataPoint.getTimeUnixNano());
builder.setStartTimeUnixNano(doubleDataPoint.getStartTimeUnixNano());
builder.setAsDouble(doubleDataPoint.getValue());
return builder.build();
}
private static NumberDataPoint convertIntDataPoint(final IntDataPoint intDataPoint) {
final NumberDataPoint.Builder builder = NumberDataPoint.newBuilder();
intDataPoint.getLabelsList()
.stream()
.map(OtelMetricsConvertor::convertStringKV)
.forEach(builder::addAttributes);
builder.setTimeUnixNano(intDataPoint.getTimeUnixNano());
builder.setStartTimeUnixNano(intDataPoint.getStartTimeUnixNano());
builder.setAsInt(intDataPoint.getValue());
return builder.build();
}
private static io.opentelemetry.proto.common.v1.KeyValue convertStringKV(final StringKeyValue stringKeyValue) {
return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
.setKey(stringKeyValue.getKey())
.setValue(
AnyValue.newBuilder()
.setStringValue(stringKeyValue.getValue()))
.build();
}
private static Histogram convertIntHistogram(final IntHistogram intHistogram) {
final Histogram.Builder builder = Histogram.newBuilder();
builder.setAggregationTemporality(convertAggregationTemporality(intHistogram.getAggregationTemporality()));
builder.setAggregationTemporalityValue(intHistogram.getAggregationTemporalityValue());
intHistogram.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertIntHistogramDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
    /**
     * Convert an IntHistogramDataPoint in OTEL 0.7 to a HistogramDataPoint.
     * Note that this method ignores the min, max, and Exemplar fields of HistogramDataPoint.
     */
private static HistogramDataPoint convertIntHistogramDataPoint(final IntHistogramDataPoint intHistogramDataPoint) {
final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
intHistogramDataPoint.getLabelsList()
.stream()
.map(OtelMetricsConvertor::convertStringKV)
.forEach(builder::addAttributes);
builder.setCount(intHistogramDataPoint.getCount());
builder.setSum(intHistogramDataPoint.getSum());
builder.setStartTimeUnixNano(intHistogramDataPoint.getStartTimeUnixNano());
builder.setTimeUnixNano(intHistogramDataPoint.getTimeUnixNano());
        builder.addAllBucketCounts(intHistogramDataPoint.getBucketCountsList());
builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
builder.addAllExplicitBounds(intHistogramDataPoint.getExplicitBoundsList());
return builder.build();
}
    /**
     * Convert a DoubleHistogramDataPoint in OTEL 0.7 to a HistogramDataPoint.
     * Note that this method ignores the min, max, and Exemplar fields of HistogramDataPoint.
     */
private static HistogramDataPoint convertDoubleHistogramDataPoint(final DoubleHistogramDataPoint intHistogramDataPoint) {
final HistogramDataPoint.Builder builder = HistogramDataPoint.newBuilder();
intHistogramDataPoint.getLabelsList()
.stream()
.map(OtelMetricsConvertor::convertStringKV)
.forEach(builder::addAttributes);
builder.setCount(intHistogramDataPoint.getCount());
builder.setSum(intHistogramDataPoint.getSum());
builder.setStartTimeUnixNano(intHistogramDataPoint.getStartTimeUnixNano());
builder.setTimeUnixNano(intHistogramDataPoint.getTimeUnixNano());
        builder.addAllBucketCounts(intHistogramDataPoint.getBucketCountsList());
builder.setFlags(DataPointFlags.FLAG_NONE_VALUE);
builder.addAllExplicitBounds(intHistogramDataPoint.getExplicitBoundsList());
return builder.build();
}
private static AggregationTemporality convertAggregationTemporality(final io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality aggregationTemporality) {
if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED) {
return AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED;
}
if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE) {
return AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
}
if (aggregationTemporality == io.opentelemetry.proto.metrics.firehose.v0_7.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA) {
return AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
}
throw new UnsupportedOperationException("Can't convert " + aggregationTemporality);
}
private static Histogram convertDoubleHistogram(final DoubleHistogram doubleHistogram) {
final Histogram.Builder builder = Histogram.newBuilder();
builder.setAggregationTemporality(convertAggregationTemporality(doubleHistogram.getAggregationTemporality()));
builder.setAggregationTemporalityValue(doubleHistogram.getAggregationTemporalityValue());
doubleHistogram.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertDoubleHistogramDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
private static Gauge convertIntGauge(final IntGauge intGauge) {
final Gauge.Builder builder = Gauge.newBuilder();
intGauge.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertIntDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
private static Gauge convertDoubleGauge(final DoubleGauge doubleGauge) {
final Gauge.Builder builder = Gauge.newBuilder();
doubleGauge.getDataPointsList()
.stream()
.map(OtelMetricsConvertor::convertDoubleDataPoint)
.forEach(builder::addDataPoints);
return builder.build();
}
private static Resource convertResource(final io.opentelemetry.proto.resource.firehose.v0_7.Resource resource) {
final Resource.Builder builder = Resource.newBuilder();
for (KeyValue keyValue : resource.getAttributesList()) {
builder.addAttributes(convertKeyValue(keyValue));
}
return builder.build();
}
private static AnyValue convertAnyValue(final io.opentelemetry.proto.common.firehose.v0_7.AnyValue value) {
final AnyValue.Builder builder = AnyValue.newBuilder();
if (value.hasBoolValue()) {
builder.setBoolValue(value.getBoolValue());
}
if (value.hasDoubleValue()) {
builder.setDoubleValue(value.getDoubleValue());
}
if (value.hasIntValue()) {
builder.setIntValue(value.getIntValue());
}
if (value.hasStringValue()) {
builder.setStringValue(value.getStringValue());
}
if (value.hasArrayValue()) {
builder.setArrayValue(convertValuList(value.getArrayValue()));
}
if (value.hasKvlistValue()) {
builder.setKvlistValue(convertKvlistValue(value.getKvlistValue()));
}
return builder.build();
}
private static io.opentelemetry.proto.common.v1.KeyValueList convertKvlistValue(final KeyValueList keyValueList) {
final io.opentelemetry.proto.common.v1.KeyValueList.Builder builder = io.opentelemetry.proto.common.v1.KeyValueList.newBuilder();
keyValueList.getValuesList().stream().map(OtelMetricsConvertor::convertKeyValue).forEach(builder::addValues);
return builder.build();
}
private static io.opentelemetry.proto.common.v1.KeyValue convertKeyValue(final KeyValue keyValue) {
return io.opentelemetry.proto.common.v1.KeyValue.newBuilder()
.setKey(keyValue.getKey())
.setValue(convertAnyValue(keyValue.getValue()))
.build();
}
private static io.opentelemetry.proto.common.v1.ArrayValue convertValuList(final ArrayValue arrayValue) {
final io.opentelemetry.proto.common.v1.ArrayValue.Builder builder = io.opentelemetry.proto.common.v1.ArrayValue.newBuilder();
arrayValue.getValuesList().stream().map(OtelMetricsConvertor::convertAnyValue).forEach(builder::addValues);
return builder.build();
}
}
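A hypothetical usage sketch of the convertor (the class and metric names below are illustrative, not part of the receiver): it builds a tiny OTEL 0.7 request containing a double summary, the shape CloudWatch metric streams emit, and converts it into the v1 request consumed by the OTEL receiver.

// Hypothetical example; not part of the receiver module.
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummary;
import io.opentelemetry.proto.metrics.firehose.v0_7.DoubleSummaryDataPoint;
import io.opentelemetry.proto.metrics.firehose.v0_7.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.firehose.v0_7.Metric;
import io.opentelemetry.proto.metrics.firehose.v0_7.ResourceMetrics;
import org.apache.skywalking.oap.server.receiver.aws.firehose.OtelMetricsConvertor;

public class OtelMetricsConvertorExample {
    public static void main(String[] args) {
        final ExportMetricsServiceRequest source = ExportMetricsServiceRequest.newBuilder()
            .addResourceMetrics(ResourceMetrics.newBuilder()
                .addInstrumentationLibraryMetrics(InstrumentationLibraryMetrics.newBuilder()
                    .addMetrics(Metric.newBuilder()
                        .setName("amazonaws.com/AWS/S3/AllRequests")
                        .setUnit("{Count}")
                        .setDoubleSummary(DoubleSummary.newBuilder()
                            .addDataPoints(DoubleSummaryDataPoint.newBuilder()
                                .setStartTimeUnixNano(1674547500000000000L)
                                .setTimeUnixNano(1674547560000000000L)
                                .setCount(1)
                                .setSum(1.0))))))
            .build();

        // The converted request carries the same data through the v1 ScopeMetrics/Summary types.
        System.out.println(OtelMetricsConvertor.convertExportMetricsRequest(source));
    }
}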
# OpenTelemetry Collector Proto
This package describes the OpenTelemetry collector protocol.
## Packages
1. `common` package contains the common messages shared between different services.
2. `trace` package contains the Trace Service protos.
3. `metrics` package contains the Metrics Service protos.
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.collector.metrics.v1;
import "opentelemetry/proto/metrics/v1/metrics.proto";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.collector.metrics.firehose.v0_7";
option java_outer_classname = "MetricsServiceProto";
option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/collector/metrics/v1";
// Service that can be used to push metrics between one Application
// instrumented with OpenTelemetry and a collector, or between a collector and a
// central collector.
service MetricsService {
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
rpc Export(ExportMetricsServiceRequest) returns (ExportMetricsServiceResponse) {}
}
message ExportMetricsServiceRequest {
// An array of ResourceMetrics.
// For data coming from a single resource this array will typically contain one
// element. Intermediary nodes (such as OpenTelemetry Collector) that receive
// data from multiple origins typically batch the data before forwarding further and
// in that case this array will contain multiple elements.
repeated opentelemetry.proto.metrics.v1.ResourceMetrics resource_metrics = 1;
}
message ExportMetricsServiceResponse {
}
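The Firehose receiver only reuses the generated message classes for parsing; it does not expose this gRPC service. Purely as a sketch of what the client side of the grpc-java stub produced by the compile-custom goal above might look like (the class name, endpoint, and port are illustrative):

// Hypothetical client-side sketch; SkyWalking does not serve this RPC, it only
// parses ExportMetricsServiceRequest messages delivered through Firehose.
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.MetricsServiceGrpc;

public class MetricsExportClientSketch {
    public static void main(String[] args) {
        // Illustrative endpoint; replace with a real collector address.
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 4317)
            .usePlaintext()
            .build();
        final ExportMetricsServiceRequest request = ExportMetricsServiceRequest.newBuilder().build();
        final ExportMetricsServiceResponse response =
            MetricsServiceGrpc.newBlockingStub(channel).export(request);
        System.out.println(response);
        channel.shutdown();
    }
}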
# This is an API configuration to generate an HTTP/JSON -> gRPC gateway for the
# OpenTelemetry service using github.com/grpc-ecosystem/grpc-gateway.
type: google.api.Service
config_version: 3
http:
rules:
- selector: opentelemetry.proto.collector.metrics.v1.MetricsService.Export
post: /v1/metrics
body: "*"
\ No newline at end of file
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.common.v1;
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.common.firehose.v0_7";
option java_outer_classname = "CommonProto";
option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1";
// AnyValue is used to represent any type of attribute value. AnyValue may contain a
// primitive value such as a string or integer or it may contain an arbitrary nested
// object containing arrays, key-value lists and primitives.
message AnyValue {
// The value is one of the listed fields. It is valid for all values to be unspecified
// in which case this AnyValue is considered to be "null".
oneof value {
string string_value = 1;
bool bool_value = 2;
int64 int_value = 3;
double double_value = 4;
ArrayValue array_value = 5;
KeyValueList kvlist_value = 6;
}
}
// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
// since oneof in AnyValue does not allow repeated fields.
message ArrayValue {
// Array of values. The array may be empty (contain 0 elements).
repeated AnyValue values = 1;
}
// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
// are semantically equivalent.
message KeyValueList {
// A collection of key/value pairs of key-value pairs. The list may be empty (may
// contain 0 elements).
repeated KeyValue values = 1;
}
// KeyValue is a key-value pair that is used to store Span attributes, Link
// attributes, etc.
message KeyValue {
string key = 1;
AnyValue value = 2;
}
// StringKeyValue is a pair of key/value strings. This is the simpler (and faster) version
// of KeyValue that only supports string values.
message StringKeyValue {
string key = 1;
string value = 2;
}
// InstrumentationLibrary is a message representing the instrumentation library information
// such as the fully qualified name and version.
message InstrumentationLibrary {
// An empty instrumentation library name means the name is unknown.
string name = 1;
string version = 2;
}
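To illustrate how the 0.7 label types above map onto the v1 attribute types, here is a small sketch mirroring OtelMetricsConvertor#convertStringKV (the class name is illustrative): a StringKeyValue label becomes a v1 KeyValue whose value is wrapped in AnyValue.

// Illustrative sketch of the 0.7 label -> v1 attribute mapping.
import io.opentelemetry.proto.common.firehose.v0_7.StringKeyValue;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

public class LabelToAttributeSketch {
    static KeyValue toAttribute(StringKeyValue label) {
        return KeyValue.newBuilder()
            .setKey(label.getKey())
            .setValue(AnyValue.newBuilder().setStringValue(label.getValue()))
            .build();
    }

    public static void main(String[] args) {
        final StringKeyValue label = StringKeyValue.newBuilder()
            .setKey("BucketName")
            .setValue("skywalking")
            .build();
        System.out.println(toAttribute(label));
    }
}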
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.metrics.experimental;
import "opentelemetry/proto/resource/v1/resource.proto";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.metrics.firehose.v0_7.experimental";
option java_outer_classname = "MetricConfigServiceProto";
option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/experimental";
// MetricConfig is a service that enables updating metric schedules, trace
// parameters, and other configurations on the SDK without having to restart the
// instrumented application. The collector can also serve as the configuration
// service, acting as a bridge between third-party configuration services and
// the SDK, piping updated configs from a third-party source to an instrumented
// application.
service MetricConfig {
rpc GetMetricConfig (MetricConfigRequest) returns (MetricConfigResponse);
}
message MetricConfigRequest{
// Required. The resource for which configuration should be returned.
opentelemetry.proto.resource.v1.Resource resource = 1;
// Optional. The value of MetricConfigResponse.fingerprint for the last
// configuration that the caller received and successfully applied.
bytes last_known_fingerprint = 2;
}
message MetricConfigResponse {
// Optional. The fingerprint associated with this MetricConfigResponse. Each
// change in configs yields a different fingerprint. The resource SHOULD copy
// this value to MetricConfigRequest.last_known_fingerprint for the next
// configuration request. If there are no changes between fingerprint and
// MetricConfigRequest.last_known_fingerprint, then all other fields besides
// fingerprint in the response are optional, or the same as the last update if
// present.
//
// The exact mechanics of generating the fingerprint is up to the
// implementation. However, a fingerprint must be deterministically determined
// by the configurations -- the same configuration will generate the same
// fingerprint on any instance of an implementation. Hence using a timestamp is
// unacceptable, but a deterministic hash is fine.
bytes fingerprint = 1;
// A Schedule is used to apply a particular scheduling configuration to
// a metric. If a metric name matches a schedule's patterns, then the metric
// adopts the configuration specified by the schedule.
message Schedule {
// A light-weight pattern that can match 1 or more
// metrics, for which this schedule will apply. The string is used to
// match against metric names. It should not exceed 100k characters.
message Pattern {
oneof match {
string equals = 1; // matches the metric name exactly
string starts_with = 2; // prefix-matches the metric name
}
}
// Metrics with names that match a rule in the inclusion_patterns are
// targeted by this schedule. Metrics that match the exclusion_patterns
// are not targeted for this schedule, even if they match an inclusion
// pattern.
repeated Pattern exclusion_patterns = 1;
repeated Pattern inclusion_patterns = 2;
// Describes the collection period for each metric in seconds.
// A period of 0 means to not export.
int32 period_sec = 3;
}
// A single metric may match multiple schedules. In such cases, the schedule
// that specifies the smallest period is applied.
//
// Note, for optimization purposes, it is recommended to use as few schedules
// as possible to capture all required metric updates. Where you can be
// conservative, do take full advantage of the inclusion/exclusion patterns to
// capture as much of your targeted metrics.
repeated Schedule schedules = 2;
// Optional. The client is suggested to wait this long (in seconds) before
// pinging the configuration service again.
int32 suggested_wait_time_sec = 3;
}
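This experimental config service is vendored alongside the 0.7 protos but is not used by the Firehose receiver. Purely as an illustration of the request shape described above (the class name is hypothetical; generated classes follow the java_package options in these protos):

// Illustrative only: build a MetricConfigRequest echoing back the last fingerprint.
import com.google.protobuf.ByteString;
import io.opentelemetry.proto.metrics.firehose.v0_7.experimental.MetricConfigRequest;
import io.opentelemetry.proto.resource.firehose.v0_7.Resource;

public class MetricConfigRequestSketch {
    public static void main(String[] args) {
        final MetricConfigRequest request = MetricConfigRequest.newBuilder()
            .setResource(Resource.newBuilder().build())
            // Echo the fingerprint from the previous MetricConfigResponse (empty on the first call).
            .setLastKnownFingerprint(ByteString.EMPTY)
            .build();
        System.out.println(request);
    }
}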
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package opentelemetry.proto.resource.v1;
import "opentelemetry/proto/common/v1/common.proto";
option java_multiple_files = true;
option java_package = "io.opentelemetry.proto.resource.firehose.v0_7";
option java_outer_classname = "ResourceProto";
option go_package = "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1";
// Resource information.
message Resource {
// Set of labels that describe the resource.
repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
// dropped_attributes_count is the number of dropped attributes. If the value is 0, then
// no attributes were dropped.
uint32 dropped_attributes_count = 2;
}
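As a small illustration (the class name is hypothetical), this is how a 0.7 Resource like the one in the test data below is assembled, with each attribute value wrapped in the 0.7 AnyValue:

// Illustrative sketch: a 0.7 Resource with the attribute keys seen in the test data.
import io.opentelemetry.proto.common.firehose.v0_7.AnyValue;
import io.opentelemetry.proto.common.firehose.v0_7.KeyValue;
import io.opentelemetry.proto.resource.firehose.v0_7.Resource;

public class ResourceSketch {
    public static void main(String[] args) {
        final Resource resource = Resource.newBuilder()
            .addAttributes(KeyValue.newBuilder()
                .setKey("cloud.provider")
                .setValue(AnyValue.newBuilder().setStringValue("aws")))
            .addAttributes(KeyValue.newBuilder()
                .setKey("cloud.region")
                .setValue(AnyValue.newBuilder().setStringValue("ap-northeast-1")))
            .build();
        System.out.println(resource);
    }
}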
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import com.google.gson.Gson;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.proto.collector.metrics.firehose.v0_7.ExportMetricsServiceRequest;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.junit.Assert;
import org.junit.Test;
public class OtelMetricsConvertorTest {
@Test
public void test() throws IOException {
for (TestData testData : findTestData()) {
io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest request = convertSource(
testData.getSourceFile());
String str = JsonFormat.printer().print(request);
final Map convertedData = new Gson().fromJson(str, Map.class);
final Map expect = new Gson().fromJson(
new String(Files.readAllBytes(testData.getExpectFile().toPath())), Map.class);
Assert.assertEquals(
String.format("diff , %s -> %s", testData.getSourceFile(), testData.getExpectFile()),
expect,
convertedData
);
System.out.printf("test pass %s -> %s %n", testData.getSourceFile(), testData.getExpectFile());
}
}
private io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest convertSource(final File sourceFile) throws IOException {
String source = new String(Files.readAllBytes(sourceFile.toPath()));
final ExportMetricsServiceRequest.Builder builder = ExportMetricsServiceRequest.newBuilder();
JsonFormat.parser().merge(source, builder);
return OtelMetricsConvertor.convertExportMetricsRequest(
builder.build());
}
private List<TestData> findTestData() {
List<TestData> res = new ArrayList<>();
Path resourceDirectory = Paths.get("src", "test", "resources", "convertor-test-data");
final File[] subFiles = resourceDirectory.toFile().listFiles(File::isDirectory);
if (subFiles == null) {
return res;
}
for (File subFile : subFiles) {
File sourceFile = new File(subFile.getAbsolutePath(), "source.json");
File expectFile = new File(subFile.getAbsolutePath(), "expect.json");
res.add(new TestData(sourceFile, expectFile));
}
return res;
}
@Getter
@Setter
@AllArgsConstructor
private static class TestData {
// OTEL 0.7.0
private File sourceFile;
private File expectFile;
}
}
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "cloud.provider",
"value": {
"stringValue": "aws"
}
},
{
"key": "cloud.account.id",
"value": {
"stringValue": "xxxxxxxx"
}
},
{
"key": "cloud.region",
"value": {
"stringValue": "ap-northeast-1"
}
},
{
"key": "aws.exporter.arn",
"value": {
"stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
}
}
]
},
"scopeMetrics": [
{
"metrics": [
{
"name": "amazonaws.com/AWS/S3/4xxErrors",
"unit": "{Count}",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"quantileValues": [
{
},
{
"quantile": 1.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "4xxErrors"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/5xxErrors",
"unit": "{Count}",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"quantileValues": [
{
},
{
"quantile": 1.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "5xxErrors"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
}
]
}
]
}
]
}
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "cloud.provider",
"value": {
"stringValue": "aws"
}
},
{
"key": "cloud.account.id",
"value": {
"stringValue": "xxxxxxxx"
}
},
{
"key": "cloud.region",
"value": {
"stringValue": "ap-northeast-1"
}
},
{
"key": "aws.exporter.arn",
"value": {
"stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
}
}
]
},
"instrumentationLibraryMetrics": [
{
"metrics": [
{
"name": "amazonaws.com/AWS/S3/4xxErrors",
"unit": "{Count}",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "4xxErrors"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"quantileValues": [
{
},
{
"quantile": 1.0
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/5xxErrors",
"unit": "{Count}",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "5xxErrors"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"quantileValues": [
{
},
{
"quantile": 1.0
}
]
}
]
}
}
]
}
]
}
]
}
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "cloud.provider",
"value": {
"stringValue": "aws"
}
},
{
"key": "cloud.account.id",
"value": {
"stringValue": "xxxxxxxx"
}
},
{
"key": "cloud.region",
"value": {
"stringValue": "ap-northeast-1"
}
},
{
"key": "aws.exporter.arn",
"value": {
"stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
}
}
]
},
"scopeMetrics": [
{
"metrics": [
{
"name": "amazonaws.com/AWS/S3/BytesDownloaded",
"unit": "By",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 166199.0,
"quantileValues": [
{
"value": 166199.0
},
{
"quantile": 1.0,
"value": 166199.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "BytesDownloaded"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/TotalRequestLatency",
"unit": "ms",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 66.0,
"quantileValues": [
{
"value": 66.0
},
{
"quantile": 1.0,
"value": 66.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "TotalRequestLatency"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/FirstByteLatency",
"unit": "ms",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 64.0,
"quantileValues": [
{
"value": 64.0
},
{
"quantile": 1.0,
"value": 64.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "FirstByteLatency"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/GetRequests",
"unit": "{Count}",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 1.0,
"quantileValues": [
{
"value": 1.0
},
{
"quantile": 1.0,
"value": 1.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "GetRequests"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/AllRequests",
"unit": "{Count}",
"summary": {
"dataPoints": [
{
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 1.0,
"quantileValues": [
{
"value": 1.0
},
{
"quantile": 1.0,
"value": 1.0
}
],
"attributes": [
{
"key": "Namespace",
"value": {
"stringValue": "AWS/S3"
}
},
{
"key": "MetricName",
"value": {
"stringValue": "AllRequests"
}
},
{
"key": "BucketName",
"value": {
"stringValue": "skywalking"
}
},
{
"key": "FilterId",
"value": {
"stringValue": "test"
}
}
]
}
]
}
}
]
}
]
}
]
}
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "cloud.provider",
"value": {
"stringValue": "aws"
}
},
{
"key": "cloud.account.id",
"value": {
"stringValue": "xxxxxxxx"
}
},
{
"key": "cloud.region",
"value": {
"stringValue": "ap-northeast-1"
}
},
{
"key": "aws.exporter.arn",
"value": {
"stringValue": "arn:aws:cloudwatch:ap-northeast-1:xxxxxxxx:metric-stream/CustomFull-CmHV2P"
}
}
]
},
"instrumentationLibraryMetrics": [
{
"metrics": [
{
"name": "amazonaws.com/AWS/S3/BytesDownloaded",
"unit": "By",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "BytesDownloaded"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 166199.0,
"quantileValues": [
{
"value": 166199.0
},
{
"quantile": 1.0,
"value": 166199.0
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/TotalRequestLatency",
"unit": "ms",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "TotalRequestLatency"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 66.0,
"quantileValues": [
{
"value": 66.0
},
{
"quantile": 1.0,
"value": 66.0
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/FirstByteLatency",
"unit": "ms",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "FirstByteLatency"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 64.0,
"quantileValues": [
{
"value": 64.0
},
{
"quantile": 1.0,
"value": 64.0
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/GetRequests",
"unit": "{Count}",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "GetRequests"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 1.0,
"quantileValues": [
{
"value": 1.0
},
{
"quantile": 1.0,
"value": 1.0
}
]
}
]
}
},
{
"name": "amazonaws.com/AWS/S3/AllRequests",
"unit": "{Count}",
"doubleSummary": {
"dataPoints": [
{
"labels": [
{
"key": "Namespace",
"value": "AWS/S3"
},
{
"key": "MetricName",
"value": "AllRequests"
},
{
"key": "BucketName",
"value": "skywalking"
},
{
"key": "FilterId",
"value": "test"
}
],
"startTimeUnixNano": "1674547500000000000",
"timeUnixNano": "1674547560000000000",
"count": "1",
"sum": 1.0,
"quantileValues": [
{
"value": 1.0
},
{
"quantile": 1.0,
"value": 1.0
}
]
}
]
}
}
]
}
]
}
]
}