From 8da9bf94abb23e34a3c76168abacc8b59a7e5d70 Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Mon, 9 Nov 2020 00:01:12 +0800 Subject: [PATCH] ALS analyzer based on Envoy metadata exchange (#5800) --- .github/workflows/docker-ci.yaml | 1 + .github/workflows/e2e.istio.yaml | 8 +- CHANGES.md | 1 + LICENSE | 1 + dist-material/release-docs/LICENSE | 2 + docker/oap/log4j2.xml | 2 +- docs/en/setup/envoy/als_setting.md | 9 +- oap-server/pom.xml | 7 + .../resources/metadata-service-mapping.yaml | 17 + .../envoy-metrics-receiver-plugin/pom.xml | 5 + .../envoy/AccessLogServiceGRPCHandler.java | 19 +- .../receiver/envoy/als/ALSHTTPAnalysis.java | 8 +- .../envoy/als/AbstractALSAnalyzer.java | 64 ++++ .../envoy/als/LogEntry2MetricsAdapter.java | 163 +++++++++ .../receiver/envoy/als/ServiceMetaInfo.java | 30 +- .../k8s/K8sALSServiceMeshHTTPAnalysis.java | 320 +++++------------- .../receiver/envoy/als/mx/FieldsHelper.java | 137 ++++++++ .../als/mx/MetaExchangeALSHTTPAnalyzer.java | 125 +++++++ .../envoy/als/mx/ServiceMetaInfoAdapter.java | 85 +++++ ....server.receiver.envoy.als.ALSHTTPAnalysis | 1 + ...=> K8SALSServiceMeshHTTPAnalysisTest.java} | 52 ++- .../envoy/als/mx/FieldsHelperTest.java | 95 ++++++ .../src/test/resources/field-helper.msg | 90 +++++ .../receiver-proto/pom.xml | 120 ++++++- .../src/main/fbs/istio/node-info.fbs | 51 +++ pom.xml | 4 + .../e2e/retryable/RetryableTest.java | 2 +- .../apache/skywalking/e2e/mesh/IDManager.java | 58 ---- .../src/test/resources/expected/als/topo.yml | 1 + .../known-oap-backend-dependencies-es7.txt | 1 + .../known-oap-backend-dependencies.txt | 1 + 31 files changed, 1107 insertions(+), 373 deletions(-) create mode 100644 oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java rename oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/{K8sHTTPAnalysisTest.java => K8SALSServiceMeshHTTPAnalysisTest.java} (76%) create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg create mode 100644 oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs delete mode 100644 test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java diff --git a/.github/workflows/docker-ci.yaml b/.github/workflows/docker-ci.yaml index 49f1ffc3a8..60834cd198 100644 --- a/.github/workflows/docker-ci.yaml +++ b/.github/workflows/docker-ci.yaml @@ -31,6 +31,7 @@ jobs: runs-on: ubuntu-16.04 timeout-minutes: 90 strategy: + fail-fast: true matrix: es: [es6, es7] steps: diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 0ff13c06d5..4d24cee0dd 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -37,7 +37,11 @@ jobs: als: runs-on: ubuntu-16.04 timeout-minutes: 60 - name: Istio+Envoy Access Log Service + strategy: + fail-fast: true + matrix: + analyzer: [k8s-mesh, mx-mesh] + name: Istio+ALS(${{ matrix.analyzer }}) steps: - uses: actions/checkout@v2 with: @@ -73,7 +77,7 @@ jobs: --set elasticsearch.replicas=1 \ --set elasticsearch.minimumMasterNodes=1 \ --set elasticsearch.imageTag=7.5.1 \ - --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \ + --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ matrix.analyzer }} \ --set oap.envoy.als.enabled=true \ --set oap.replicas=1 \ --set ui.image.repository=skywalking/ui \ diff --git a/CHANGES.md b/CHANGES.md index 42eb2f8c97..46e4279791 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,6 +23,7 @@ Release Notes. * Add the `@SuperDataset` annotation for BrowserErrorLog. * Add the thread pool to the Kafka fetcher to increase the performance. * Add `contain` and `not contain` OPS in OAL. +* Add Envoy ALS analyzer based on metadata exchange. * Support keeping collecting the slowly segments in the sampling mechanism. * Support choose files to active the meter analyzer. * Improve Kubernetes service registry for ALS analysis. diff --git a/LICENSE b/LICENSE index b3f1f516ad..cb1efe582a 100644 --- a/LICENSE +++ b/LICENSE @@ -221,6 +221,7 @@ The text of each license is the standard Apache 2.0 license. proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0 proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0 proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0 + flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0 mvnw files from https://github.com/takari/maven-wrapper Apache 2.0 svg files from skywalking-ui/src/assets/icons: https://github.com/google/material-design-icons Apache 2.0 diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE index b5d3a524ee..c390f00da4 100755 --- a/dist-material/release-docs/LICENSE +++ b/dist-material/release-docs/LICENSE @@ -232,6 +232,7 @@ The text of each license is the standard Apache 2.0 license. Google: gson 2.8.6: https://github.com/google/gson , Apache 2.0 Google: proto-google-common-protos 1.17.0: https://github.com/googleapis/googleapis , Apache 2.0 Google: jsr305 3.0.2: http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom , Apache 2.0 + Google: flatbuffers-java 1.12.0: https://github.com/google/flatbuffers/ , Apache 2.0 Elasticsearch BV (Elasticsearch) 6.3.2: https://www.elastic.co/products/elasticsearch , Apache 2.0 Elasticsearch BV (Elasticsearch) 7.5.0: https://www.elastic.co/products/elasticsearch , Apache 2.0 aggs-matrix-stats-client 6.3.2, 7.5.0: https://github.com/elastic/elasticsearch/tree/master/modules/aggs-matrix-stats Apache 2.0 @@ -319,6 +320,7 @@ The text of each license is the standard Apache 2.0 license. proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0 proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0 proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0 + flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0 json-flatter 0.6.0: https://github.com/wnameless/json-flattener Apache 2.0 Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0 sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0 diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml index fadfaa1a21..28994ea966 100644 --- a/docker/oap/log4j2.xml +++ b/docker/oap/log4j2.xml @@ -29,7 +29,7 @@ - + diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md index 5c02dbd0c3..7a5bb3e347 100644 --- a/docs/en/setup/envoy/als_setting.md +++ b/docs/en/setup/envoy/als_setting.md @@ -19,7 +19,14 @@ You need three steps to open ALS. Note: SkyWalking OAP service is at skywalking namespace, and the port of gRPC service is 11800 2. (Default is ACTIVATED) Activate SkyWalking [envoy receiver](../backend/backend-receivers.md). -3. Active ALS k8s-mesh analysis, set system env variable `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS`=`k8s-mesh` +3. Active ALS analyzer, there are two available analyzers, `k8s-mesh` and `mx-mesh`, +`k8s-mesh` uses the metadata from Kubernetes cluster, hence in this analyzer OAP needs access roles to `Pod`, `Service`, and `Endpoints`; +`mx-mesh` uses the Envoy metadata exchange mechanism to get the service name, etc., +this analyzer requires Istio to enable the metadata exchange filter(you can enable it by +`--set telemetry.v2.enabled=true`, or if you're using Istio 1.7+ and installing it with profile `demo`/`preview`, +it should be enabled then). +Setting system env variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** to activate the analyzer, +such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh`. ```yaml envoy-metric: selector: ${SW_ENVOY_METRIC:default} diff --git a/oap-server/pom.xml b/oap-server/pom.xml index 90185e9c76..319bc2c347 100755 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -100,6 +100,7 @@ 2.4.1 2.4.6.RELEASE 1.9.4 + 1.12.0 @@ -554,6 +555,12 @@ + + + com.google.flatbuffers + flatbuffers-java + ${flatbuffers-java.version} + diff --git a/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml new file mode 100644 index 0000000000..3526f66399 --- /dev/null +++ b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +serviceName: ${LABELS.app} +serviceInstanceName: ${NAME} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml index 5e2428fd0d..5d46fd3f34 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml @@ -61,5 +61,10 @@ ${org.apache.tomcat.annotations-api.version} provided + + + com.google.flatbuffers + flatbuffers-java + diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java index b715ee3208..4b8b79d812 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -25,10 +25,10 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.source.Source; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; @@ -42,18 +42,18 @@ import org.slf4j.LoggerFactory; public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class); private final List envoyHTTPAnalysisList; - private final SourceReceiver sourceReceiver; + private final CounterMetrics counter; private final HistogramMetrics histogram; private final CounterMetrics sourceDispatcherCounter; - public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) { + public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { ServiceLoader alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class); envoyHTTPAnalysisList = new ArrayList<>(); for (String httpAnalysisName : config.getAlsHTTPAnalysis()) { for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) { if (httpAnalysisName.equals(httpAnalysis.name())) { - httpAnalysis.init(config); + httpAnalysis.init(manager, config); envoyHTTPAnalysisList.add(httpAnalysis); } } @@ -61,8 +61,6 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); - sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); - MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); @@ -103,7 +101,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS case HTTP_LOGS: StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); - List sourceResult = new ArrayList<>(); + List sourceResult = new ArrayList<>(); for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { logs.getLogEntryList().forEach(log -> { sourceResult.addAll(analysis.analysis(identifier, log, role)); @@ -111,7 +109,8 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS } sourceDispatcherCounter.inc(sourceResult.size()); - sourceResult.forEach(sourceReceiver::receive); + sourceResult.forEach(TelemetryDataDispatcher::process); + break; } } finally { timer.finish(); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java index 4297f3d631..ae75c327e9 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java @@ -21,7 +21,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.util.List; -import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; /** @@ -30,9 +32,9 @@ import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig public interface ALSHTTPAnalysis { String name(); - void init(EnvoyMetricReceiverConfig config); + void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException; - List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role); + List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role); Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java new file mode 100644 index 0000000000..fae8cd1d52 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; + +@Slf4j +public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis { + + @Override + public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) { + if (alsIdentifier == null) { + return defaultRole; + } + final Node node = alsIdentifier.getNode(); + if (node == null) { + return defaultRole; + } + final String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + return defaultRole; + } + + /** + * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + * + * @param entry the access log entry that is to be adapted from. + * @param sourceService the source service. + * @param targetService the target/destination service. + * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}. + */ + protected LogEntry2MetricsAdapter newAdapter( + final HTTPAccessLogEntry entry, + final ServiceMetaInfo sourceService, + final ServiceMetaInfo targetService + ) { + return new LogEntry2MetricsAdapter(entry, sourceService, targetService); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java new file mode 100644 index 0000000000..de05c7a268 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import com.google.protobuf.Duration; +import com.google.protobuf.Timestamp; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; +import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; +import java.time.Instant; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.apm.network.common.v3.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Optional.ofNullable; + +/** + * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders. + */ +@RequiredArgsConstructor +public class LogEntry2MetricsAdapter { + + public static final String NON_TLS = "NONE"; + + public static final String M_TLS = "mTLS"; + + public static final String TLS = "TLS"; + + /** + * The access log entry that is to be adapted into metrics builders. + */ + private final HTTPAccessLogEntry entry; + + private final ServiceMetaInfo sourceService; + + private final ServiceMetaInfo targetService; + + /** + * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToDownstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + return adaptCommonPart() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setLatency((int) Math.max(1L, duration)) + .setDetectPoint(DetectPoint.server); + } + + /** + * Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}. + * + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public ServiceMeshMetric.Builder adaptToUpstreamMetrics() { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); + + return adaptCommonPart() + .setStartTime(outboundStartTime) + .setEndTime(outboundEndTime) + .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime)) + .setDetectPoint(DetectPoint.client); + } + + protected ServiceMeshMetric.Builder adaptCommonPart() { + final AccessLogCommon properties = entry.getCommonProperties(); + final String endpoint = ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/"); + final int responseCode = ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200); + final boolean status = responseCode >= 200 && responseCode < 400; + final Protocol protocol = requestProtocol(entry.getRequest()); + final String tlsMode = parseTLS(properties.getTlsProperties()); + + final ServiceMeshMetric.Builder builder = + ServiceMeshMetric.newBuilder() + .setEndpoint(endpoint) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode); + + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setSourceServiceName); + Optional.ofNullable(sourceService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setSourceServiceInstance); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceName) + .ifPresent(builder::setDestServiceName); + Optional.ofNullable(targetService) + .map(ServiceMetaInfo::getServiceInstanceName) + .ifPresent(builder::setDestServiceInstance); + + return builder; + } + + protected static long formatAsLong(final Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); + } + + protected static long formatAsLong(final Duration duration) { + return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); + } + + protected static Protocol requestProtocol(final HTTPRequestProperties request) { + if (request == null) { + return Protocol.HTTP; + } + final String scheme = request.getScheme(); + if (scheme.startsWith("http")) { + return Protocol.HTTP; + } + return Protocol.gRPC; + } + + protected static String parseTLS(final TLSProperties properties) { + if (properties == null) { + return NON_TLS; + } + if (isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()) + .getSubject())) { + return NON_TLS; + } + if (isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()) + .getSubject())) { + return TLS; + } + return M_TLS; + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java index 41ba0e59da..75a9c3c7cb 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java @@ -19,8 +19,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import java.util.List; -import java.util.Objects; +import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; @@ -28,13 +29,16 @@ import lombok.ToString; @Getter @Setter @ToString +@NoArgsConstructor +@EqualsAndHashCode(onlyExplicitlyIncluded = true) public class ServiceMetaInfo { + @EqualsAndHashCode.Include private String serviceName; + + @EqualsAndHashCode.Include private String serviceInstanceName; - private List tags; - public ServiceMetaInfo() { - } + private List tags; public ServiceMetaInfo(String serviceName, String serviceInstanceName) { this.serviceName = serviceName; @@ -43,26 +47,12 @@ public class ServiceMetaInfo { @Setter @Getter - @RequiredArgsConstructor @ToString + @RequiredArgsConstructor public static class KeyValue { private final String key; - private final String value; - } - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - ServiceMetaInfo info = (ServiceMetaInfo) o; - return Objects.equals(serviceName, info.serviceName) && Objects.equals(serviceInstanceName, info.serviceInstanceName); - } - - @Override - public int hashCode() { - return Objects.hash(serviceName, serviceInstanceName); + private final String value; } public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN"); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java index f06d941578..0e2e0682f5 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java @@ -18,46 +18,30 @@ package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; -import com.google.common.base.Strings; -import com.google.protobuf.Duration; -import com.google.protobuf.Timestamp; import io.envoyproxy.envoy.api.v2.core.Address; -import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; -import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; -import org.apache.skywalking.apm.network.common.v3.DetectPoint; -import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; -import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; -import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; +import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; + /** * Analysis log based on ingress and mesh scenarios. */ @Slf4j -public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { - private static final String NON_TLS = "NONE"; - - private static final String M_TLS = "mTLS"; - - private static final String TLS = "TLS"; - +public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { protected K8SServiceRegistry serviceRegistry; @Override @@ -67,250 +51,113 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { @Override @SneakyThrows - public void init(EnvoyMetricReceiverConfig config) { + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) { serviceRegistry = new K8SServiceRegistry(config); serviceRegistry.start(); } @Override - public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { + public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { if (serviceRegistry.isEmpty()) { return Collections.emptyList(); } switch (role) { case PROXY: - analysisProxy(identifier, entry); - break; + return analyzeProxy(entry); case SIDECAR: - return analysisSideCar(identifier, entry); + return analyzeSideCar(entry); } return Collections.emptyList(); } - protected List analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - List sources = new ArrayList<>(); - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - String cluster = properties.getUpstreamCluster(); - if (cluster != null) { - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); - - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); - String tlsMode = parseTLS(properties.getTlsProperties()); - - ServiceMeshMetric.Builder metric = null; - if (cluster.startsWith("inbound|")) { - // Server side - if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { - // Ingress -> sidecar(server side) - // Mesh telemetry without source, the relation would be generated. - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setDestServiceName(localService.getServiceName()) - .setDestServiceInstance(localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); - } else { - // sidecar -> sidecar(server side) - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(downstreamService.getServiceName()) - .setSourceServiceInstance(downstreamService.getServiceInstanceName()) - .setDestServiceName(localService.getServiceName()) - .setDestServiceInstance(localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - } - } else if (cluster.startsWith("outbound|")) { - // sidecar(client side) -> sidecar - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + protected List analyzeSideCar(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final String cluster = properties.getUpstreamCluster(); + if (cluster == null) { + return Collections.emptyList(); + } - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(downstreamService.getServiceName()) - .setSourceServiceInstance(downstreamService.getServiceInstanceName()) - .setDestServiceName(destService.getServiceName()) - .setDestServiceInstance(destService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.client); + final List sources = new ArrayList<>(); + + final Address downstreamRemoteAddress = + properties.hasDownstreamDirectRemoteAddress() + ? properties.getDownstreamDirectRemoteAddress() + : properties.getDownstreamRemoteAddress(); + final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); + + if (cluster.startsWith("inbound|")) { + // Server side + final ServiceMeshMetric.Builder metrics; + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics); + } else { + // sidecar -> sidecar(server side) + metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics(); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics); + } + sources.add(metrics); + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); - log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - } + final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics(); - Optional.ofNullable(metric).ifPresent(this::forward); - } + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + sources.add(metric); } + return sources; } - private String parseTLS(TLSProperties properties) { + protected List analyzeProxy(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); if (properties == null) { - return NON_TLS; - } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return NON_TLS; + return Collections.emptyList(); } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return TLS; + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { + return Collections.emptyList(); } - return M_TLS; - } - - protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { - SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); - - SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); - ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + final List result = new ArrayList<>(2); + final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - String tlsMode = parseTLS(properties.getTlsProperties()); + final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(outside.getServiceName()) - .setSourceServiceInstance( - outside.getServiceInstanceName()) - .setDestServiceName(ingress.getServiceName()) - .setDestServiceInstance( - ingress.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); + final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics(); - log.debug("Transformed ingress inbound mesh metric {}", metric); - forward(metric); + log.debug("Transformed ingress inbound mesh metric {}", metric); + result.add(metric); - SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); + final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); - long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); - long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); - - ServiceMeshMetric.Builder outboundMetric = ServiceMeshMetric.newBuilder() - .setStartTime(outboundStartTime) - .setEndTime(outboundEndTime) - .setSourceServiceName( - ingress.getServiceName()) - .setSourceServiceInstance( - ingress.getServiceInstanceName()) - .setDestServiceName( - targetService.getServiceName()) - .setDestServiceInstance( - targetService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency( - (int) (outboundEndTime - outboundStartTime)) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - // Can't parse it from tls properties, leave - // it to Server side. - .setTlsMode(NON_TLS) - .setDetectPoint(DetectPoint.client); - - log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); - forward(outboundMetric); - } - } - } + final ServiceMeshMetric.Builder outboundMetric = + newAdapter(entry, ingress, targetService) + .adaptToUpstreamMetrics() + // Can't parse it from tls properties, leave it to Server side. + .setTlsMode(NON_TLS); - @Override - public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) { - if (alsIdentifier != null) { - Node node = alsIdentifier.getNode(); - if (node != null) { - String id = node.getId(); - if (id.startsWith("router~")) { - return Role.PROXY; - } else if (id.startsWith("sidecar~")) { - return Role.SIDECAR; - } - } - } + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + result.add(outboundMetric); - return prev; + return result; } /** @@ -320,15 +167,4 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { return serviceRegistry.findService(ip); } - protected void forward(ServiceMeshMetric.Builder metric) { - TelemetryDataDispatcher.process(metric); - } - - private long formatAsLong(Timestamp timestamp) { - return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); - } - - private long formatAsLong(Duration duration) { - return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); - } } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java new file mode 100644 index 0000000000..1ef5c45510 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.mx; + +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.reflect.Invokable; +import com.google.common.reflect.TypeToken; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.util.ResourceUtils; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.yaml.snakeyaml.Yaml; + +@Slf4j +@SuppressWarnings("UnstableApiUsage") +enum FieldsHelper { + SINGLETON; + + private boolean initialized = false; + + /** + * The mappings from the field name of {@link ServiceMetaInfo} to the field name of {@code flatbuffers}. + */ + private Map fieldNameMapping; + + /** + * The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}. + */ + private Map> fieldSetterMapping; + + public void init(final String file) throws Exception { + init(ResourceUtils.readToStream(file)); + } + + @SuppressWarnings("unchecked") + void init(final InputStream inputStream) throws ModuleStartException { + if (initialized) { + return; + } + final Yaml yaml = new Yaml(); + final Map config = (Map) yaml.load(inputStream); + + fieldNameMapping = new HashMap<>(config.size()); + fieldSetterMapping = new HashMap<>(config.size()); + + for (final Map.Entry entry : config.entrySet()) { + final String serviceMetaInfoFieldName = entry.getKey(); + final String flatBuffersFieldName = entry.getValue(); + + final Pattern p = Pattern.compile("(\\$\\{(?.+?)})"); + final Matcher m = p.matcher(flatBuffersFieldName); + final List> flatBuffersFieldNames = new ArrayList<>(m.groupCount()); + final StringBuffer serviceNamePattern = new StringBuffer(); + while (m.find()) { + final String property = m.group("property"); + flatBuffersFieldNames.add(Splitter.on('.').omitEmptyStrings().splitToList(property)); + m.appendReplacement(serviceNamePattern, "%s"); + } + + fieldNameMapping.put( + serviceMetaInfoFieldName, + new ServiceNameFormat(serviceNamePattern.toString(), flatBuffersFieldNames) + ); + + try { + final Method setterMethod = ServiceMetaInfo.class.getMethod("set" + StringUtils.capitalize(serviceMetaInfoFieldName), String.class); + final Invokable setter = new TypeToken() { + }.method(setterMethod); + setter.setAccessible(true); + fieldSetterMapping.put(serviceMetaInfoFieldName, setter); + } catch (final NoSuchMethodException e) { + throw new ModuleStartException("Initialize method error", e); + } + } + initialized = true; + } + + /** + * Inflates the {@code serviceMetaInfo} with the given {@link Struct struct}. + * + * @param metadata the {@link Struct} metadata from where to retrieve and inflate the {@code serviceMetaInfo}. + * @param serviceMetaInfo the {@code serviceMetaInfo} to be inflated. + * @throws Exception if failed to inflate the {@code serviceMetaInfo} + */ + public void inflate(final Struct metadata, final ServiceMetaInfo serviceMetaInfo) throws Exception { + final Value root = Value.newBuilder().setStructValue(metadata).build(); + for (final Map.Entry entry : fieldNameMapping.entrySet()) { + final ServiceNameFormat serviceNameFormat = entry.getValue(); + final Object[] values = new String[serviceNameFormat.properties.size()]; + for (int i = 0; i < serviceNameFormat.properties.size(); i++) { + final List properties = serviceNameFormat.properties.get(i); + Value value = root; + for (final String property : properties) { + value = value.getStructValue().getFieldsOrThrow(property); + } + values[i] = value.getStringValue(); + } + fieldSetterMapping.get(entry.getKey()).invoke(serviceMetaInfo, Strings.lenientFormat(serviceNameFormat.format, values)); + } + } + + @RequiredArgsConstructor + private static class ServiceNameFormat { + private final String format; + + private final List> properties; + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java new file mode 100644 index 0000000000..796c23e744 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.mx; + +import com.google.protobuf.Any; +import com.google.protobuf.TextFormat; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; +import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN; + +@Slf4j +public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { + + private static final String UPSTREAM_KEY = "wasm.upstream_peer"; + + private static final String DOWNSTREAM_KEY = "wasm.downstream_peer"; + + @Override + public String name() { + return "mx-mesh"; + } + + @Override + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { + try { + FieldsHelper.SINGLETON.init("metadata-service-mapping.yaml"); + } catch (final Exception e) { + throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e); + } + } + + @Override + public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final Map stateMap = properties.getFilterStateObjectsMap(); + if (stateMap == null) { + return Collections.emptyList(); + } + final ServiceMetaInfo currSvc; + try { + currSvc = new ServiceMetaInfoAdapter(identifier.getNode().getMetadata()); + } catch (Exception e) { + log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); + return Collections.emptyList(); + } + + final List result = new ArrayList<>(); + final AtomicBoolean downstreamExists = new AtomicBoolean(); + stateMap.forEach((key, value) -> { + if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) { + return; + } + final ServiceMetaInfo svc; + try { + svc = new ServiceMetaInfoAdapter(value); + } catch (Exception e) { + log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray())); + return; + } + final ServiceMeshMetric.Builder metrics; + switch (key) { + case UPSTREAM_KEY: + metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + break; + case DOWNSTREAM_KEY: + metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics)); + } + result.add(metrics); + downstreamExists.set(true); + break; + } + }); + if (role.equals(Role.PROXY) && !downstreamExists.get()) { + final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics(); + if (log.isDebugEnabled()) { + log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric)); + } + result.add(metric); + } + return result; + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java new file mode 100644 index 0000000000..c8ca1acd77 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.mx; + +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.Struct; +import java.nio.ByteBuffer; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import Wasm.Common.FlatNode; +import Wasm.Common.KeyVal; + +import static java.util.Objects.nonNull; +import static java.util.Objects.requireNonNull; + +/** + * Adapter to {@link ServiceMetaInfo} from various of other datastructures. + */ +@Slf4j +@RequiredArgsConstructor +public class ServiceMetaInfoAdapter extends ServiceMetaInfo { + + /** + * Try to adapt a {@link ByteString} to {@link ServiceMetaInfo} instance. + * + * @param bv the {@link ByteString byte string} to adapt from. + * @throws Exception if the {@link ByteString byte string} can not be adapted to a {@link ServiceMetaInfo}. + */ + public ServiceMetaInfoAdapter(final ByteString bv) throws Exception { + final ByteBuffer buffer = ByteBuffer.wrap(BytesValue.parseFrom(bv).getValue().toByteArray()); + final FlatNode flatNode = FlatNode.getRootAsFlatNode(buffer); + if (log.isDebugEnabled()) { + for (int i = 0; i < flatNode.labelsLength(); i++) { + final KeyVal kv = flatNode.labels(i); + if (nonNull(kv)) { + log.debug("wasm label: {} : {}", kv.key(), kv.value()); + } + } + } + + setServiceName(Optional.ofNullable(flatNode.labelsByKey("app")).map(KeyVal::value).orElse("-")); + setServiceInstanceName(flatNode.name()); + } + + /** + * The same functionality with {@link ServiceMetaInfoAdapter#ServiceMetaInfoAdapter(com.google.protobuf.ByteString)}. + * + * @param any {@link Any any object} to adapt from. + * @throws Exception if the {@link Any any object} can not be adapted to a {@link ServiceMetaInfo}. + */ + public ServiceMetaInfoAdapter(final Any any) throws Exception { + this(any.getValue()); + } + + /** + * The same functionality with {@link ServiceMetaInfoAdapter#ServiceMetaInfoAdapter(com.google.protobuf.ByteString)}. + * + * @param metadata the {@link Struct struct} to adapt from. + * @throws Exception if the {@link Struct struct} can not be adapted to a {@link ServiceMetaInfo}. + */ + public ServiceMetaInfoAdapter(final Struct metadata) throws Exception { + FieldsHelper.SINGLETON.inflate(requireNonNull(metadata), this); + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis index 215d7a0496..bb5a7d97e2 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis @@ -18,3 +18,4 @@ org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis +org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java similarity index 76% rename from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java rename to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java index 7ad64a5060..718996c2b7 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java @@ -23,10 +23,10 @@ import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.ArrayList; import java.util.List; import org.apache.skywalking.apm.network.common.v3.DetectPoint; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; @@ -39,14 +39,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class K8sHTTPAnalysisTest { +public class K8SALSServiceMeshHTTPAnalysisTest { - private MockK8sAnalysis analysis; + private MockK8SAnalysis analysis; @Before public void setUp() { - analysis = new MockK8sAnalysis(); - analysis.init(null); + analysis = new MockK8SAnalysis(); + analysis.init(null, null); } @Test @@ -77,16 +77,16 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); + List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); - Assert.assertEquals(2, analysis.metrics.size()); + Assert.assertEquals(2, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName()); Assert.assertEquals("ingress", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); - ServiceMeshMetric.Builder outgoing = analysis.metrics.get(1); + ServiceMeshMetric.Builder outgoing = result.get(1); Assert.assertEquals("ingress", outgoing.getSourceServiceName()); Assert.assertEquals("productpage", outgoing.getDestServiceName()); Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint()); @@ -99,12 +99,11 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("", incoming.getSourceServiceName()); Assert.assertEquals("productpage", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); @@ -117,12 +116,11 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("review", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); @@ -135,38 +133,32 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("detail", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint()); } } - public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis { - private List metrics = new ArrayList<>(); + public static class MockK8SAnalysis extends K8sALSServiceMeshHTTPAnalysis { @Override - public void init(EnvoyMetricReceiverConfig config) { + public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) { serviceRegistry = mock(K8SServiceRegistry.class); when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN); when(serviceRegistry.findService("10.44.2.56")).thenReturn(new ServiceMetaInfo("ingress", "ingress-Inst")); when(serviceRegistry.findService("10.44.2.54")).thenReturn(new ServiceMetaInfo("productpage", "productpage-Inst")); when(serviceRegistry.findService("10.44.6.66")).thenReturn(new ServiceMetaInfo("detail", "detail-Inst")); - when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "detail-Inst")); + when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "review-Inst")); } - @Override - protected void forward(ServiceMeshMetric.Builder metric) { - metrics.add(metric); - } } - private static InputStream getResourceAsStream(final String resource) { + public static InputStream getResourceAsStream(final String resource) { final InputStream in = getContextClassLoader().getResourceAsStream(resource); return in == null ? MetricServiceGRPCHandlerTestMain.class.getResourceAsStream(resource) : in; } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java new file mode 100644 index 0000000000..35e91f8583 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.mx; + +import com.google.protobuf.util.JsonFormat; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Collection; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.powermock.reflect.Whitebox; + +import static org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SALSServiceMeshHTTPAnalysisTest.getResourceAsStream; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +@RunWith(Parameterized.class) +public class FieldsHelperTest { + + @Parameterized.Parameter() + public String mapping; + + @Parameterized.Parameter(1) + public String expectedServiceName; + + @Parameterized.Parameter(2) + public String expectedServiceInstanceName; + + @Parameterized.Parameters(name = "{index}: {0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { + "serviceName: ${LABELS.app}\nserviceInstanceName: ${NAME}", + "productpage", + "productpage-v1-65576bb7bf-4mzsp" + }, + { + "serviceName: ${LABELS.app}-${LABELS.version}\nserviceInstanceName: ${NAME}.${NAMESPACE}", + "productpage-v1", + "productpage-v1-65576bb7bf-4mzsp.default" + }, + { + "serviceName: ${LABELS.app}-${CLUSTER_ID}\nserviceInstanceName: ${NAME}.${NAMESPACE}.${SERVICE_ACCOUNT}", + "productpage-Kubernetes", + "productpage-v1-65576bb7bf-4mzsp.default.bookinfo-productpage" + }, + { + "serviceName: fixed-${LABELS.app}\nserviceInstanceName: yeah_${NAME}", + "fixed-productpage", + "yeah_productpage-v1-65576bb7bf-4mzsp" + } + }); + } + + @Before + public void setUp() { + Whitebox.setInternalState(FieldsHelper.SINGLETON, "initialized", false); + } + + @Test + public void testFormat() throws Exception { + try (final InputStreamReader isr = new InputStreamReader(getResourceAsStream("field-helper.msg"))) { + final StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); + JsonFormat.parser().merge(isr, requestBuilder); + final ServiceMetaInfo info = new ServiceMetaInfo(); + FieldsHelper.SINGLETON.init(new ByteArrayInputStream(mapping.getBytes())); + FieldsHelper.SINGLETON.inflate( + requestBuilder.getIdentifier().getNode().getMetadata(), + info + ); + assertThat(info.getServiceName(), equalTo(expectedServiceName)); + } + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg new file mode 100644 index 0000000000..9d589e8453 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "identifier": { + "node": { + "id": "sidecar~172.18.0.8~productpage-v1-65576bb7bf-4mzsp.default~default.svc.cluster.local", + "cluster": "productpage.default", + "metadata": { + "PROXY_CONFIG": { + "concurrency": 2.0, + "envoyAccessLogService": { + "address": "0.tcp.ngrok.io:13760" + }, + "statNameLength": 189.0, + "configPath": "./etc/istio/proxy", + "parentShutdownDuration": "60s", + "proxyAdminPort": 15000.0, + "controlPlaneAuthPolicy": "MUTUAL_TLS", + "drainDuration": "45s", + "proxyMetadata": { + "DNS_AGENT": "" + }, + "terminationDrainDuration": "5s", + "tracing": { + "zipkin": { + "address": "zipkin.istio-system:9411" + } + }, + "statusPort": 15020.0, + "serviceCluster": "productpage.default", + "envoyMetricsService": { + }, + "discoveryAddress": "istiod.istio-system.svc:15012", + "binaryPath": "/usr/local/bin/envoy" + }, + "PLATFORM_METADATA": { + "gcp_location": "us-central1-a", + "gcp_gce_instance_id": "2148869885222929334", + "gcp_gce_instance": "zhenxu-test", + "gcp_project_number": "191872121544", + "gcp_project": "skywalking-live-demo" + }, + "CLUSTER_ID": "Kubernetes", + "APP_CONTAINERS": "productpage", + "LABELS": { + "service.istio.io/canonical-name": "productpage", + "version": "v1", + "security.istio.io/tlsMode": "istio", + "app": "productpage", + "service.istio.io/canonical-revision": "v1", + "pod-template-hash": "65576bb7bf", + "istio.io/rev": "default" + }, + "ISTIO_PROXY_SHA": "istio-proxy:262253d9d066f8ef7ed82fd175c28b8f95acbec0", + "NAME": "productpage-v1-65576bb7bf-4mzsp", + "NAMESPACE": "default", + "EXCHANGE_KEYS": "NAME,NAMESPACE,INSTANCE_IPS,LABELS,OWNER,PLATFORM_METADATA,WORKLOAD_NAME,MESH_ID,SERVICE_ACCOUNT,CLUSTER_ID", + "INSTANCE_IPS": "172.18.0.8", + "POD_PORTS": "[{\"containerPort\":9080,\"protocol\":\"TCP\"}]", + "INTERCEPTION_MODE": "REDIRECT", + "SERVICE_ACCOUNT": "bookinfo-productpage", + "MESH_ID": "cluster.local", + "SDS": "true", + "WORKLOAD_NAME": "productpage-v1", + "OWNER": "kubernetes://apis/apps/v1/namespaces/default/deployments/productpage-v1", + "ISTIO_VERSION": "1.7.1" + }, + "locality": { + "region": "us-central1", + "zone": "us-central1-a" + }, + "buildVersion": "262253d9d066f8ef7ed82fd175c28b8f95acbec0/1.15.0/Clean/RELEASE/BoringSSL" + }, + "logName": "http_envoy_accesslog" + } +} diff --git a/oap-server/server-receiver-plugin/receiver-proto/pom.xml b/oap-server/server-receiver-plugin/receiver-proto/pom.xml index e7ca3bf7c4..117bb1d424 100644 --- a/oap-server/server-receiver-plugin/receiver-proto/pom.xml +++ b/oap-server/server-receiver-plugin/receiver-proto/pom.xml @@ -17,7 +17,8 @@ ~ --> - + server-receiver-plugin org.apache.skywalking @@ -28,9 +29,45 @@ receiver-proto jar + + ${basedir}/src/main/fbs + ${project.build.directory}/generated-sources/fbs/java + ${project.build.directory}/bin/flatc + tar.gz + + + + + com.google.flatbuffers + flatbuffers-java + provided + + + + + + windows + + + Windows + + + + zip + + + + + + + kr.motd.maven + os-maven-plugin + ${os-maven-plugin.version} + + - + kr.motd.maven os-maven-plugin ${os-maven-plugin.version} @@ -53,10 +90,12 @@ protobuf-java directly, you will be transitively depending on the protobuf-java version that grpc depends on. --> - com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier} + + com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier} grpc-java - io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier} + + io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier} @@ -68,6 +107,77 @@ + + + org.apache.maven.plugins + maven-dependency-plugin + ${maven-dependency-plugin.version} + + + unpack + generate-sources + + unpack + + + + + com.github.davidmoten + flatbuffers-compiler + 1.12.0.1 + ${fbs.compiler.artifact.type} + distribution-${os.detected.name} + true + ${project.build.directory} + + + + + + + + org.codehaus.mojo + exec-maven-plugin + ${exec-maven-plugin.version} + + + + exec + + generate-sources + + ${fbs.compiler} + ${fbs.sources} + + --java + --gen-mutable + -o + ${fbs.generated.sources} + istio/node-info.fbs + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-source + generate-sources + + add-source + + + + ${fbs.generated.sources} + + + + + - \ No newline at end of file + diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs new file mode 100644 index 0000000000..8b33c3a224 --- /dev/null +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs @@ -0,0 +1,51 @@ +/* Copyright 2020 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Original File Location: +// https://github.com/istio/proxy/blob/bcdc1684df0839a612526f688ff7b475902f2feb/extensions/common/node_info.fbs + +namespace Wasm.Common; + +table KeyVal { + key:string (key); + value:string; +} + +// NodeInfo represents the information extracted from proxy node metadata. +table FlatNode { + // Name of the node. e.g. in k8s, name is the pod name. + name:string; + // Namespace that the node runs in. + namespace:string; + // K8s or vm workload attributes. + labels:[KeyVal]; + owner:string; + workload_name:string; + // Platform metadata uses prefixed keys + // GCP uses gcp_* keys + platform_metadata:[KeyVal]; + // Version identifier for the proxy. + istio_version:string; + // Unique identifier for the mesh. Taken from global mesh id parameter (or + // the configured trust domain when not specified). + mesh_id:string; + // List of short names for application containers that are using this proxy. + // This is only used for kubernetes, and is populated by the sidecar injector. + app_containers:[string]; + // Identifier for the cluster to which this workload belongs (for k8s workloads). + cluster_id:string; +} + +root_type FlatNode; diff --git a/pom.xml b/pom.xml index ad73b569bf..815d3d2e53 100755 --- a/pom.xml +++ b/pom.xml @@ -207,9 +207,11 @@ 0.6.1 1.6.0 1.8 + 2.10 2.8.2 3.1.0 2.22.0 + 3.2.0 2.22.0 3.1.0 3.1.1 @@ -225,6 +227,7 @@ 1.5 2.7 true + @@ -498,6 +501,7 @@ skywalking-ui/package-lock.json + **/src/main/fbs/istio/** **/src/main/proto/envoy/** **/src/main/proto/gogoproto/gogo.proto **/src/main/proto/google/** diff --git a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java index f6218c8df9..f6fdbb9358 100644 --- a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java +++ b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java @@ -49,7 +49,7 @@ public @interface RetryableTest { /** * @return maximum times to retry, or -1 for infinite retries. {@code -1} by default. */ - int value() default 120; + int value() default 60; /** * @return the interval between any two retries, in millisecond. {@code 1000} by default. diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java deleted file mode 100644 index ce2fb0c15e..0000000000 --- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.e2e.mesh; - -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -public class IDManager { - public static class ServiceID { - - public static ServiceIDDefinition analysisId(String id) { - final String[] strings = id.split("\\."); - if (strings.length != 2) { - throw new RuntimeException("Can't split service id into 2 parts, " + id); - } - return new ServiceIDDefinition( - decode(strings[0]), - Integer.parseInt(strings[1]) == 1 - ); - } - - @RequiredArgsConstructor - @Getter - @EqualsAndHashCode - public static class ServiceIDDefinition { - private final String name; - - private final boolean isReal; - } - } - - /** - * @param base64text Base64 encoded UTF-8 string - * @return normal literal string - */ - private static String decode(String base64text) { - return new String(Base64.getDecoder().decode(base64text), StandardCharsets.UTF_8); - } -} diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml index 0d5ee72907..b60dcc0550 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml +++ b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml @@ -47,6 +47,7 @@ calls: target: cHJvZHVjdHBhZ2U=.1 detectPoints: - CLIENT + - SERVER - id: not null source: cHJvZHVjdHBhZ2U=.1 target: cmV2aWV3cw==.1 diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt b/tools/dependencies/known-oap-backend-dependencies-es7.txt index cbd448fd51..ea82e6d442 100755 --- a/tools/dependencies/known-oap-backend-dependencies-es7.txt +++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt @@ -44,6 +44,7 @@ rank-eval-client-7.5.0.jar error_prone_annotations-2.3.2.jar etcd4j-2.17.0.jar failureaccess-1.0.1.jar +flatbuffers-java-1.12.0.jar freemarker-2.3.28.jar graphql-java-8.0.jar graphql-java-tools-5.2.3.jar diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt index 9c37c31138..12041d54bd 100755 --- a/tools/dependencies/known-oap-backend-dependencies.txt +++ b/tools/dependencies/known-oap-backend-dependencies.txt @@ -41,6 +41,7 @@ elasticsearch-x-content-6.3.2.jar error_prone_annotations-2.3.2.jar etcd4j-2.17.0.jar failureaccess-1.0.1.jar +flatbuffers-java-1.12.0.jar freemarker-2.3.28.jar graphql-java-8.0.jar graphql-java-tools-5.2.3.jar -- GitLab