未验证 提交 29556a0d 编写于 作者: P pg.yang 提交者: GitHub

Add aws-firehose-receiver to support collecting AWS CloudWatch...

Add aws-firehose-receiver to support collecting AWS CloudWatch metric(OpenTelemetry format) (#10300)
上级 e360ca36
......@@ -83,6 +83,7 @@
* [Optional] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing. This is OFF by default, controlled by `storage/elasticsearch/enableCustomRouting`.
* Enhance OAP HTTP server to support HTTPS
* Remove handler scan in otel receiver, manual initialization instead
* Add aws-firehose-receiver to support collecting AWS CloudWatch metric(OpenTelemetry format)
#### UI
......
# AWS Firehose receiver
AWS Firehose receiver listens on `0.0.0.0:12801` by default, and provides an HTTP Endpoint `/aws/firehose/metrics` that follows [Amazon Kinesis Data Firehose Delivery Stream HTTP Endpoint Delivery Specifications](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html)
You could leverage the receiver to collect [AWS CloudWatch metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html), and analysis it through [MAL](../../concepts-and-designs/mal.md) as the receiver bases on [OpenTelemetry receiver](./opentelemetry-receiver.md)
## Setup(S3 example)
1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
2. Stream CloudWatch metrics to AWS Kinesis Data Firehose delivery stream by [CloudWatch metrics stream](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html)
3. Specify AWS Kinesis Data Firehose delivery stream HTTP Endpoint (refer to [Choose HTTP Endpoint for Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http))
Usually, the [AWS CloudWatch metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) process flow with OAP is as follows:
```
CloudWatch metrics with S3 --> CloudWatch Metric Stream (OpenTelemetry formart) --> Kinesis Data Firehose Delivery Stream --> AWS Firehose receiver(OAP) --> OpenTelemetry receiver(OAP)
```
## Notice
1. Only OpenTelemetry format is supported (refer to [Metric streams output formats](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats.html))
2. Only HTTPS could be accepted, you could directly enable TLS and set the receiver to listen 443, or put the receiver behind a gateway with HTTPS (refer to [Amazon Kinesis Data Firehose Delivery Stream HTTP Endpoint Delivery Specifications](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html))
......@@ -304,6 +304,16 @@ The Configuration Vocabulary lists all available configurations provided by `app
| health-checker | default | checkIntervalSeconds | The period of checking OAP internal health status (in seconds). | SW_HEALTH_CHECKER_INTERVAL_SECONDS | 5 |
| configuration-discovery | default | disableMessageDigest | If true, agent receives the latest configuration every time, even without making any changes. By default, OAP uses the SHA512 message digest mechanism to detect changes in configuration. | SW_DISABLE_MESSAGE_DIGEST | false |
| receiver-event | default | gRPC services that handle events data. | - | - | |
| aws-firehose-receiver | default | host | Binding IP of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_HOST | 0.0.0.0 |
| - | - | port | Binding port of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_PORT | 12801 |
| - | - | contextPath | Context path of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_CONTEXT_PATH | / |
| - | - | maxThreads | Max Thtread number of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_THREADS | 200 |
| - | - | idleTimeOut | Idle timeout of a connection for keep-alive. | SW_RECEIVER_AWS_FIREHOSE_HTTP_IDLE_TIME_OUT | 30000 |
| - | - | acceptQueueSize | Maximum allowed number of open connections | SW_RECEIVER_AWS_FIREHOSE_HTTP_ACCEPT_QUEUE_SIZE | 0 |
| - | - | maxRequestHeaderSize | Maximum length of all headers in an HTTP/1 response | SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_REQUEST_HEADER_SIZE | 8192 |
| - | - | enableTLS | Indicate if enable HTTPS for the server | SW_RECEIVER_AWS_FIREHOSE_HTTP_ENABLE_TLS | false |
| - | - | tlsKeyPath | TLS key path | SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_KEY_PATH | |
| - | - | tlsCertChainPath | TLS certificate chain path | SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_CERT_CHAIN_PATH | |
## Note
......
......@@ -117,6 +117,8 @@ catalog:
path: "/en/guides/backend-oal-scripts"
- name: "OpenTelemetry Metrics"
path: "/en/setup/backend/opentelemetry-receiver"
- name: "AWS CloudWatch Metrics"
path: "/en/setup/backend/aws-firehose-receiver"
- name: "Zabbix Metrics"
path: "/en/setup/backend/backend-zabbix"
- name: "Meter Analysis"
......
......@@ -25,6 +25,7 @@ import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.Route;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.docs.DocService;
import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.logging.LoggingService;
......@@ -72,8 +73,8 @@ public class HTTPServer implements Server {
}
return delegate.serve(ctx, req);
})
.decorator(DecodingService.newDecorator())
.decorator(LoggingService.newDecorator());
if (config.isEnableTLS()) {
sb.https(new InetSocketAddress(
config.getHost(),
......
......@@ -45,5 +45,4 @@ public class HTTPServerConfig {
private String tlsKeyPath;
private String tlsCertChainPath;
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-receiver-plugin</artifactId>
<version>9.4.0-SNAPSHOT</version>
</parent>
<artifactId>aws-firehose-receiver</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>otel-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -16,26 +16,20 @@
*
*/
package org.apache.skywalking.oap.server.receiver.zipkin.handler;
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.encoding.StreamDecoderFactory;
import com.linecorp.armeria.server.ServiceRequestContext;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
final class UnzippingBytesRequestConverter {
public class AWSFirehoseReceiverModule extends ModuleDefine {
static HttpData convertRequest(ServiceRequestContext ctx, AggregatedHttpRequest request) {
String encoding = request.headers().get(HttpHeaderNames.CONTENT_ENCODING);
HttpData content = request.content();
if (!content.isEmpty() && encoding != null && encoding.contains("gzip")) {
content = StreamDecoderFactory.gzip().newDecoder(ctx.alloc()).decode(content);
if (content.isEmpty()) {
content.close();
throw new IllegalArgumentException("Cannot unzip request content bytes");
}
}
return content;
public static final String NAME = "aws-firehose";
public AWSFirehoseReceiverModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Getter
public class AWSFirehoseReceiverModuleConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
private int maxThreads = 200;
private long idleTimeOut = 30000;
private int acceptQueueSize = 0;
private int maxRequestHeaderSize = 8192;
private boolean enableTLS = false;
private String tlsKeyPath;
private String tlsCertChainPath;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import com.linecorp.armeria.common.HttpMethod;
import java.util.Collections;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
public class AWSFirehoseReceiverModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private AWSFirehoseReceiverModuleConfig moduleConfig;
private HTTPServer httpServer;
@Override
public String name() {
return NAME;
}
@Override
public Class<? extends ModuleDefine> module() {
return AWSFirehoseReceiverModule.class;
}
@Override
public ConfigCreator<AWSFirehoseReceiverModuleConfig> newConfigCreator() {
return new ConfigCreator<AWSFirehoseReceiverModuleConfig>() {
@Override
public Class<AWSFirehoseReceiverModuleConfig> type() {
return AWSFirehoseReceiverModuleConfig.class;
}
@Override
public void onInitialized(final AWSFirehoseReceiverModuleConfig initialized) {
moduleConfig = initialized;
}
};
}
@Override
public void prepare() throws ServiceNotProvidedException {
final HTTPServerConfig httpServerConfig = HTTPServerConfig.builder().host(moduleConfig.getHost())
.port(moduleConfig.getPort())
.contextPath(moduleConfig.getContextPath())
.maxThreads(moduleConfig.getMaxThreads())
.idleTimeOut(moduleConfig.getIdleTimeOut())
.acceptQueueSize(moduleConfig.getAcceptQueueSize())
.maxRequestHeaderSize(
moduleConfig.getMaxRequestHeaderSize())
.enableTLS(moduleConfig.isEnableTLS())
.tlsKeyPath(moduleConfig.getTlsKeyPath())
.tlsCertChainPath(moduleConfig.getTlsCertChainPath())
.build();
httpServer = new HTTPServer(httpServerConfig);
httpServer.initialize();
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final OpenTelemetryMetricRequestProcessor processor = getManager().find(OtelMetricReceiverModule.NAME)
.provider()
.getService(
OpenTelemetryMetricRequestProcessor.class);
httpServer.addHandler(
new FirehoseHTTPHandler(processor),
Collections.singletonList(HttpMethod.POST)
);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
httpServer.start();
}
@Override
public String[] requiredModules() {
return new String[] {
OtelMetricReceiverModule.NAME
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.Post;
import com.linecorp.armeria.server.annotation.ProducesJson;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import java.io.ByteArrayInputStream;
import java.util.Base64;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
@Slf4j
@AllArgsConstructor
public class FirehoseHTTPHandler {
private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
@Post("/aws/firehose/metrics")
@ConsumesJson
@ProducesJson
public HttpResponse collectMetrics(final FirehoseReq firehoseReq) {
try {
for (RequestData record : firehoseReq.getRecords()) {
final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(
Base64.getDecoder().decode(record.getData()));
ExportMetricsServiceRequest request;
while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(byteArrayInputStream)) != null) {
openTelemetryMetricRequestProcessor.processMetricsRequest(request);
}
}
} catch (InvalidProtocolBufferException e) {
log.warn("Only OpenTelemetry format is accepted", e);
return HttpResponse.ofJson(
HttpStatus.BAD_REQUEST,
new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(),
"Only OpenTelemetry format is accepted"
)
);
} catch (Exception e) {
log.error("Server error", e);
return HttpResponse.ofJson(
HttpStatus.INTERNAL_SERVER_ERROR,
new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(),
"Server error, please check the OAP log"
)
);
}
return HttpResponse.ofJson(
HttpStatus.OK,
new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(), null)
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class FirehoseReq {
private String requestId;
private Long timestamp;
private List<RequestData> records;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
public class FirehoseRes {
private String requestId;
private Long timestamp;
private String errorMessage;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.aws.firehose;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class RequestData {
private String data;
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.aws.firehose.AWSFirehoseReceiverModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.aws.firehose.AWSFirehoseReceiverModuleProvider
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.otel;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
public class OtelMetricReceiverModule extends ModuleDefine {
public static final String NAME = "receiver-otel";
......@@ -29,6 +30,6 @@ public class OtelMetricReceiverModule extends ModuleDefine {
@Override
public Class[] services() {
return new Class[0];
return new Class[] {OpenTelemetryMetricRequestProcessor.class};
}
}
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.otel.oc.OCMetricHandler;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricHandler;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
public class OtelMetricReceiverProvider extends ModuleProvider {
......@@ -35,6 +36,8 @@ public class OtelMetricReceiverProvider extends ModuleProvider {
private OtelMetricReceiverConfig config;
private OpenTelemetryMetricRequestProcessor metricRequestProcessor;
@Override
public String name() {
return NAME;
......@@ -62,10 +65,13 @@ public class OtelMetricReceiverProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(
getManager(), config);
registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class, metricRequestProcessor);
final List<String> enabledHandlers = config.getEnabledHandlers();
List<Handler> handlers = new ArrayList<>();
final OpenTelemetryMetricHandler openTelemetryMetricHandler = new OpenTelemetryMetricHandler(
getManager(), config);
getManager(), metricRequestProcessor);
if (enabledHandlers.contains(openTelemetryMetricHandler.type())) {
handlers.add(openTelemetryMetricHandler);
}
......@@ -78,6 +84,7 @@ public class OtelMetricReceiverProvider extends ModuleProvider {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
metricRequestProcessor.start();
for (Handler h : handlers) {
h.active();
}
......
......@@ -18,65 +18,26 @@
package org.apache.skywalking.oap.server.receiver.otel.otlp;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint.ValueAtQuantile;
import io.vavr.Function1;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
@Slf4j
@RequiredArgsConstructor
public class OpenTelemetryMetricHandler
extends MetricsServiceGrpc.MetricsServiceImplBase
implements Handler {
private final ModuleManager manager;
private final OtelMetricReceiverConfig config;
private static final Map<String, String> LABEL_MAPPINGS =
ImmutableMap
.<String, String>builder()
.put("net.host.name", "node_identifier_host_name")
.put("host.name", "node_identifier_host_name")
.put("job", "job_name")
.put("service.name", "job_name")
.build();
private List<PrometheusMetricConverter> converters;
private final OpenTelemetryMetricRequestProcessor metricRequestProcessor;
@Override
public String type() {
......@@ -85,31 +46,9 @@ public class OpenTelemetryMetricHandler
@Override
public void active() throws ModuleStartException {
final List<String> enabledRules =
Splitter.on(",")
.omitEmptyStrings()
.splitToList(config.getEnabledOtelRules());
final List<Rule> rules;
try {
rules = Rules.loadRules("otel-rules", enabledRules);
} catch (IOException e) {
throw new ModuleStartException("Failed to load otel rules.", e);
}
if (rules.isEmpty()) {
return;
}
GRPCHandlerRegister grpcHandlerRegister = manager.find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
converters = rules
.stream()
.map(r -> new PrometheusMetricConverter(r, meterSystem))
.collect(toList());
grpcHandlerRegister.addHandler(this);
}
......@@ -117,136 +56,9 @@ public class OpenTelemetryMetricHandler
public void export(
final ExportMetricsServiceRequest requests,
final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}
final Map<String, String> nodeLabels =
request
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
.getOrDefault(it.getKey(), it.getKey())
.replaceAll("\\.", "_"),
it -> it.getValue().getStringValue(),
(v1, v2) -> v1));
converters
.forEach(convert -> convert.toMeter(
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(tryIt,
"Convert OTEL metric to prometheus metric")))));
});
metricRequestProcessor.processMetricsRequest(requests);
responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
private static Map<String, String> buildLabels(List<KeyValue> kvs) {
return kvs
.stream()
.collect(toMap(
KeyValue::getKey,
it -> it.getValue().getStringValue()));
}
private static Map<String, String> mergeLabels(
final Map<String, String> nodeLabels,
final Map<String, String> pointLabels) {
// data point labels should have higher precedence and override the one in node labels
final Map<String, String> result = new HashMap<>(nodeLabels);
result.putAll(pointLabels);
return result;
}
private static Map<Double, Long> buildBuckets(
final List<Long> bucketCounts,
final List<Double> explicitBounds) {
final Map<Double, Long> result = new HashMap<>();
for (int i = 0; i < explicitBounds.size(); i++) {
result.put(explicitBounds.get(i), bucketCounts.get(i));
}
result.put(Double.POSITIVE_INFINITY, bucketCounts.get(explicitBounds.size()));
return result;
}
// Adapt the OpenTelemetry metrics to SkyWalking metrics
private Stream<? extends Metric> adaptMetrics(
final Map<String, String> nodeLabels,
final io.opentelemetry.proto.metrics.v1.Metric metric) {
if (metric.hasGauge()) {
return metric.getGauge().getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(nodeLabels,
buildLabels(point.getAttributesList())),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000));
}
if (metric.hasSum()) {
final Sum sum = metric.getSum();
if (sum
.getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
return Stream.empty();
}
if (sum.getIsMonotonic()) {
return sum.getDataPointsList().stream()
.map(point -> new Counter(
metric.getName(),
mergeLabels(nodeLabels,
buildLabels(point.getAttributesList())),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000));
} else {
return sum.getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(nodeLabels,
buildLabels(point.getAttributesList())),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000));
}
}
if (metric.hasHistogram()) {
return metric.getHistogram().getDataPointsList().stream()
.map(point -> new Histogram(
metric.getName(),
mergeLabels(nodeLabels,
buildLabels(point.getAttributesList())),
point.getCount(),
point.getSum(),
buildBuckets(point.getBucketCountsList(),
point.getExplicitBoundsList()),
point.getTimeUnixNano() / 1000000));
}
if (metric.hasSummary()) {
return metric.getSummary().getDataPointsList().stream()
.map(point -> new Summary(
metric.getName(),
mergeLabels(nodeLabels,
buildLabels(point.getAttributesList())),
point.getCount(),
point.getSum(),
point.getQuantileValuesList().stream().collect(
toMap(ValueAtQuantile::getQuantile,
ValueAtQuantile::getValue)),
point.getTimeUnixNano() / 1000000));
}
throw new UnsupportedOperationException("Unsupported type");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.otel.otlp;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vavr.Function1;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
@RequiredArgsConstructor
@Slf4j
public class OpenTelemetryMetricRequestProcessor implements Service {
private final ModuleManager manager;
private final OtelMetricReceiverConfig config;
private static final Map<String, String> LABEL_MAPPINGS =
ImmutableMap
.<String, String>builder()
.put("net.host.name", "node_identifier_host_name")
.put("host.name", "node_identifier_host_name")
.put("job", "job_name")
.put("service.name", "job_name")
.build();
private List<PrometheusMetricConverter> converters;
public void processMetricsRequest(final ExportMetricsServiceRequest requests) {
requests.getResourceMetricsList().forEach(request -> {
if (log.isDebugEnabled()) {
log.debug("Resource attributes: {}", request.getResource().getAttributesList());
}
final Map<String, String> nodeLabels =
request
.getResource()
.getAttributesList()
.stream()
.collect(toMap(
it -> LABEL_MAPPINGS
.getOrDefault(it.getKey(), it.getKey())
.replaceAll("\\.", "_"),
it -> it.getValue().getStringValue(),
(v1, v2) -> v1
));
converters
.forEach(convert -> convert.toMeter(
request
.getScopeMetricsList().stream()
.flatMap(scopeMetrics -> scopeMetrics
.getMetricsList().stream()
.flatMap(metric -> adaptMetrics(nodeLabels, metric))
.map(Function1.liftTry(Function.identity()))
.flatMap(tryIt -> MetricConvert.log(
tryIt,
"Convert OTEL metric to prometheus metric"
)))));
});
}
public void start() throws ModuleStartException {
final List<String> enabledRules =
Splitter.on(",")
.omitEmptyStrings()
.splitToList(config.getEnabledOtelRules());
final List<Rule> rules;
try {
rules = Rules.loadRules("otel-rules", enabledRules);
} catch (IOException e) {
throw new ModuleStartException("Failed to load otel rules.", e);
}
if (rules.isEmpty()) {
return;
}
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
converters = rules
.stream()
.map(r -> new PrometheusMetricConverter(r, meterSystem))
.collect(toList());
}
private static Map<String, String> buildLabels(List<KeyValue> kvs) {
return kvs
.stream()
.collect(toMap(
KeyValue::getKey,
it -> it.getValue().getStringValue()
));
}
private static Map<String, String> mergeLabels(
final Map<String, String> nodeLabels,
final Map<String, String> pointLabels) {
// data point labels should have higher precedence and override the one in node labels
final Map<String, String> result = new HashMap<>(nodeLabels);
result.putAll(pointLabels);
return result;
}
private static Map<Double, Long> buildBuckets(
final List<Long> bucketCounts,
final List<Double> explicitBounds) {
final Map<Double, Long> result = new HashMap<>();
for (int i = 0; i < explicitBounds.size(); i++) {
result.put(explicitBounds.get(i), bucketCounts.get(i));
}
result.put(Double.POSITIVE_INFINITY, bucketCounts.get(explicitBounds.size()));
return result;
}
// Adapt the OpenTelemetry metrics to SkyWalking metrics
private Stream<? extends Metric> adaptMetrics(
final Map<String, String> nodeLabels,
final io.opentelemetry.proto.metrics.v1.Metric metric) {
if (metric.hasGauge()) {
return metric.getGauge().getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasSum()) {
final Sum sum = metric.getSum();
if (sum
.getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
return Stream.empty();
}
if (sum.getIsMonotonic()) {
return sum.getDataPointsList().stream()
.map(point -> new Counter(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
} else {
return sum.getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
}
}
if (metric.hasHistogram()) {
return metric.getHistogram().getDataPointsList().stream()
.map(point -> new Histogram(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.getCount(),
point.getSum(),
buildBuckets(
point.getBucketCountsList(),
point.getExplicitBoundsList()
),
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasSummary()) {
return metric.getSummary().getDataPointsList().stream()
.map(point -> new Summary(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.getCount(),
point.getSum(),
point.getQuantileValuesList().stream().collect(
toMap(
SummaryDataPoint.ValueAtQuantile::getQuantile,
SummaryDataPoint.ValueAtQuantile::getValue
)),
point.getTimeUnixNano() / 1000000
));
}
throw new UnsupportedOperationException("Unsupported type");
}
}
......@@ -47,6 +47,7 @@
<module>skywalking-zabbix-receiver-plugin</module>
<module>skywalking-ebpf-receiver-plugin</module>
<module>skywalking-telegraf-receiver-plugin</module>
<module>aws-firehose-receiver</module>
</modules>
<dependencies>
......
......@@ -22,7 +22,6 @@ import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
import com.linecorp.armeria.server.annotation.Post;
......@@ -38,6 +37,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import static java.util.Objects.nonNull;
@Slf4j
......@@ -62,45 +62,44 @@ public class ZipkinSpanHTTPHandler {
}
@Post("/api/v2/spans")
public HttpResponse collectV2Spans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V2, ctx, req);
public HttpResponse collectV2Spans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V2, req);
}
@Post("/api/v2/spans")
@ConsumesJson
public HttpResponse collectV2JsonSpans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V2, ctx, req);
public HttpResponse collectV2JsonSpans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V2, req);
}
@Post("/api/v2/spans")
@ConsumesProtobuf
public HttpResponse collectV2ProtobufSpans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.PROTO3, ctx, req);
public HttpResponse collectV2ProtobufSpans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.PROTO3, req);
}
@Post("/api/v1/spans")
public HttpResponse collectV1Spans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V1, ctx, req);
public HttpResponse collectV1Spans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V1, req);
}
@Post("/api/v1/spans")
@ConsumesJson
public HttpResponse collectV1JsonSpans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V1, ctx, req);
public HttpResponse collectV1JsonSpans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.JSON_V1, req);
}
@Post("/api/v1/spans")
@ConsumesThrift
public HttpResponse collectV1ThriftSpans(ServiceRequestContext ctx, HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.THRIFT, ctx, req);
public HttpResponse collectV1ThriftSpans(HttpRequest req) {
return doCollectSpans(SpanBytesDecoder.THRIFT, req);
}
HttpResponse doCollectSpans(final SpanBytesDecoder decoder,
final ServiceRequestContext ctx,
final HttpRequest req) {
final HistogramMetrics.Timer timer = histogram.createTimer();
final HttpResponse response = HttpResponse.from(req.aggregate().thenApply(request -> {
try (final HttpData httpData = UnzippingBytesRequestConverter.convertRequest(ctx, request)) {
try (final HttpData httpData = request.content()) {
final List<Span> spanList = decoder.decodeList(httpData.byteBuf().nioBuffer());
spanForward.send(spanList);
return HttpResponse.of(HttpStatus.OK);
......
......@@ -171,6 +171,11 @@
<artifactId>skywalking-telegraf-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>aws-firehose-receiver</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- fetcher module -->
......
......@@ -543,3 +543,17 @@ receiver-telegraf:
selector: ${SW_RECEIVER_TELEGRAF:default}
default:
activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
aws-firehose:
selector: ${SW_RECEIVER_AWS_FIREHOSE:default}
default:
host: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_HOST:0.0.0.0}
port: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_PORT:12801}
contextPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_CONTEXT_PATH:/}
maxThreads: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_THREADS:200}
idleTimeOut: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_IDLE_TIME_OUT:30000}
acceptQueueSize: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_ACCEPT_QUEUE_SIZE:0}
maxRequestHeaderSize: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
enableTLS: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_ENABLE_TLS:false}
tlsKeyPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_KEY_PATH:}
tlsCertChainPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_CERT_CHAIN_PATH:}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册