From 3ca7f7b73cbe219d64a48e2844cb47377157c851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Fri, 17 May 2019 15:29:38 +0800 Subject: [PATCH] Support ALS and observe service mesh without Mixer (#2460) * Add ALS proto and receiver in envoy --- README.md | 5 +- docker/oap/docker-entrypoint.sh | 9 +- docs/en/setup/README.md | 4 +- docs/en/setup/backend/backend-receivers.md | 2 +- docs/en/setup/envoy/als_setting.md | 17 + .../{README.md => metrics_service_setting.md} | 0 oap-server/pom.xml | 2 +- .../envoy-metrics-receiver-plugin/pom.xml | 10 + .../envoy/AccessLogServiceGRPCHandler.java | 123 ++++++ .../envoy/EnvoyMetricReceiverConfig.java | 35 ++ .../envoy/EnvoyMetricReceiverProvider.java | 9 +- .../receiver/envoy/als/ALSHTTPAnalysis.java | 42 ++ .../envoy/als/DependencyResource.java | 63 +++ .../server/receiver/envoy/als/Fetcher.java | 48 +++ .../als/K8sALSServiceMeshHTTPAnalysis.java | 372 ++++++++++++++++++ .../oap/server/receiver/envoy/als/Role.java | 37 ++ .../receiver/envoy/als/ServiceMetaInfo.java | 67 ++++ ....server.receiver.envoy.als.ALSHTTPAnalysis | 20 + .../envoy/als/DependencyResourceTest.java | 97 +++++ .../envoy/als/K8sHTTPAnalysisTest.java | 164 ++++++++ .../src/test/resources/envoy-ingress.msg | 89 +++++ .../test/resources/envoy-ingress2sidecar.msg | 99 +++++ .../resources/envoy-mesh-client-sidecar.msg | 92 +++++ .../resources/envoy-mesh-server-sidecar.msg | 96 +++++ .../proto/envoy/api/v2/core/address.proto | 121 ++++++ .../main/proto/envoy/api/v2/core/base.proto | 176 +++++++++ .../envoy/data/accesslog/v2/accesslog.proto | 335 ++++++++++++++++ .../envoy/service/accesslog/v2/als.proto | 73 ++++ .../mesh/ServiceMeshMetricDataDecorator.java | 17 +- .../mesh/TelemetryDataDispatcher.java | 30 +- .../src/main/assembly/application.yml | 1 + .../src/main/resources/application.yml | 1 + 32 files changed, 2232 insertions(+), 24 deletions(-) create mode 100644 docs/en/setup/envoy/als_setting.md rename docs/en/setup/envoy/{README.md => metrics_service_setting.md} (100%) create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg create mode 100644 oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto create mode 100644 oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto create mode 100644 oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto diff --git a/README.md b/README.md index b28f8aca2..4c6a8bd54 100644 --- a/README.md +++ b/README.md @@ -37,9 +37,10 @@ including 1. Java, [.NET Core](https://github.com/SkyAPM/SkyAPM-dotnet), [NodeJS](https://github.com/SkyAPM/SkyAPM-nodejs) and [PHP](https://github.com/SkyAPM/SkyAPM-php-sdk) auto-instrument agents in SkyWalking format 1. Manual-instrument [Go agent](https://github.com/tetratelabs/go2sky) in SkyWalking format. 1. Istio telemetry format -1. Zipkin v1/v2 format +1. Envoy gRPC Access Log Service (ALS) format in Istio controlled service mesh +1. Envoy Metrics Service format. +1. Zipkin v1/v2 format. 1. Jaeger gRPC format. -1. Envoy metrics format (the metrics entries itself is prometheus client [metrics family](https://github.com/prometheus/client_model/blob/fd36f4220a901265f90734c3183c5f0c91daa0b8/metrics.proto#L77)) # Document diff --git a/docker/oap/docker-entrypoint.sh b/docker/oap/docker-entrypoint.sh index 764cf0145..b44d612c2 100755 --- a/docker/oap/docker-entrypoint.sh +++ b/docker/oap/docker-entrypoint.sh @@ -186,8 +186,6 @@ service-mesh: bufferFileCleanWhenRestart: \${SW_SERVICE_MESH_BUFFER_FILE_CLEAN_WHEN_RESTART:false} istio-telemetry: default: -envoy-metric: - default: query: graphql: path: \${SW_QUERY_GRAPHQL_PATH:/graphql} @@ -197,7 +195,14 @@ telemetry: prometheus: host: \${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0} port: \${SW_TELEMETRY_PROMETHEUS_PORT:1234} +envoy-metric: + default: EOT + if [[ "$SW_ENVOY_ALS_ENABLED" = "true" ]]; then + cat <> ${var_application_file} + alsHTTPAnalysis: \${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh} +EOT + fi if [[ "$SW_RECEIVER_ZIPKIN_ENABLED" = "true" ]]; then cat <> ${var_application_file} diff --git a/docs/en/setup/README.md b/docs/en/setup/README.md index 43af35c37..801fd2e37 100644 --- a/docs/en/setup/README.md +++ b/docs/en/setup/README.md @@ -22,10 +22,12 @@ You could go to their project repositories to find out the releases and how to u ## Service Mesh - Istio - [SkyWalking on Istio](istio/README.md). Introduce how to use Istio Mixer bypass Adapter to work with SkyWalking. + - Envoy + - Use [ALS(access log service)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) to observe service mesh, without Mixer. Follow [document](envoy/als_setting.md) to open it. ## Proxy - [Envoy Proxy](https://www.envoyproxy.io/) - - [Sending metrics to Skywalking from Envoy](envoy/README.md). How to send metrics from Envoy to SkyWalking using [Metrics service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto.html). + - [Sending metrics to Skywalking from Envoy](envoy/metrics_service_setting.md). How to send metrics from Envoy to SkyWalking using [Metrics service](https://www.envoyproxy.io/docs/envoy/latest/api-v2/config/metrics/v2/metrics_service.proto.html). ## Setup backend Follow [backend and UI setup document](backend/backend-ui-setup.md) to understand and config the backend for different diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md index a5cd09b4a..2dbc5891d 100644 --- a/docs/en/setup/backend/backend-receivers.md +++ b/docs/en/setup/backend/backend-receivers.md @@ -10,7 +10,7 @@ We have following receivers, and `default` implementors are provided in our Apac 1. **service-mesh**. gRPC services accept data from inbound mesh probes. 1. **receiver-jvm**. gRPC services accept JVM metrics data. 1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services. -1. **envoy-metric**. Envoy `metrics_service` supported by this receiver. OAL script support all GAUGE type metrics. +1. **envoy-metric**. Envoy `metrics_service` and `ALS(access log service)` supported by this receiver. OAL script support all GAUGE type metrics. 1. **receiver_zipkin**. See [details](#zipkin-receiver). 1. **receiver_jaeger**. See [details](#jaeger-receiver). diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md new file mode 100644 index 000000000..b57b6086c --- /dev/null +++ b/docs/en/setup/envoy/als_setting.md @@ -0,0 +1,17 @@ +# Observe service mesh through ALS +Envoy [ALS(access log service)](https://www.envoyproxy.io/docs/envoy/latest/api-v2/service/accesslog/v2/als.proto) provides +fully logs about RPC routed, including HTTP and TCP. + +You need three steps to open ALS. +1. Right now, Istio pilot hasn't supported to open ALS, so you have to change pilot codes. +1. Open SkyWalking [envoy receiver](../backend/backend-receivers.md). +1. Active ALS k8s-mesh analysis +```yaml +envoy-metric: + default: + alsHTTPAnalysis: + - k8s-mesh +``` + +Notice, only use this when using envoy under Istio controlled. +Otherwise, you need to implement your own `ALSHTTPAnalysis` and register it to receiver. \ No newline at end of file diff --git a/docs/en/setup/envoy/README.md b/docs/en/setup/envoy/metrics_service_setting.md similarity index 100% rename from docs/en/setup/envoy/README.md rename to docs/en/setup/envoy/metrics_service_setting.md diff --git a/oap-server/pom.xml b/oap-server/pom.xml index b70168894..6c4948c75 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -64,7 +64,7 @@ 2.6 6.3.2 2.9.9 - 2.0.0 + 4.0.0 3.1.0 2.9.1 2.6.2 diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml index 0b665dbd3..9a78fb996 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml @@ -39,6 +39,16 @@ skywalking-mesh-receiver-plugin ${project.version} + + io.kubernetes + client-java + + + com.google.protobuf + protobuf-java-util + 3.5.1 + test + diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java new file mode 100644 index 000000000..f534a87cd --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy; + +import io.envoyproxy.envoy.service.accesslog.v2.*; +import io.grpc.stub.StreamObserver; +import java.util.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.envoy.als.*; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.*; +import org.slf4j.*; + +public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase { + private static final Logger logger = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class); + private final List envoyHTTPAnalysisList; + private final SourceReceiver sourceReceiver; + private final CounterMetrics counter; + private final HistogramMetrics histogram; + private final CounterMetrics sourceDispatcherCounter; + + public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) { + ServiceLoader alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class); + envoyHTTPAnalysisList = new ArrayList<>(); + for (String httpAnalysisName : config.getAlsHTTPAnalysis()) { + for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) { + if (httpAnalysisName.equals(httpAnalysis.name())) { + httpAnalysis.init(config); + envoyHTTPAnalysisList.add(httpAnalysis); + } + } + } + + logger.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); + + sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + + MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); + counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", + MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", + MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received", + MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + } + + public StreamObserver streamAccessLogs( + StreamObserver responseObserver) { + return new StreamObserver() { + private volatile boolean isFirst = true; + private Role role; + private StreamAccessLogsMessage.Identifier identifier; + + @Override public void onNext(StreamAccessLogsMessage message) { + counter.inc(); + + HistogramMetrics.Timer timer = histogram.createTimer(); + try { + if (isFirst) { + identifier = message.getIdentifier(); + isFirst = false; + role = Role.NONE; + for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { + role = analysis.identify(identifier, role); + } + } + + StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase(); + + if (logger.isDebugEnabled()) { + logger.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", + identifier.getNode().getId(), role, logCase, message); + } + + switch (logCase) { + case HTTP_LOGS: + StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); + + List sourceResult = new ArrayList<>(); + for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { + logs.getLogEntryList().forEach(log -> { + sourceResult.addAll(analysis.analysis(identifier, log, role)); + }); + } + + sourceDispatcherCounter.inc(sourceResult.size()); + sourceResult.forEach(sourceReceiver::receive); + } + } finally { + timer.finish(); + } + } + + @Override public void onError(Throwable throwable) { + logger.error("Error in receiving access log from envoy", throwable); + responseObserver.onCompleted(); + } + + @Override public void onCompleted() { + responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java new file mode 100644 index 000000000..b7ff4d8d6 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy; + +import java.util.*; +import java.util.stream.Collectors; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +/** + * @author wusheng,gaohongtao + */ +public class EnvoyMetricReceiverConfig extends ModuleConfig { + private String alsHTTPAnalysis; + + public List getAlsHTTPAnalysis() { + return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index 1631615f1..398eac91b 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -27,6 +27,12 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule; * @author wusheng */ public class EnvoyMetricReceiverProvider extends ModuleProvider { + private final EnvoyMetricReceiverConfig config; + + public EnvoyMetricReceiverProvider() { + config = new EnvoyMetricReceiverConfig(); + } + @Override public String name() { return "default"; } @@ -36,7 +42,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { } @Override public ModuleConfig createConfigBeanIfAbsent() { - return null; + return config; } @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { @@ -46,6 +52,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { GRPCHandlerRegister service = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class); service.addHandler(new MetricServiceGRPCHandler(getManager())); + service.addHandler(new AccessLogServiceGRPCHandler(getManager(), config)); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java new file mode 100644 index 000000000..1de742d58 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.util.List; +import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; + +/** + * Analysis source metrics from ALS + * + * @author wusheng + */ +public interface ALSHTTPAnalysis { + String name(); + + void init(EnvoyMetricReceiverConfig config); + + List analysis(StreamAccessLogsMessage.Identifier identifier, + HTTPAccessLogEntry entry, Role role); + + Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, + Role prev); +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java new file mode 100644 index 000000000..e65399080 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1OwnerReference; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Optional; + +@RequiredArgsConstructor +class DependencyResource { + @Getter(AccessLevel.PACKAGE) + private final V1ObjectMeta metadata; + + private boolean stop; + + DependencyResource getOwnerResource(final String kind, final Fetcher transform) { + if (stop) { + return this; + } + if (metadata.getOwnerReferences() == null) { + stop = true; + return this; + } + V1OwnerReference ownerReference = null; + for (V1OwnerReference each : metadata.getOwnerReferences()) { + if (each.getKind().equals(kind)) { + ownerReference = each; + break; + } + } + if (ownerReference == null) { + stop = true; + return this; + } + Optional metaOptional = transform.apply(ownerReference); + if (!metaOptional.isPresent()) { + stop = true; + return this; + } + return new DependencyResource(metaOptional.get()); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java new file mode 100644 index 000000000..7efcc8a83 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.kubernetes.client.ApiException; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1OwnerReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.function.Function; + +interface Fetcher extends Function> { + + Logger logger = LoggerFactory.getLogger(Fetcher.class); + + V1ObjectMeta go(V1OwnerReference ownerReference) throws ApiException; + + default Optional apply(V1OwnerReference ownerReference) { + try { + return Optional.ofNullable(go(ownerReference)); + } catch (final ApiException e) { + logger.error("code:{} header:{} body:{}", e.getCode(), e.getResponseHeaders(), e.getResponseBody()); + return Optional.empty(); + } catch (final Throwable th) { + logger.error("other errors", th); + return Optional.empty(); + } + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java new file mode 100644 index 000000000..64aedc349 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Duration; +import com.google.protobuf.Timestamp; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.Configuration; +import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.apis.ExtensionsV1beta1Api; +import io.kubernetes.client.models.*; +import io.kubernetes.client.util.Config; +import lombok.AccessLevel; +import lombok.Getter; +import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; +import org.apache.skywalking.apm.network.common.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.Protocol; +import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; +import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Analysis log based on ingress and mesh scenarios. + * + * @author wusheng + */ +public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { + private static final Logger logger = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class); + + private static final String ADDRESS_TYPE_INTERNAL_IP = "InternalIP"; + + @Getter(AccessLevel.PROTECTED) + private final AtomicReference> ipServiceMap = new AtomicReference<>(); + + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat("load-pod-%d").setDaemon(true).build()); + + @Override public String name() { + return "k8s-mesh"; + } + + @Override public void init(EnvoyMetricReceiverConfig config) { + executorService.scheduleAtFixedRate(this::loadPodInfo, 0,15, TimeUnit.SECONDS); + } + + private boolean invalidPodList() { + Map map = ipServiceMap.get(); + return map == null || map.isEmpty(); + } + + private void loadPodInfo() { + try { + ApiClient client = Config.defaultClient(); + client.getHttpClient().setReadTimeout(20, TimeUnit.SECONDS); + Configuration.setDefaultApiClient(client); + CoreV1Api api = new CoreV1Api(); + V1PodList list = api.listPodForAllNamespaces(null, null, null, + null, null, null, null, null, null); + Map ipMap = new HashMap<>(list.getItems().size()); + long startTime = System.nanoTime(); + for (V1Pod item : list.getItems()) { + if (item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) { + logger.warn("Pod {}.{} is removed because hostIP and podIP are identical ", item.getMetadata().getName()); + continue; + } + ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata())); + } + logger.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000); + ipServiceMap.set(ipMap); + } catch (Throwable th) { + logger.error("run load pod error", th); + } + } + + private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) { + ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api(); + DependencyResource dr = new DependencyResource(podMeta); + DependencyResource meta = dr + .getOwnerResource("ReplicaSet", ownerReference -> + extensionsApi.readNamespacedReplicaSet(ownerReference.getName(), podMeta.getNamespace(), + "", true, true).getMetadata()); + ServiceMetaInfo result = new ServiceMetaInfo(); + if (meta.getMetadata().getOwnerReferences() != null && meta.getMetadata().getOwnerReferences().size() > 0) { + V1OwnerReference owner = meta.getMetadata().getOwnerReferences().get(0); + result.setServiceName(String.format("%s.%s", owner.getName(), meta.getMetadata().getNamespace())); + } else { + result.setServiceName(String.format("%s.%s", meta.getMetadata().getName(), meta.getMetadata().getNamespace())); + } + result.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace())); + result.setTags(transformLabelsToTags(podMeta.getLabels())); + return result; + } + + private List transformLabelsToTags(final Map labels) { + if (labels == null || labels.size() < 1) { + return Collections.emptyList(); + } + List result = new ArrayList<>(labels.size()); + for (Map.Entry each : labels.entrySet()) { + result.add(new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())); + } + return result; + } + + @Override public List analysis(StreamAccessLogsMessage.Identifier identifier, + HTTPAccessLogEntry entry, Role role) { + if (invalidPodList()) { + return Collections.emptyList(); + } + switch (role) { + case PROXY: + analysisProxy(identifier, entry); + break; + case SIDECAR: + return analysisSideCar(identifier, entry); + } + + return Collections.emptyList(); + } + + protected List analysisSideCar(StreamAccessLogsMessage.Identifier identifier, + HTTPAccessLogEntry entry) { + List sources = new ArrayList<>(); + AccessLogCommon properties = entry.getCommonProperties(); + if (properties != null) { + String cluster = properties.getUpstreamCluster(); + if (cluster != null) { + long startTime = formatAsLong(properties.getStartTime()); + long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + HTTPRequestProperties request = entry.getRequest(); + String endpoint = "/"; + Protocol protocol = Protocol.HTTP; + if (request != null) { + endpoint = request.getPath(); + String schema = request.getScheme(); + if (schema.equals("http") || schema.equals("https")) { + protocol = Protocol.HTTP; + } else { + protocol = Protocol.gRPC; + } + } + HTTPResponseProperties response = entry.getResponse(); + int responseCode = 200; + if (response != null) { + responseCode = response.getResponseCode().getValue(); + } + boolean status = responseCode >= 200 && responseCode < 400; + + Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress(), + downstreamRemoteAddress.getSocketAddress().getPortValue()); + Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress(), + downstreamLocalAddress.getSocketAddress().getPortValue()); + if (cluster.startsWith("inbound|")) { + // Server side + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime) + .setEndTime(startTime + duration) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint).setLatency((int)duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status).setProtocol(protocol) + .setDetectPoint(DetectPoint.server) + .build(); + + logger.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); + forward(metric); + } else { + // sidecar -> sidecar(server side) + ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint).setLatency((int)duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status).setProtocol(protocol) + .setDetectPoint(DetectPoint.server) + .build(); + + logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + forward(metric); + } + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress(), + upstreamRemoteAddress.getSocketAddress().getPortValue()); + + ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(destService.getServiceName()) + .setDestServiceInstance(destService.getServiceInstanceName()) + .setEndpoint(endpoint).setLatency((int)duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status).setProtocol(protocol) + .setDetectPoint(DetectPoint.client) + .build(); + + logger.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + forward(metric); + + } + } + } + return sources; + } + + protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, + HTTPAccessLogEntry entry) { + AccessLogCommon properties = entry.getCommonProperties(); + if (properties != null) { + Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { + SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress(), downstreamRemoteAddressSocketAddress.getPortValue()); + + SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress(), downstreamLocalAddressSocketAddress.getPortValue()); + + long startTime = formatAsLong(properties.getStartTime()); + long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + HTTPRequestProperties request = entry.getRequest(); + String endpoint = "/"; + Protocol protocol = Protocol.HTTP; + if (request != null) { + endpoint = request.getPath(); + String schema = request.getScheme(); + if (schema.equals("http") || schema.equals("https")) { + protocol = Protocol.HTTP; + } else { + protocol = Protocol.gRPC; + } + } + HTTPResponseProperties response = entry.getResponse(); + int responseCode = 200; + if (response != null) { + responseCode = response.getResponseCode().getValue(); + } + boolean status = responseCode >= 200 && responseCode < 400; + + ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(outside.getServiceName()) + .setSourceServiceInstance(outside.getServiceInstanceName()) + .setDestServiceName(ingress.getServiceName()) + .setDestServiceInstance(ingress.getServiceInstanceName()) + .setEndpoint(endpoint).setLatency((int)duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status).setProtocol(protocol) + .setDetectPoint(DetectPoint.server) + .build(); + + logger.debug("Transformed ingress inbound mesh metric {}", metric); + forward(metric); + + SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress(), upstreamRemoteAddressSocketAddress.getPortValue()); + + long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); + + ServiceMeshMetric outboundMetric = ServiceMeshMetric.newBuilder().setStartTime(outboundStartTime) + .setEndTime(outboundEndTime) + .setSourceServiceName(ingress.getServiceName()) + .setSourceServiceInstance(ingress.getServiceInstanceName()) + .setDestServiceName(targetService.getServiceName()) + .setDestServiceInstance(targetService.getServiceInstanceName()) + .setEndpoint(endpoint).setLatency((int)(outboundEndTime - outboundStartTime)) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status).setProtocol(protocol) + .setDetectPoint(DetectPoint.client) + .build(); + + logger.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + forward(outboundMetric); + } + } + } + + @Override public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, + Role prev) { + if (alsIdentifier != null) { + Node node = alsIdentifier.getNode(); + if (node != null) { + String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + } + } + + return prev; + } + + /** + * @param ip + * @param port + * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. + */ + protected ServiceMetaInfo find(String ip, int port) { + Map map = ipServiceMap.get(); + if (map == null) { + logger.debug("Unknown ip {}, ip -> service is null", ip); + return ServiceMetaInfo.UNKNOWN; + } + if (map.containsKey(ip)) { + return map.get(ip); + } + logger.debug("Unknown ip {}, ip -> service is {}", map); + return ServiceMetaInfo.UNKNOWN; + } + + protected void forward(ServiceMeshMetric metric) { + TelemetryDataDispatcher.preProcess(metric); + } + + private long formatAsLong(Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); + } + + private long formatAsLong(Duration duration) { + return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java new file mode 100644 index 000000000..8842c39b8 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Role.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +/** + * The role of envoy in this RPC. + */ +public enum Role { + /** + * Can't identify + */ + NONE, + /** + * Proxy, such as Ingress, or not mesh + */ + PROXY, + /** + * Sidecar in mesh + */ + SIDECAR +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java new file mode 100644 index 000000000..822f87307 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import java.util.*; +import lombok.*; + +/** + * @author wusheng + */ +@Getter +@Setter +@ToString +public class ServiceMetaInfo { + private String serviceName; + private String serviceInstanceName; + private List tags; + + public ServiceMetaInfo() { + } + + public ServiceMetaInfo(String serviceName, String serviceInstanceName) { + this.serviceName = serviceName; + this.serviceInstanceName = serviceInstanceName; + } + + @Setter + @Getter + @RequiredArgsConstructor + @ToString + public static class KeyValue { + private final String key; + private final String value; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ServiceMetaInfo info = (ServiceMetaInfo)o; + return Objects.equals(serviceName, info.serviceName) && + Objects.equals(serviceInstanceName, info.serviceInstanceName); + } + + @Override public int hashCode() { + return Objects.hash(serviceName, serviceInstanceName); + } + + public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN"); +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis new file mode 100644 index 000000000..c744de58e --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + + +org.apache.skywalking.oap.server.receiver.envoy.als.K8sALSServiceMeshHTTPAnalysis \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java new file mode 100644 index 000000000..b37ebedeb --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.kubernetes.client.ApiException; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1OwnerReference; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +@RunWith(Parameterized.class) +public class DependencyResourceTest { + + @Parameterized.Parameter + public String resourceName; + + @Parameterized.Parameter(1) + public ThrowableFunction function; + + @Parameterized.Parameters(name = "{index}: {0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"deploy1", (ThrowableFunction) result -> result}, + {"pod1", (ThrowableFunction) result -> { throw new RuntimeException(); } }, + {"pod1", (ThrowableFunction) result -> { throw new ApiException(); } }, + {"pod1", (ThrowableFunction) result -> null}, + {"rs1", (ThrowableFunction) result -> { + result.setOwnerReferences(null); + return result; + } }, + {"rs1", (ThrowableFunction) result -> { + V1OwnerReference reference1 = new V1OwnerReference(); + reference1.setKind("StatefulSet"); + reference1.setName("ss1"); + result.setOwnerReferences(Collections.singletonList(reference1)); + return result; + } }, + }); + } + + @Test + public void test() { + V1ObjectMeta meta = new V1ObjectMeta(); + meta.setName("pod1"); + V1OwnerReference reference = new V1OwnerReference(); + reference.setKind("ReplicaSet"); + reference.setName("rs1"); + meta.addOwnerReferencesItem(reference); + DependencyResource dr = new DependencyResource(meta); + DependencyResource drr = dr.getOwnerResource("ReplicaSet", ownerReference -> { + assertThat(ownerReference.getName(), is("rs1")); + V1ObjectMeta result = new V1ObjectMeta(); + result.setName("rs1"); + V1OwnerReference reference1 = new V1OwnerReference(); + reference1.setKind("Deployment"); + reference1.setName("deploy1"); + result.addOwnerReferencesItem(reference1); + return function.go(result); + }).getOwnerResource("Deployment", ownerReference -> { + assertThat(ownerReference.getName(), is("deploy1")); + V1ObjectMeta result = new V1ObjectMeta(); + result.setName("deploy1"); + return result; + }); + assertThat(drr.getMetadata().getName(), is(resourceName)); + } + + interface ThrowableFunction { + V1ObjectMeta go(final V1ObjectMeta result) throws ApiException; + } + +} \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java new file mode 100644 index 000000000..f54a22b0d --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.util.JsonFormat; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.io.*; +import java.util.*; +import org.apache.skywalking.apm.network.common.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain; +import org.junit.*; + +public class K8sHTTPAnalysisTest { + + private MockK8sAnalysis analysis; + + @Before + public void setUp() { + analysis = new MockK8sAnalysis(); + analysis.init(null); + } + + @Test + public void testIngressRoleIdentify() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + Role identify = analysis.identify(requestBuilder.getIdentifier(), Role.NONE); + + Assert.assertEquals(Role.PROXY, identify); + } + } + + @Test + public void testSidecarRoleIdentify() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + Role identify = analysis.identify(requestBuilder.getIdentifier(), Role.NONE); + + Assert.assertEquals(Role.SIDECAR, identify); + } + } + + @Test + public void testIngressMetric() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + + analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); + + Assert.assertEquals(2, analysis.metrics.size()); + + ServiceMeshMetric incoming = analysis.metrics.get(0); + Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName()); + Assert.assertEquals("ingress", incoming.getDestServiceName()); + Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); + + ServiceMeshMetric outgoing = analysis.metrics.get(1); + Assert.assertEquals("ingress", outgoing.getSourceServiceName()); + Assert.assertEquals("productpage", outgoing.getDestServiceName()); + Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint()); + } + } + + @Test + public void testIngress2SidecarMetric() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-ingress2sidecar.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + + analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); + + Assert.assertEquals(1, analysis.metrics.size()); + + ServiceMeshMetric incoming = analysis.metrics.get(0); + Assert.assertEquals("", incoming.getSourceServiceName()); + Assert.assertEquals("productpage", incoming.getDestServiceName()); + Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); + } + } + + @Test + public void testSidecar2SidecarServerMetric() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-server-sidecar.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + + analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); + + Assert.assertEquals(1, analysis.metrics.size()); + + ServiceMeshMetric incoming = analysis.metrics.get(0); + Assert.assertEquals("productpage", incoming.getSourceServiceName()); + Assert.assertEquals("review", incoming.getDestServiceName()); + Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); + } + } + + @Test + public void testSidecar2SidecarClientMetric() throws IOException { + try (InputStreamReader isr = new InputStreamReader(getResourceAsStream("envoy-mesh-client-sidecar.msg"))) { + StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + + analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); + + Assert.assertEquals(1, analysis.metrics.size()); + + ServiceMeshMetric incoming = analysis.metrics.get(0); + Assert.assertEquals("productpage", incoming.getSourceServiceName()); + Assert.assertEquals("detail", incoming.getDestServiceName()); + Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint()); + } + } + + public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis { + private List metrics = new ArrayList<>(); + + @Override + public void init(EnvoyMetricReceiverConfig config) { + getIpServiceMap().set(ImmutableMap.of( + "10.44.2.56", new ServiceMetaInfo("ingress", "ingress-Inst"), + "10.44.2.54", new ServiceMetaInfo("productpage", "productpage-Inst"), + "10.44.6.66", new ServiceMetaInfo("detail", "detail-Inst"), + "10.44.2.55", new ServiceMetaInfo("review", "detail-Inst") + )); + } + + @Override + protected void forward(ServiceMeshMetric metric) { + metrics.add(metric); + } + } + + private static InputStream getResourceAsStream(final String resource) { + final InputStream in = getContextClassLoader().getResourceAsStream(resource); + return in == null ? MetricServiceGRPCHandlerTestMain.class.getResourceAsStream(resource) : in; + } + + private static ClassLoader getContextClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg new file mode 100644 index 000000000..0f4917ef6 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress.msg @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +{ + "identifier": { + "node": { + "id": "router~10.44.2.56~istio-ingressgateway-699c7dc774-hjxq5.istio-system~istio-system.svc.cluster.local", + "cluster": "istio-ingressgateway", + "metadata": { + "CONFIG_NAMESPACE": "istio-system", + "ISTIO_META_INSTANCE_IPS": "10.44.2.56,10.44.2.56,fe80::9ca5:e5ff:fede:6414", + "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70", + "ISTIO_PROXY_VERSION": "1.1.0", + "ISTIO_VERSION": "1.0-dev", + "POD_NAME": "istio-ingressgateway-699c7dc774-hjxq5", + "ROUTER_MODE": "sni-dnat", + "istio": "sidecar" + }, + "locality": { }, + "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL" + }, + "logName": "als" + }, + "httpLogs": { + "logEntry": [ + { + "commonProperties": { + "downstreamRemoteAddress": { + "socketAddress": { + "address": "10.138.0.14", + "portValue": 51489 + } + }, + "downstreamLocalAddress": { + "socketAddress": { + "address": "10.44.2.56", + "portValue": 80 + } + }, + "startTime": "2019-04-13T03:59:53.687224601Z", + "timeToLastRxByte": "0.000031206s", + "timeToFirstUpstreamTxByte": "0.000869250s", + "timeToLastUpstreamTxByte": "0.000881276s", + "timeToFirstUpstreamRxByte": "1.010010710s", + "timeToLastUpstreamRxByte": "1.010423815s", + "timeToFirstDownstreamTxByte": "1.010053396s", + "timeToLastDownstreamTxByte": "1.010432910s", + "upstreamRemoteAddress": { + "socketAddress": { + "address": "10.44.2.54", + "portValue": 9080 + } + }, + "upstreamCluster": "outbound|9080||productpage.default.svc.cluster.local" + }, + "protocolVersion": "HTTP11", + "request": { + "requestMethod": "GET", + "scheme": "http", + "authority": "35.227.162.132", + "path": "/productpage", + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15", + "forwardedFor": "10.138.0.14", + "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5", + "requestHeadersBytes": "1038" + }, + "response": { + "responseCode": 200, + "responseHeadersBytes": "147", + "responseBodyBytes": "4415" + } + } + ] + } +} \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg new file mode 100644 index 000000000..b641b142a --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-ingress2sidecar.msg @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + // Mock identifier + "identifier": { + "node": { + "id": "sidecar~10.44.2.54~product-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local", + "cluster": "product.default", + "metadata": { + "CONFIG_NAMESPACE": "default", + "INTERCEPTION_MODE": "REDIRECT", + "ISTIO_META_INSTANCE_IPS": "10.44.2.54,10.44.2.54,fe80::d8e8:b6ff:fed6:f857", + "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70", + "ISTIO_PROXY_VERSION": "1.1.0", + "ISTIO_VERSION": "1.0-dev", + "POD_NAME": "product-v1-d66dcfdc5-kh6v7", + "app": "product", + "istio": "sidecar", + "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews", + "pod-template-hash": "822879871", + "version": "v1" + }, + "locality": { }, + "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL" + }, + "logName": "als" + }, + // Real log sample + "httpLogs": { + "logEntry": [ + { + "commonProperties": { + "downstreamRemoteAddress": { + "socketAddress": { + "address": "10.138.0.14", + "portValue": 0 + } + }, + "downstreamLocalAddress": { + "socketAddress": { + "address": "10.44.2.54", + "portValue": 9080 + } + }, + "startTime": "2019-04-13T03:59:53.688609181Z", + "timeToLastRxByte": "0.000081758s", + "timeToFirstUpstreamTxByte": "0.000789220s", + "timeToLastUpstreamTxByte": "0.000808326s", + "timeToFirstUpstreamRxByte": "1.008120501s", + "timeToLastUpstreamRxByte": "1.008369826s", + "timeToFirstDownstreamTxByte": "1.008242458s", + "timeToLastDownstreamTxByte": "1.008378251s", + "upstreamRemoteAddress": { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 9080 + } + }, + "upstreamCluster": "inbound|9080|http|productpage.default.svc.cluster.local", + "metadata": { + "filterMetadata": { + "istio_authn": { } + } + } + }, + "protocolVersion": "HTTP11", + "request": { + "requestMethod": "GET", + "scheme": "http", + "authority": "35.227.162.132", + "path": "/productpage", + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15", + "forwardedFor": "10.138.0.14", + "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5", + "requestHeadersBytes": "579" + }, + "response": { + "responseCode": 200, + "responseHeadersBytes": "147", + "responseBodyBytes": "4415" + } + } + ] + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg new file mode 100644 index 000000000..104981823 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-client-sidecar.msg @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "identifier": { + "node": { + "id": "sidecar~10.44.2.55~productpage-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local", + "cluster": "productpage.default", + "metadata": { + "CONFIG_NAMESPACE": "default", + "INTERCEPTION_MODE": "REDIRECT", + "ISTIO_META_INSTANCE_IPS": "10.44.2.55,10.44.2.55,fe80::d8e8:b6ff:fed6:f857", + "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70", + "ISTIO_PROXY_VERSION": "1.1.0", + "ISTIO_VERSION": "1.0-dev", + "POD_NAME": "productpage-v1-d66dcfdc5-kh6v7", + "app": "productpage", + "istio": "sidecar", + "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews", + "pod-template-hash": "822879871", + "version": "v1" + }, + "locality": { }, + "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL" + }, + "logName": "als" + }, + "httpLogs": { + "logEntry": [ + { + "commonProperties": { + "downstreamRemoteAddress": { + "socketAddress": { + "address": "10.44.2.54", + "portValue": 58996 + } + }, + "downstreamLocalAddress": { + "socketAddress": { + "address": "10.47.247.180", + "portValue": 9080 + } + }, + "startTime": "2019-04-13T03:59:53.695750999Z", + "timeToLastRxByte": "0.000082339s", + "timeToFirstUpstreamTxByte": "0.002353100s", + "timeToLastUpstreamTxByte": "0.002362295s", + "timeToFirstUpstreamRxByte": "0.010500490s", + "timeToLastUpstreamRxByte": "0.010735195s", + "timeToFirstDownstreamTxByte": "0.010669993s", + "timeToLastDownstreamTxByte": "0.010745496s", + "upstreamRemoteAddress": { + "socketAddress": { + "address": "10.44.6.66", + "portValue": 9080 + } + }, + "upstreamCluster": "outbound|9080||details.default.svc.cluster.local" + }, + "protocolVersion": "HTTP11", + "request": { + "requestMethod": "GET", + "scheme": "http", + "authority": "details:9080", + "path": "/details/0", + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15", + "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5", + "requestHeadersBytes": "869" + }, + "response": { + "responseCode": 200, + "responseHeadersBytes": "129", + "responseBodyBytes": "178" + } + } + ] + } + } +} \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg new file mode 100644 index 000000000..d02ce2bc9 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/envoy-mesh-server-sidecar.msg @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "identifier": { + "node": { + "id": "sidecar~10.44.2.55~reviews-v1-d66dcfdc5-kh6v7.default~default.svc.cluster.local", + "cluster": "reviews.default", + "metadata": { + "CONFIG_NAMESPACE": "default", + "INTERCEPTION_MODE": "REDIRECT", + "ISTIO_META_INSTANCE_IPS": "10.44.2.55,10.44.2.55,fe80::d8e8:b6ff:fed6:f857", + "ISTIO_PROXY_SHA": "istio-proxy:55c80965eab994e6bfa2227e3942fa89928d0d70", + "ISTIO_PROXY_VERSION": "1.1.0", + "ISTIO_VERSION": "1.0-dev", + "POD_NAME": "reviews-v1-d66dcfdc5-kh6v7", + "app": "reviews", + "istio": "sidecar", + "kubernetes.io/limit-ranger": "LimitRanger plugin set: cpu request for container istio-proxy; cpu request for container reviews", + "pod-template-hash": "822879871", + "version": "v1" + }, + "locality": { }, + "buildVersion": "55c80965eab994e6bfa2227e3942fa89928d0d70/1.10.0-dev/Clean/RELEASE/BoringSSL" + }, + "logName": "als" + }, + "httpLogs": { + "logEntry": [ + { + "commonProperties": { + "downstreamRemoteAddress": { + "socketAddress": { + "address": "10.44.2.54", + "portValue": 58356 + } + }, + "downstreamLocalAddress": { + "socketAddress": { + "address": "10.44.2.55", + "portValue": 9080 + } + }, + "startTime": "2019-04-13T03:59:53.712690678Z", + "timeToLastRxByte": "0.000127695s", + "timeToFirstUpstreamTxByte": "0.000841545s", + "timeToLastUpstreamTxByte": "0.000854020s", + "timeToFirstUpstreamRxByte": "0.977617052s", + "timeToLastUpstreamRxByte": "0.977797037s", + "timeToFirstDownstreamTxByte": "0.977764621s", + "timeToLastDownstreamTxByte": "0.977811534s", + "upstreamRemoteAddress": { + "socketAddress": { + "address": "127.0.0.1", + "portValue": 9080 + } + }, + "upstreamCluster": "inbound|9080|http|reviews.default.svc.cluster.local", + "metadata": { + "filterMetadata": { + "istio_authn": { } + } + } + }, + "protocolVersion": "HTTP11", + "request": { + "requestMethod": "GET", + "scheme": "http", + "authority": "reviews:9080", + "path": "/reviews/0", + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1 Safari/605.1.15", + "requestId": "0ac1feff-84ae-4d3a-8b15-890da2b194c5", + "requestHeadersBytes": "423" + }, + "response": { + "responseCode": 200, + "responseHeadersBytes": "181", + "responseBodyBytes": "295" + } + } + ] + } +} \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto new file mode 100644 index 000000000..6e76f5b72 --- /dev/null +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/address.proto @@ -0,0 +1,121 @@ +syntax = "proto3"; + +package envoy.api.v2.core; + +option java_outer_classname = "AddressProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.api.v2.core"; + +import "envoy/api/v2/core/base.proto"; + +import "google/protobuf/wrappers.proto"; + +import "validate/validate.proto"; +import "gogoproto/gogo.proto"; + +option (gogoproto.equal_all) = true; + +// [#protodoc-title: Network addresses] + +message Pipe { + // Unix Domain Socket path. On Linux, paths starting with '@' will use the + // abstract namespace. The starting '@' is replaced by a null byte by Envoy. + // Paths starting with '@' will result in an error in environments other than + // Linux. + string path = 1 [(validate.rules).string.min_bytes = 1]; +} + +message SocketAddress { + enum Protocol { + option (gogoproto.goproto_enum_prefix) = false; + TCP = 0; + // [#not-implemented-hide:] + UDP = 1; + } + Protocol protocol = 1 [(validate.rules).enum.defined_only = true]; + // The address for this socket. :ref:`Listeners ` will bind + // to the address. An empty address is not allowed. Specify ``0.0.0.0`` or ``::`` + // to bind to any address. [#comment:TODO(zuercher) reinstate when implemented: + // It is possible to distinguish a Listener address via the prefix/suffix matching + // in :ref:`FilterChainMatch `.] When used + // within an upstream :ref:`BindConfig `, the address + // controls the source address of outbound connections. For :ref:`clusters + // `, the cluster type determines whether the + // address must be an IP (*STATIC* or *EDS* clusters) or a hostname resolved by DNS + // (*STRICT_DNS* or *LOGICAL_DNS* clusters). Address resolution can be customized + // via :ref:`resolver_name `. + string address = 2 [(validate.rules).string.min_bytes = 1]; + oneof port_specifier { + option (validate.required) = true; + uint32 port_value = 3 [(validate.rules).uint32.lte = 65535]; + // This is only valid if :ref:`resolver_name + // ` is specified below and the + // named resolver is capable of named port resolution. + string named_port = 4; + } + // The name of the resolver. This must have been registered with Envoy. If this is + // empty, a context dependent default applies. If address is a hostname this + // should be set for resolution other than DNS. If the address is a concrete + // IP address, no resolution will occur. + string resolver_name = 5; + + // When binding to an IPv6 address above, this enables `IPv4 compatibility + // `_. Binding to ``::`` will + // allow both IPv4 and IPv6 connections, with peer IPv4 addresses mapped into + // IPv6 space as ``::FFFF:``. + bool ipv4_compat = 6; +} + +message TcpKeepalive { + // Maximum number of keepalive probes to send without response before deciding + // the connection is dead. Default is to use the OS level configuration (unless + // overridden, Linux defaults to 9.) + google.protobuf.UInt32Value keepalive_probes = 1; + // The number of seconds a connection needs to be idle before keep-alive probes + // start being sent. Default is to use the OS level configuration (unless + // overridden, Linux defaults to 7200s (ie 2 hours.) + google.protobuf.UInt32Value keepalive_time = 2; + // The number of seconds between keep-alive probes. Default is to use the OS + // level configuration (unless overridden, Linux defaults to 75s.) + google.protobuf.UInt32Value keepalive_interval = 3; +} + +message BindConfig { + // The address to bind to when creating a socket. + SocketAddress source_address = 1 + [(validate.rules).message.required = true, (gogoproto.nullable) = false]; + + // Whether to set the *IP_FREEBIND* option when creating the socket. When this + // flag is set to true, allows the :ref:`source_address + // ` to be an IP address + // that is not configured on the system running Envoy. When this flag is set + // to false, the option *IP_FREEBIND* is disabled on the socket. When this + // flag is not set (default), the socket is not modified, i.e. the option is + // neither enabled nor disabled. + google.protobuf.BoolValue freebind = 2; + + // Additional socket options that may not be present in Envoy source code or + // precompiled binaries. + repeated SocketOption socket_options = 3; +} + +// Addresses specify either a logical or physical address and port, which are +// used to tell Envoy where to bind/listen, connect to upstream and find +// management servers. +message Address { + oneof address { + option (validate.required) = true; + + SocketAddress socket_address = 1; + Pipe pipe = 2; + } +} + +// CidrRange specifies an IP Address and a prefix length to construct +// the subnet mask for a `CIDR `_ range. +message CidrRange { + // IPv4 or IPv6 address, e.g. ``192.0.0.0`` or ``2001:db8::``. + string address_prefix = 1 [(validate.rules).string.min_bytes = 1]; + // Length of prefix, e.g. 0, 32. + google.protobuf.UInt32Value prefix_len = 2 [(validate.rules).uint32.lte = 128]; +} diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto index 3be09cdea..6b4e931cf 100644 --- a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/api/v2/core/base.proto @@ -17,6 +17,7 @@ import "gogoproto/gogo.proto"; import "envoy/type/percent.proto"; option (gogoproto.equal_all) = true; +option (gogoproto.stable_marshaler_all) = true; // [#protodoc-title: Common types] @@ -78,3 +79,178 @@ message Node { // by Envoy in management server RPCs. string build_version = 5; } + +// Metadata provides additional inputs to filters based on matched listeners, +// filter chains, routes and endpoints. It is structured as a map, usually from +// filter name (in reverse DNS format) to metadata specific to the filter. Metadata +// key-values for a filter are merged as connection and request handling occurs, +// with later values for the same key overriding earlier values. +// +// An example use of metadata is providing additional values to +// http_connection_manager in the envoy.http_connection_manager.access_log +// namespace. +// +// Another example use of metadata is to per service config info in cluster metadata, which may get +// consumed by multiple filters. +// +// For load balancing, Metadata provides a means to subset cluster endpoints. +// Endpoints have a Metadata object associated and routes contain a Metadata +// object to match against. There are some well defined metadata used today for +// this purpose: +// +// * ``{"envoy.lb": {"canary": }}`` This indicates the canary status of an +// endpoint and is also used during header processing +// (x-envoy-upstream-canary) and for stats purposes. +message Metadata { + // Key is the reverse DNS filter name, e.g. com.acme.widget. The envoy.* + // namespace is reserved for Envoy's built-in filters. + map filter_metadata = 1; +} + +// Runtime derived uint32 with a default when not specified. +message RuntimeUInt32 { + // Default value if runtime value is not available. + uint32 default_value = 2; + + // Runtime key to get value for comparison. This value is used if defined. + string runtime_key = 3 [(validate.rules).string.min_bytes = 1]; +} + +// Envoy supports :ref:`upstream priority routing +// ` both at the route and the virtual +// cluster level. The current priority implementation uses different connection +// pool and circuit breaking settings for each priority level. This means that +// even for HTTP/2 requests, two physical connections will be used to an +// upstream host. In the future Envoy will likely support true HTTP/2 priority +// over a single upstream connection. +enum RoutingPriority { + DEFAULT = 0; + HIGH = 1; +} + +// HTTP request method. +enum RequestMethod { + option (gogoproto.goproto_enum_prefix) = false; + METHOD_UNSPECIFIED = 0; + GET = 1; + HEAD = 2; + POST = 3; + PUT = 4; + DELETE = 5; + CONNECT = 6; + OPTIONS = 7; + TRACE = 8; +} + +// Header name/value pair. +message HeaderValue { + // Header name. + string key = 1 [(validate.rules).string = {min_bytes: 1, max_bytes: 16384}]; + + // Header value. + // + // The same :ref:`format specifier ` as used for + // :ref:`HTTP access logging ` applies here, however + // unknown header values are replaced with the empty string instead of `-`. + string value = 2 [(validate.rules).string.max_bytes = 16384]; +} + +// Header name/value pair plus option to control append behavior. +message HeaderValueOption { + // Header name/value pair that this option applies to. + HeaderValue header = 1 [(validate.rules).message.required = true]; + + // Should the value be appended? If true (default), the value is appended to + // existing values. + google.protobuf.BoolValue append = 2; +} + +// Wrapper for a set of headers. +message HeaderMap { + repeated HeaderValue headers = 1; +} + +// Data source consisting of either a file or an inline value. +message DataSource { + oneof specifier { + option (validate.required) = true; + + // Local filesystem data source. + string filename = 1 [(validate.rules).string.min_bytes = 1]; + + // Bytes inlined in the configuration. + bytes inline_bytes = 2 [(validate.rules).bytes.min_len = 1]; + + // String inlined in the configuration. + string inline_string = 3 [(validate.rules).string.min_bytes = 1]; + } +} + +// Configuration for transport socket in :ref:`listeners ` and +// :ref:`clusters `. If the configuration is +// empty, a default transport socket implementation and configuration will be +// chosen based on the platform and existence of tls_context. +message TransportSocket { + // The name of the transport socket to instantiate. The name must match a supported transport + // socket implementation. + string name = 1 [(validate.rules).string.min_bytes = 1]; + + // Implementation specific configuration which depends on the implementation being instantiated. + // See the supported transport socket implementations for further documentation. + oneof config_type { + google.protobuf.Struct config = 2; + + google.protobuf.Any typed_config = 3; + } +} + +// Generic socket option message. This would be used to set socket options that +// might not exist in upstream kernels or precompiled Envoy binaries. +message SocketOption { + // An optional name to give this socket option for debugging, etc. + // Uniqueness is not required and no special meaning is assumed. + string description = 1; + // Corresponding to the level value passed to setsockopt, such as IPPROTO_TCP + int64 level = 2; + // The numeric name as passed to setsockopt + int64 name = 3; + oneof value { + option (validate.required) = true; + + // Because many sockopts take an int value. + int64 int_value = 4; + // Otherwise it's a byte buffer. + bytes buf_value = 5; + } + enum SocketState { + option (gogoproto.goproto_enum_prefix) = false; + // Socket options are applied after socket creation but before binding the socket to a port + STATE_PREBIND = 0; + // Socket options are applied after binding the socket to a port but before calling listen() + STATE_BOUND = 1; + // Socket options are applied after calling listen() + STATE_LISTENING = 2; + } + // The state in which the option will be applied. When used in BindConfig + // STATE_PREBIND is currently the only valid value. + SocketState state = 6 + [(validate.rules).message.required = true, (validate.rules).enum.defined_only = true]; +} + +// Runtime derived FractionalPercent with defaults for when the numerator or denominator is not +// specified via a runtime key. +message RuntimeFractionalPercent { + // Default value if the runtime value's for the numerator/denominator keys are not available. + envoy.type.FractionalPercent default_value = 1 [(validate.rules).message.required = true]; + + // Runtime key for a YAML representation of a FractionalPercent. + string runtime_key = 2; +} + +// Identifies a specific ControlPlane instance that Envoy is connected to. +message ControlPlane { + // An opaque control plane identifier that uniquely identifies an instance + // of control plane. This can be used to identify which control plane instance, + // the Envoy is connected to. + string identifier = 1; +} diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto new file mode 100644 index 000000000..b38743339 --- /dev/null +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/data/accesslog/v2/accesslog.proto @@ -0,0 +1,335 @@ +syntax = "proto3"; + +package envoy.data.accesslog.v2; + +option java_outer_classname = "AccesslogProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.data.accesslog.v2"; + +import "envoy/api/v2/core/address.proto"; +import "envoy/api/v2/core/base.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; +import "gogoproto/gogo.proto"; +import "validate/validate.proto"; + +option (gogoproto.stable_marshaler_all) = true; + +// [#protodoc-title: gRPC access logs] +// Envoy access logs describe incoming interaction with Envoy over a fixed +// period of time, and typically cover a single request/response exchange, +// (e.g. HTTP), stream (e.g. over HTTP/gRPC), or proxied connection (e.g. TCP). +// Access logs contain fields defined in protocol-specific protobuf messages. +// +// Except where explicitly declared otherwise, all fields describe +// *downstream* interaction between Envoy and a connected client. +// Fields describing *upstream* interaction will explicitly include ``upstream`` +// in their name. + +// [#not-implemented-hide:] +message TCPAccessLogEntry { + // Common properties shared by all Envoy access logs. + AccessLogCommon common_properties = 1; +} + +message HTTPAccessLogEntry { + // Common properties shared by all Envoy access logs. + AccessLogCommon common_properties = 1; + + // HTTP version + enum HTTPVersion { + PROTOCOL_UNSPECIFIED = 0; + HTTP10 = 1; + HTTP11 = 2; + HTTP2 = 3; + } + HTTPVersion protocol_version = 2; + + // Description of the incoming HTTP request. + HTTPRequestProperties request = 3; + + // Description of the outgoing HTTP response. + HTTPResponseProperties response = 4; +} + +// Defines fields that are shared by all Envoy access logs. +message AccessLogCommon { + // [#not-implemented-hide:] + // This field indicates the rate at which this log entry was sampled. + // Valid range is (0.0, 1.0]. + double sample_rate = 1 [(validate.rules).double.gt = 0.0, (validate.rules).double.lte = 1.0]; + + // This field is the remote/origin address on which the request from the user was received. + // Note: This may not be the physical peer. E.g, if the remote address is inferred from for + // example the x-forwarder-for header, proxy protocol, etc. + envoy.api.v2.core.Address downstream_remote_address = 2; + + // This field is the local/destination address on which the request from the user was received. + envoy.api.v2.core.Address downstream_local_address = 3; + + // If the connection is secure,S this field will contain TLS properties. + TLSProperties tls_properties = 4; + + // The time that Envoy started servicing this request. This is effectively the time that the first + // downstream byte is received. + google.protobuf.Timestamp start_time = 5 [(gogoproto.stdtime) = true]; + + // Interval between the first downstream byte received and the last + // downstream byte received (i.e. time it takes to receive a request). + google.protobuf.Duration time_to_last_rx_byte = 6 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the first upstream byte sent. There may + // by considerable delta between *time_to_last_rx_byte* and this value due to filters. + // Additionally, the same caveats apply as documented in *time_to_last_downstream_tx_byte* about + // not accounting for kernel socket buffer time, etc. + google.protobuf.Duration time_to_first_upstream_tx_byte = 7 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the last upstream byte sent. There may + // by considerable delta between *time_to_last_rx_byte* and this value due to filters. + // Additionally, the same caveats apply as documented in *time_to_last_downstream_tx_byte* about + // not accounting for kernel socket buffer time, etc. + google.protobuf.Duration time_to_last_upstream_tx_byte = 8 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the first upstream + // byte received (i.e. time it takes to start receiving a response). + google.protobuf.Duration time_to_first_upstream_rx_byte = 9 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the last upstream + // byte received (i.e. time it takes to receive a complete response). + google.protobuf.Duration time_to_last_upstream_rx_byte = 10 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the first downstream byte sent. + // There may be a considerable delta between the *time_to_first_upstream_rx_byte* and this field + // due to filters. Additionally, the same caveats apply as documented in + // *time_to_last_downstream_tx_byte* about not accounting for kernel socket buffer time, etc. + google.protobuf.Duration time_to_first_downstream_tx_byte = 11 [(gogoproto.stdduration) = true]; + + // Interval between the first downstream byte received and the last downstream byte sent. + // Depending on protocol, buffering, windowing, filters, etc. there may be a considerable delta + // between *time_to_last_upstream_rx_byte* and this field. Note also that this is an approximate + // time. In the current implementation it does not include kernel socket buffer time. In the + // current implementation it also does not include send window buffering inside the HTTP/2 codec. + // In the future it is likely that work will be done to make this duration more accurate. + google.protobuf.Duration time_to_last_downstream_tx_byte = 12 [(gogoproto.stdduration) = true]; + + // The upstream remote/destination address that handles this exchange. This does not include + // retries. + envoy.api.v2.core.Address upstream_remote_address = 13; + + // The upstream local/origin address that handles this exchange. This does not include retries. + envoy.api.v2.core.Address upstream_local_address = 14; + + // The upstream cluster that *upstream_remote_address* belongs to. + string upstream_cluster = 15; + + // Flags indicating occurrences during request/response processing. + ResponseFlags response_flags = 16; + + // All metadata encountered during request processing, including endpoint + // selection. + // + // This can be used to associate IDs attached to the various configurations + // used to process this request with the access log entry. For example, a + // route created from a higher level forwarding rule with some ID can place + // that ID in this field and cross reference later. It can also be used to + // determine if a canary endpoint was used or not. + envoy.api.v2.core.Metadata metadata = 17; + + // If upstream connection failed due to transport socket (e.g. TLS handshake), provides the + // failure reason from the transport socket. The format of this field depends on the configured + // upstream transport socket. Common TLS failures are in + // :ref:`TLS trouble shooting `. + string upstream_transport_failure_reason = 18; +} + +// Flags indicating occurrences during request/response processing. +message ResponseFlags { + // Indicates local server healthcheck failed. + bool failed_local_healthcheck = 1; + + // Indicates there was no healthy upstream. + bool no_healthy_upstream = 2; + + // Indicates an there was an upstream request timeout. + bool upstream_request_timeout = 3; + + // Indicates local codec level reset was sent on the stream. + bool local_reset = 4; + + // Indicates remote codec level reset was received on the stream. + bool upstream_remote_reset = 5; + + // Indicates there was a local reset by a connection pool due to an initial connection failure. + bool upstream_connection_failure = 6; + + // Indicates the stream was reset due to an upstream connection termination. + bool upstream_connection_termination = 7; + + // Indicates the stream was reset because of a resource overflow. + bool upstream_overflow = 8; + + // Indicates no route was found for the request. + bool no_route_found = 9; + + // Indicates that the request was delayed before proxying. + bool delay_injected = 10; + + // Indicates that the request was aborted with an injected error code. + bool fault_injected = 11; + + // Indicates that the request was rate-limited locally. + bool rate_limited = 12; + + message Unauthorized { + // Reasons why the request was unauthorized + enum Reason { + REASON_UNSPECIFIED = 0; + // The request was denied by the external authorization service. + EXTERNAL_SERVICE = 1; + } + + Reason reason = 1; + } + + // Indicates if the request was deemed unauthorized and the reason for it. + Unauthorized unauthorized_details = 13; + + // Indicates that the request was rejected because there was an error in rate limit service. + bool rate_limit_service_error = 14; + + // Indicates the stream was reset due to a downstream connection termination. + bool downstream_connection_termination = 15; + + // Indicates that the upstream retry limit was exceeded, resulting in a downstream error. + bool upstream_retry_limit_exceeded = 16; + + // Indicates that the stream idle timeout was hit, resulting in a downstream 408. + bool stream_idle_timeout = 17; +} + +// Properties of a negotiated TLS connection. +message TLSProperties { + // [#not-implemented-hide:] + enum TLSVersion { + VERSION_UNSPECIFIED = 0; + TLSv1 = 1; + TLSv1_1 = 2; + TLSv1_2 = 3; + TLSv1_3 = 4; + } + // [#not-implemented-hide:] + // Version of TLS that was negotiated. + TLSVersion tls_version = 1; + + // [#not-implemented-hide:] + // TLS cipher suite negotiated during handshake. The value is a + // four-digit hex code defined by the IANA TLS Cipher Suite Registry + // (e.g. ``009C`` for ``TLS_RSA_WITH_AES_128_GCM_SHA256``). + // + // Here it is expressed as an integer. + google.protobuf.UInt32Value tls_cipher_suite = 2; + + // SNI hostname from handshake. + string tls_sni_hostname = 3; + + message CertificateProperties { + message SubjectAltName { + oneof san { + string uri = 1; + // [#not-implemented-hide:] + string dns = 2; + } + } + + // SANs present in the certificate. + repeated SubjectAltName subject_alt_name = 1; + + // The subject field of the certificate. + string subject = 2; + } + + // Properties of the local certificate used to negotiate TLS. + CertificateProperties local_certificate_properties = 4; + + // Properties of the peer certificate used to negotiate TLS. + CertificateProperties peer_certificate_properties = 5; +} + +message HTTPRequestProperties { + // The request method (RFC 7231/2616). + // [#comment:TODO(htuch): add (validate.rules).enum.defined_only = true once + // https://github.com/lyft/protoc-gen-validate/issues/42 is resolved.] + envoy.api.v2.core.RequestMethod request_method = 1; + + // The scheme portion of the incoming request URI. + string scheme = 2; + + // HTTP/2 ``:authority`` or HTTP/1.1 ``Host`` header value. + string authority = 3; + + // The port of the incoming request URI + // (unused currently, as port is composed onto authority). + google.protobuf.UInt32Value port = 4; + + // The path portion from the incoming request URI. + string path = 5; + + // Value of the ``User-Agent`` request header. + string user_agent = 6; + + // Value of the ``Referer`` request header. + string referer = 7; + + // Value of the ``X-Forwarded-For`` request header. + string forwarded_for = 8; + + // Value of the ``X-Request-Id`` request header + // + // This header is used by Envoy to uniquely identify a request. + // It will be generated for all external requests and internal requests that + // do not already have a request ID. + string request_id = 9; + + // Value of the ``X-Envoy-Original-Path`` request header. + string original_path = 10; + + // Size of the HTTP request headers in bytes. + // + // This value is captured from the OSI layer 7 perspective, i.e. it does not + // include overhead from framing or encoding at other networking layers. + uint64 request_headers_bytes = 11; + + // Size of the HTTP request body in bytes. + // + // This value is captured from the OSI layer 7 perspective, i.e. it does not + // include overhead from framing or encoding at other networking layers. + uint64 request_body_bytes = 12; + + // Map of additional headers that have been configured to be logged. + map request_headers = 13; +} + +message HTTPResponseProperties { + // The HTTP response code returned by Envoy. + google.protobuf.UInt32Value response_code = 1; + + // Size of the HTTP response headers in bytes. + // + // This value is captured from the OSI layer 7 perspective, i.e. it does not + // include overhead from framing or encoding at other networking layers. + uint64 response_headers_bytes = 2; + + // Size of the HTTP response body in bytes. + // + // This value is captured from the OSI layer 7 perspective, i.e. it does not + // include overhead from framing or encoding at other networking layers. + uint64 response_body_bytes = 3; + + // Map of additional headers configured to be logged. + map response_headers = 4; + + // Map of trailers configured to be logged. + map response_trailers = 5; +} diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto new file mode 100644 index 000000000..1ee6ccd00 --- /dev/null +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/envoy/service/accesslog/v2/als.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package envoy.service.accesslog.v2; + +option java_outer_classname = "AlsProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.service.accesslog.v2"; +option go_package = "v2"; +option java_generic_services = true; + +import "envoy/api/v2/core/base.proto"; +import "envoy/data/accesslog/v2/accesslog.proto"; + +import "validate/validate.proto"; + +// [#protodoc-title: gRPC Access Log Service (ALS)] + +// Service for streaming access logs from Envoy to an access log server. +service AccessLogService { + // Envoy will connect and send StreamAccessLogsMessage messages forever. It does not expect any + // response to be sent as nothing would be done in the case of failure. The server should + // disconnect if it expects Envoy to reconnect. In the future we may decide to add a different + // API for "critical" access logs in which Envoy will buffer access logs for some period of time + // until it gets an ACK so it could then retry. This API is designed for high throughput with the + // expectation that it might be lossy. + rpc StreamAccessLogs(stream StreamAccessLogsMessage) returns (StreamAccessLogsResponse) { + } +} + +// Empty response for the StreamAccessLogs API. Will never be sent. See below. +message StreamAccessLogsResponse { +} + +// Stream message for the StreamAccessLogs API. Envoy will open a stream to the server and stream +// access logs without ever expecting a response. +message StreamAccessLogsMessage { + message Identifier { + // The node sending the access log messages over the stream. + envoy.api.v2.core.Node node = 1 [(validate.rules).message.required = true]; + + // The friendly name of the log configured in :ref:`CommonGrpcAccessLogConfig + // `. + string log_name = 2 [(validate.rules).string.min_bytes = 1]; + } + + // Identifier data that will only be sent in the first message on the stream. This is effectively + // structured metadata and is a performance optimization. + Identifier identifier = 1; + + // Wrapper for batches of HTTP access log entries. + message HTTPAccessLogEntries { + repeated envoy.data.accesslog.v2.HTTPAccessLogEntry log_entry = 1 + [(validate.rules).repeated .min_items = 1]; + } + + // [#not-implemented-hide:] + // Wrapper for batches of TCP access log entries. + message TCPAccessLogEntries { + repeated envoy.data.accesslog.v2.TCPAccessLogEntry log_entry = 1 + [(validate.rules).repeated .min_items = 1]; + } + + // Batches of log entries of a single type. Generally speaking, a given stream should only + // ever include one type of log entry. + oneof log_entries { + option (validate.required) = true; + + HTTPAccessLogEntries http_logs = 2; + + // [#not-implemented-hide:] + TCPAccessLogEntries tcp_logs = 3; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java index 6d3be3b46..f3ea308bd 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/ServiceMeshMetricDataDecorator.java @@ -21,6 +21,7 @@ package org.apache.skywalking.aop.server.receiver.mesh; import com.google.gson.JsonObject; import org.apache.skywalking.apm.network.common.DetectPoint; import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; +import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; import org.apache.skywalking.oap.server.receiver.sharing.server.CoreRegisterLinker; @@ -46,12 +47,18 @@ public class ServiceMeshMetricDataDecorator { boolean isRegistered = true; sourceServiceId = origin.getSourceServiceId(); if (sourceServiceId == Const.NONE) { - sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(origin.getSourceServiceName(), null); - if (sourceServiceId != Const.NONE) { - getNewDataBuilder().setSourceServiceId(sourceServiceId); - } else { - isRegistered = false; + String sourceServiceName = origin.getSourceServiceName(); + // sourceServiceName is optional now, + // which means only generate dest service traffic. + if (!StringUtil.isEmpty(sourceServiceName)) { + sourceServiceId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(sourceServiceName, null); + if (sourceServiceId != Const.NONE) { + getNewDataBuilder().setSourceServiceId(sourceServiceId); + } else { + isRegistered = false; + } } + // No service name, service instance will be ignored too. } sourceServiceInstanceId = origin.getSourceServiceInstanceId(); if (sourceServiceId != Const.NONE && sourceServiceInstanceId == Const.NONE) { diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index dbe069b42..fe2bb43d5 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -98,8 +98,13 @@ public class TelemetryDataDispatcher { toServiceInstance(decorator, minuteTimeBucket); toEndpoint(decorator, minuteTimeBucket); } - toServiceRelation(decorator, minuteTimeBucket); - toServiceInstanceRelation(decorator, minuteTimeBucket); + + int sourceServiceId = metrics.getSourceServiceId(); + // Don't generate relation, if no source. + if (sourceServiceId != Const.NONE) { + toServiceRelation(decorator, minuteTimeBucket); + toServiceInstanceRelation(decorator, minuteTimeBucket); + } } private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) { @@ -108,20 +113,23 @@ public class TelemetryDataDispatcher { int heartbeatCycle = 10000; // source int instanceId = metrics.getSourceServiceInstanceId(); - ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); - if (Objects.nonNull(serviceInstanceInventory)) { - if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) { - // trigger heartbeat every 10s. - SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metrics.getSourceServiceInstanceId(), metrics.getEndTime()); - SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metrics.getEndTime()); + // Don't generate source heartbeat, if no source. + if (instanceId != Const.NONE) { + ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) { + // trigger heartbeat every 10s. + SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metrics.getSourceServiceInstanceId(), metrics.getEndTime()); + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metrics.getEndTime()); + } + } else { + logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); } - } else { - logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); } // dest instanceId = metrics.getDestServiceInstanceId(); - serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); + ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); if (Objects.nonNull(serviceInstanceInventory)) { if (metrics.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) { // trigger heartbeat every 10s. diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index 05edfc04f..708b1a67b 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -105,6 +105,7 @@ istio-telemetry: default: envoy-metric: default: +# alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh} #receiver_zipkin: # default: # host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0} diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index ced2d4373..b70ef7144 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -105,6 +105,7 @@ istio-telemetry: default: envoy-metric: default: +# alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh} #receiver_zipkin: # default: # host: ${SW_RECEIVER_ZIPKIN_HOST:0.0.0.0} -- GitLab