diff --git a/.github/workflows/docker-ci.yaml b/.github/workflows/docker-ci.yaml
index 49f1ffc3a8862e1b3d4ed18ceb7b323567db126a..60834cd198df54b5af9b96718dbca670175a4069 100644
--- a/.github/workflows/docker-ci.yaml
+++ b/.github/workflows/docker-ci.yaml
@@ -31,6 +31,7 @@ jobs:
runs-on: ubuntu-16.04
timeout-minutes: 90
strategy:
+ fail-fast: true
matrix:
es: [es6, es7]
steps:
diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml
index 0ff13c06d5c8ea9ac68d1eec245816f3d4a51f79..4d24cee0dd5ea3aa23429787d31e86b41a7e2fb1 100644
--- a/.github/workflows/e2e.istio.yaml
+++ b/.github/workflows/e2e.istio.yaml
@@ -37,7 +37,11 @@ jobs:
als:
runs-on: ubuntu-16.04
timeout-minutes: 60
- name: Istio+Envoy Access Log Service
+ strategy:
+ fail-fast: true
+ matrix:
+ analyzer: [k8s-mesh, mx-mesh]
+ name: Istio+ALS(${{ matrix.analyzer }})
steps:
- uses: actions/checkout@v2
with:
@@ -73,7 +77,7 @@ jobs:
--set elasticsearch.replicas=1 \
--set elasticsearch.minimumMasterNodes=1 \
--set elasticsearch.imageTag=7.5.1 \
- --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
+ --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ matrix.analyzer }} \
--set oap.envoy.als.enabled=true \
--set oap.replicas=1 \
--set ui.image.repository=skywalking/ui \
diff --git a/CHANGES.md b/CHANGES.md
index 42eb2f8c97b38f8079cc3442e3cf783290bf5cb3..46e42797911ff8a654e0f4bd254a06846b668a5a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
* Add the `@SuperDataset` annotation for BrowserErrorLog.
* Add the thread pool to the Kafka fetcher to increase the performance.
* Add `contain` and `not contain` OPS in OAL.
+* Add Envoy ALS analyzer based on metadata exchange.
* Support keeping collecting the slowly segments in the sampling mechanism.
* Support choose files to active the meter analyzer.
* Improve Kubernetes service registry for ALS analysis.
diff --git a/LICENSE b/LICENSE
index b3f1f516adb53663d7d83ac7b44ec4f1dafd14af..cb1efe582afef84184309691da05d06f556a5a11 100644
--- a/LICENSE
+++ b/LICENSE
@@ -221,6 +221,7 @@ The text of each license is the standard Apache 2.0 license.
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0
+ flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0
mvnw files from https://github.com/takari/maven-wrapper Apache 2.0
svg files from skywalking-ui/src/assets/icons: https://github.com/google/material-design-icons Apache 2.0
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index b5d3a524eed303f1eb2d24dc3a2ab6c3ec977469..c390f00da432c3282c1d8f6807bbd7515bf7dcf6 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -232,6 +232,7 @@ The text of each license is the standard Apache 2.0 license.
Google: gson 2.8.6: https://github.com/google/gson , Apache 2.0
Google: proto-google-common-protos 1.17.0: https://github.com/googleapis/googleapis , Apache 2.0
Google: jsr305 3.0.2: http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom , Apache 2.0
+ Google: flatbuffers-java 1.12.0: https://github.com/google/flatbuffers/ , Apache 2.0
Elasticsearch BV (Elasticsearch) 6.3.2: https://www.elastic.co/products/elasticsearch , Apache 2.0
Elasticsearch BV (Elasticsearch) 7.5.0: https://www.elastic.co/products/elasticsearch , Apache 2.0
aggs-matrix-stats-client 6.3.2, 7.5.0: https://github.com/elastic/elasticsearch/tree/master/modules/aggs-matrix-stats Apache 2.0
@@ -319,6 +320,7 @@ The text of each license is the standard Apache 2.0 license.
proto files from prometheus/client_model: https://github.com/prometheus/client_model Apache 2.0
proto files from lyft/protoc-gen-validate: https://github.com/lyft/protoc-gen-validate Apache 2.0
proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 2.0
+ flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 2.0
json-flatter 0.6.0: https://github.com/wnameless/json-flattener Apache 2.0
Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0
sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0
diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml
index fadfaa1a218eb732365d659285caeb9fa402a8ca..28994ea966c169bab060252bafcb46f30e72947f 100644
--- a/docker/oap/log4j2.xml
+++ b/docker/oap/log4j2.xml
@@ -29,7 +29,7 @@
-
+
diff --git a/docs/en/setup/envoy/als_setting.md b/docs/en/setup/envoy/als_setting.md
index 5c02dbd0c33f7eb7363bb43645a8fcd60fd2a0be..7a5bb3e347ef536dedb8f389002a155768893e66 100644
--- a/docs/en/setup/envoy/als_setting.md
+++ b/docs/en/setup/envoy/als_setting.md
@@ -19,7 +19,14 @@ You need three steps to open ALS.
Note: SkyWalking OAP service is at skywalking namespace, and the port of gRPC service is 11800
2. (Default is ACTIVATED) Activate SkyWalking [envoy receiver](../backend/backend-receivers.md).
-3. Active ALS k8s-mesh analysis, set system env variable `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS`=`k8s-mesh`
+3. Active ALS analyzer, there are two available analyzers, `k8s-mesh` and `mx-mesh`,
+`k8s-mesh` uses the metadata from Kubernetes cluster, hence in this analyzer OAP needs access roles to `Pod`, `Service`, and `Endpoints`;
+`mx-mesh` uses the Envoy metadata exchange mechanism to get the service name, etc.,
+this analyzer requires Istio to enable the metadata exchange filter(you can enable it by
+`--set telemetry.v2.enabled=true`, or if you're using Istio 1.7+ and installing it with profile `demo`/`preview`,
+it should be enabled then).
+Setting system env variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** to activate the analyzer,
+such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh`.
```yaml
envoy-metric:
selector: ${SW_ENVOY_METRIC:default}
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 90185e9c7663681bcf5e919cbdb3e8c96e7cbc6b..319bc2c3471474d2e4cd75935cce5064bebee6f2 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -100,6 +100,7 @@
2.4.1
2.4.6.RELEASE
1.9.4
+ 1.12.0
@@ -554,6 +555,12 @@
+
+
+ com.google.flatbuffers
+ flatbuffers-java
+ ${flatbuffers-java.version}
+
diff --git a/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..3526f66399a4fe568398d48c04d2bf7129d8d2a5
--- /dev/null
+++ b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+serviceName: ${LABELS.app}
+serviceInstanceName: ${NAME}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
index 5e2428fd0de87809dfda464dbe1d55bf3eddd9f3..5d46fd3f34d93d6b88ccf0adc3761183e6da8714 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
@@ -61,5 +61,10 @@
${org.apache.tomcat.annotations-api.version}
provided
+
+
+ com.google.flatbuffers
+ flatbuffers-java
+
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index b715ee3208538175d6a13a38988a019817f19280..4b8b79d8128912efa0f039866784721eee71c0e1 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -25,10 +25,10 @@ import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.source.Source;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -42,18 +42,18 @@ import org.slf4j.LoggerFactory;
public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
private final List envoyHTTPAnalysisList;
- private final SourceReceiver sourceReceiver;
+
private final CounterMetrics counter;
private final HistogramMetrics histogram;
private final CounterMetrics sourceDispatcherCounter;
- public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) {
+ public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
ServiceLoader alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
envoyHTTPAnalysisList = new ArrayList<>();
for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
if (httpAnalysisName.equals(httpAnalysis.name())) {
- httpAnalysis.init(config);
+ httpAnalysis.init(manager, config);
envoyHTTPAnalysisList.add(httpAnalysis);
}
}
@@ -61,8 +61,6 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
- sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
-
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
@@ -103,7 +101,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
case HTTP_LOGS:
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
- List sourceResult = new ArrayList<>();
+ List sourceResult = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
logs.getLogEntryList().forEach(log -> {
sourceResult.addAll(analysis.analysis(identifier, log, role));
@@ -111,7 +109,8 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
}
sourceDispatcherCounter.inc(sourceResult.size());
- sourceResult.forEach(sourceReceiver::receive);
+ sourceResult.forEach(TelemetryDataDispatcher::process);
+ break;
}
} finally {
timer.finish();
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 4297f3d631b87f4dfd0699eb78245ef93c0a2080..ae75c327e94962bbba1aea60edbd15ae97698dc6 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -21,7 +21,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import java.util.List;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
/**
@@ -30,9 +32,9 @@ import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig
public interface ALSHTTPAnalysis {
String name();
- void init(EnvoyMetricReceiverConfig config);
+ void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException;
- List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role);
+ List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role);
Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
new file mode 100644
index 0000000000000000000000000000000000000000..fae8cd1d5213169e9b9b97b8ff98512f02eb7321
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+
+@Slf4j
+public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
+
+ @Override
+ public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) {
+ if (alsIdentifier == null) {
+ return defaultRole;
+ }
+ final Node node = alsIdentifier.getNode();
+ if (node == null) {
+ return defaultRole;
+ }
+ final String id = node.getId();
+ if (id.startsWith("router~")) {
+ return Role.PROXY;
+ } else if (id.startsWith("sidecar~")) {
+ return Role.SIDECAR;
+ }
+ return defaultRole;
+ }
+
+ /**
+ * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
+ *
+ * @param entry the access log entry that is to be adapted from.
+ * @param sourceService the source service.
+ * @param targetService the target/destination service.
+ * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} into a {@link ServiceMeshMetric.Builder}.
+ */
+ protected LogEntry2MetricsAdapter newAdapter(
+ final HTTPAccessLogEntry entry,
+ final ServiceMetaInfo sourceService,
+ final ServiceMetaInfo targetService
+ ) {
+ return new LogEntry2MetricsAdapter(entry, sourceService, targetService);
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
new file mode 100644
index 0000000000000000000000000000000000000000..de05c7a2681799674ac01f509e3a8a42566d65f4
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
+import java.time.Instant;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Optional.ofNullable;
+
+/**
+ * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders.
+ */
+@RequiredArgsConstructor
+public class LogEntry2MetricsAdapter {
+
+ public static final String NON_TLS = "NONE";
+
+ public static final String M_TLS = "mTLS";
+
+ public static final String TLS = "TLS";
+
+ /**
+ * The access log entry that is to be adapted into metrics builders.
+ */
+ private final HTTPAccessLogEntry entry;
+
+ private final ServiceMetaInfo sourceService;
+
+ private final ServiceMetaInfo targetService;
+
+ /**
+ * Adapt the {@code entry} into a downstream metrics {@link ServiceMeshMetric.Builder}.
+ *
+ * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+ */
+ public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final long startTime = formatAsLong(properties.getStartTime());
+ final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+ return adaptCommonPart()
+ .setStartTime(startTime)
+ .setEndTime(startTime + duration)
+ .setLatency((int) Math.max(1L, duration))
+ .setDetectPoint(DetectPoint.server);
+ }
+
+ /**
+ * Adapt the {@code entry} into a upstream metrics {@link ServiceMeshMetric.Builder}.
+ *
+ * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry.
+ */
+ public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final long startTime = formatAsLong(properties.getStartTime());
+ final long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+ final long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+ return adaptCommonPart()
+ .setStartTime(outboundStartTime)
+ .setEndTime(outboundEndTime)
+ .setLatency((int) Math.max(1L, outboundEndTime - outboundStartTime))
+ .setDetectPoint(DetectPoint.client);
+ }
+
+ protected ServiceMeshMetric.Builder adaptCommonPart() {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final String endpoint = ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/");
+ final int responseCode = ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200);
+ final boolean status = responseCode >= 200 && responseCode < 400;
+ final Protocol protocol = requestProtocol(entry.getRequest());
+ final String tlsMode = parseTLS(properties.getTlsProperties());
+
+ final ServiceMeshMetric.Builder builder =
+ ServiceMeshMetric.newBuilder()
+ .setEndpoint(endpoint)
+ .setResponseCode(Math.toIntExact(responseCode))
+ .setStatus(status)
+ .setProtocol(protocol)
+ .setTlsMode(tlsMode);
+
+ Optional.ofNullable(sourceService)
+ .map(ServiceMetaInfo::getServiceName)
+ .ifPresent(builder::setSourceServiceName);
+ Optional.ofNullable(sourceService)
+ .map(ServiceMetaInfo::getServiceInstanceName)
+ .ifPresent(builder::setSourceServiceInstance);
+ Optional.ofNullable(targetService)
+ .map(ServiceMetaInfo::getServiceName)
+ .ifPresent(builder::setDestServiceName);
+ Optional.ofNullable(targetService)
+ .map(ServiceMetaInfo::getServiceInstanceName)
+ .ifPresent(builder::setDestServiceInstance);
+
+ return builder;
+ }
+
+ protected static long formatAsLong(final Timestamp timestamp) {
+ return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
+ }
+
+ protected static long formatAsLong(final Duration duration) {
+ return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
+ }
+
+ protected static Protocol requestProtocol(final HTTPRequestProperties request) {
+ if (request == null) {
+ return Protocol.HTTP;
+ }
+ final String scheme = request.getScheme();
+ if (scheme.startsWith("http")) {
+ return Protocol.HTTP;
+ }
+ return Protocol.gRPC;
+ }
+
+ protected static String parseTLS(final TLSProperties properties) {
+ if (properties == null) {
+ return NON_TLS;
+ }
+ if (isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
+ .orElse(TLSProperties.CertificateProperties.newBuilder().build())
+ .getSubject())) {
+ return NON_TLS;
+ }
+ if (isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
+ .orElse(TLSProperties.CertificateProperties.newBuilder().build())
+ .getSubject())) {
+ return TLS;
+ }
+ return M_TLS;
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 41ba0e59da5ca81312467c6892c91f17cf7bcec7..75a9c3c7cb544bffd679aac84f25aa106d183488 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -19,8 +19,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.als;
import java.util.List;
-import java.util.Objects;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@@ -28,13 +29,16 @@ import lombok.ToString;
@Getter
@Setter
@ToString
+@NoArgsConstructor
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class ServiceMetaInfo {
+ @EqualsAndHashCode.Include
private String serviceName;
+
+ @EqualsAndHashCode.Include
private String serviceInstanceName;
- private List tags;
- public ServiceMetaInfo() {
- }
+ private List tags;
public ServiceMetaInfo(String serviceName, String serviceInstanceName) {
this.serviceName = serviceName;
@@ -43,26 +47,12 @@ public class ServiceMetaInfo {
@Setter
@Getter
- @RequiredArgsConstructor
@ToString
+ @RequiredArgsConstructor
public static class KeyValue {
private final String key;
- private final String value;
- }
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- ServiceMetaInfo info = (ServiceMetaInfo) o;
- return Objects.equals(serviceName, info.serviceName) && Objects.equals(serviceInstanceName, info.serviceInstanceName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(serviceName, serviceInstanceName);
+ private final String value;
}
public static final ServiceMetaInfo UNKNOWN = new ServiceMetaInfo("UNKNOWN", "UNKNOWN");
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
index f06d941578cb1f4eccc3381464c588f24881790a..0e2e0682f559f3df517f8bd8f1fff22038158f7b 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
@@ -18,46 +18,30 @@
package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
-import com.google.common.base.Strings;
-import com.google.protobuf.Duration;
-import com.google.protobuf.Timestamp;
import io.envoyproxy.envoy.api.v2.core.Address;
-import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
-import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+
/**
* Analysis log based on ingress and mesh scenarios.
*/
@Slf4j
-public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
- private static final String NON_TLS = "NONE";
-
- private static final String M_TLS = "mTLS";
-
- private static final String TLS = "TLS";
-
+public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
protected K8SServiceRegistry serviceRegistry;
@Override
@@ -67,250 +51,113 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
@Override
@SneakyThrows
- public void init(EnvoyMetricReceiverConfig config) {
+ public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
serviceRegistry = new K8SServiceRegistry(config);
serviceRegistry.start();
}
@Override
- public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
+ public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
if (serviceRegistry.isEmpty()) {
return Collections.emptyList();
}
switch (role) {
case PROXY:
- analysisProxy(identifier, entry);
- break;
+ return analyzeProxy(entry);
case SIDECAR:
- return analysisSideCar(identifier, entry);
+ return analyzeSideCar(entry);
}
return Collections.emptyList();
}
- protected List analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) {
- List sources = new ArrayList<>();
- AccessLogCommon properties = entry.getCommonProperties();
- if (properties != null) {
- String cluster = properties.getUpstreamCluster();
- if (cluster != null) {
- long startTime = formatAsLong(properties.getStartTime());
- long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
-
- HTTPRequestProperties request = entry.getRequest();
- String endpoint = "/";
- Protocol protocol = Protocol.HTTP;
- if (request != null) {
- endpoint = request.getPath();
- String schema = request.getScheme();
- if ("http".equals(schema) || "https".equals(schema)) {
- protocol = Protocol.HTTP;
- } else {
- protocol = Protocol.gRPC;
- }
- }
- HTTPResponseProperties response = entry.getResponse();
- int responseCode = 200;
- if (response != null) {
- responseCode = response.getResponseCode().getValue();
- }
- boolean status = responseCode >= 200 && responseCode < 400;
-
- Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
- ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
- Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
- ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
- String tlsMode = parseTLS(properties.getTlsProperties());
-
- ServiceMeshMetric.Builder metric = null;
- if (cluster.startsWith("inbound|")) {
- // Server side
- if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
- // Ingress -> sidecar(server side)
- // Mesh telemetry without source, the relation would be generated.
- metric = ServiceMeshMetric.newBuilder()
- .setStartTime(startTime)
- .setEndTime(startTime + duration)
- .setDestServiceName(localService.getServiceName())
- .setDestServiceInstance(localService.getServiceInstanceName())
- .setEndpoint(endpoint)
- .setLatency((int) duration)
- .setResponseCode(Math.toIntExact(responseCode))
- .setStatus(status)
- .setProtocol(protocol)
- .setTlsMode(tlsMode)
- .setDetectPoint(DetectPoint.server);
-
- log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric);
- } else {
- // sidecar -> sidecar(server side)
- metric = ServiceMeshMetric.newBuilder()
- .setStartTime(startTime)
- .setEndTime(startTime + duration)
- .setSourceServiceName(downstreamService.getServiceName())
- .setSourceServiceInstance(downstreamService.getServiceInstanceName())
- .setDestServiceName(localService.getServiceName())
- .setDestServiceInstance(localService.getServiceInstanceName())
- .setEndpoint(endpoint)
- .setLatency((int) duration)
- .setResponseCode(Math.toIntExact(responseCode))
- .setStatus(status)
- .setProtocol(protocol)
- .setTlsMode(tlsMode)
- .setDetectPoint(DetectPoint.server);
-
- log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
- }
- } else if (cluster.startsWith("outbound|")) {
- // sidecar(client side) -> sidecar
- Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
- ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
+ protected List analyzeSideCar(final HTTPAccessLogEntry entry) {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ if (properties == null) {
+ return Collections.emptyList();
+ }
+ final String cluster = properties.getUpstreamCluster();
+ if (cluster == null) {
+ return Collections.emptyList();
+ }
- metric = ServiceMeshMetric.newBuilder()
- .setStartTime(startTime)
- .setEndTime(startTime + duration)
- .setSourceServiceName(downstreamService.getServiceName())
- .setSourceServiceInstance(downstreamService.getServiceInstanceName())
- .setDestServiceName(destService.getServiceName())
- .setDestServiceInstance(destService.getServiceInstanceName())
- .setEndpoint(endpoint)
- .setLatency((int) duration)
- .setResponseCode(Math.toIntExact(responseCode))
- .setStatus(status)
- .setProtocol(protocol)
- .setTlsMode(tlsMode)
- .setDetectPoint(DetectPoint.client);
+ final List sources = new ArrayList<>();
+
+ final Address downstreamRemoteAddress =
+ properties.hasDownstreamDirectRemoteAddress()
+ ? properties.getDownstreamDirectRemoteAddress()
+ : properties.getDownstreamRemoteAddress();
+ final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
+ final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+ final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+ if (cluster.startsWith("inbound|")) {
+ // Server side
+ final ServiceMeshMetric.Builder metrics;
+ if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+ // Ingress -> sidecar(server side)
+ // Mesh telemetry without source, the relation would be generated.
+ metrics = newAdapter(entry, null, localService).adaptToDownstreamMetrics();
+
+ log.debug("Transformed ingress->sidecar inbound mesh metrics {}", metrics);
+ } else {
+ // sidecar -> sidecar(server side)
+ metrics = newAdapter(entry, downstreamService, localService).adaptToDownstreamMetrics();
+
+ log.debug("Transformed sidecar->sidecar(server side) inbound mesh metrics {}", metrics);
+ }
+ sources.add(metrics);
+ } else if (cluster.startsWith("outbound|")) {
+ // sidecar(client side) -> sidecar
+ final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+ final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
- log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
- }
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, downstreamService, destService).adaptToUpstreamMetrics();
- Optional.ofNullable(metric).ifPresent(this::forward);
- }
+ log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric);
+ sources.add(metric);
}
+
return sources;
}
- private String parseTLS(TLSProperties properties) {
+ protected List analyzeProxy(final HTTPAccessLogEntry entry) {
+ final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
- return NON_TLS;
- }
- if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
- .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) {
- return NON_TLS;
+ return Collections.emptyList();
}
- if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
- .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) {
- return TLS;
+ final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
+ final Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
+ final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
+ if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
+ return Collections.emptyList();
}
- return M_TLS;
- }
-
- protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) {
- AccessLogCommon properties = entry.getCommonProperties();
- if (properties != null) {
- Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
- Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress();
- Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
- if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) {
- SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
- ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
-
- SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
- ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
- long startTime = formatAsLong(properties.getStartTime());
- long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte());
+ final List result = new ArrayList<>(2);
+ final SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress();
+ final ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress());
- HTTPRequestProperties request = entry.getRequest();
- String endpoint = "/";
- Protocol protocol = Protocol.HTTP;
- if (request != null) {
- endpoint = request.getPath();
- String schema = request.getScheme();
- if ("http".equals(schema) || "https".equals(schema)) {
- protocol = Protocol.HTTP;
- } else {
- protocol = Protocol.gRPC;
- }
- }
- HTTPResponseProperties response = entry.getResponse();
- int responseCode = 200;
- if (response != null) {
- responseCode = response.getResponseCode().getValue();
- }
- boolean status = responseCode >= 200 && responseCode < 400;
- String tlsMode = parseTLS(properties.getTlsProperties());
+ final SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress();
+ final ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress());
- ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder()
- .setStartTime(startTime)
- .setEndTime(startTime + duration)
- .setSourceServiceName(outside.getServiceName())
- .setSourceServiceInstance(
- outside.getServiceInstanceName())
- .setDestServiceName(ingress.getServiceName())
- .setDestServiceInstance(
- ingress.getServiceInstanceName())
- .setEndpoint(endpoint)
- .setLatency((int) duration)
- .setResponseCode(Math.toIntExact(responseCode))
- .setStatus(status)
- .setProtocol(protocol)
- .setTlsMode(tlsMode)
- .setDetectPoint(DetectPoint.server);
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, ingress).adaptToDownstreamMetrics();
- log.debug("Transformed ingress inbound mesh metric {}", metric);
- forward(metric);
+ log.debug("Transformed ingress inbound mesh metric {}", metric);
+ result.add(metric);
- SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
- ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
+ final SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress();
+ final ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress());
- long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte());
- long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte());
-
- ServiceMeshMetric.Builder outboundMetric = ServiceMeshMetric.newBuilder()
- .setStartTime(outboundStartTime)
- .setEndTime(outboundEndTime)
- .setSourceServiceName(
- ingress.getServiceName())
- .setSourceServiceInstance(
- ingress.getServiceInstanceName())
- .setDestServiceName(
- targetService.getServiceName())
- .setDestServiceInstance(
- targetService.getServiceInstanceName())
- .setEndpoint(endpoint)
- .setLatency(
- (int) (outboundEndTime - outboundStartTime))
- .setResponseCode(
- Math.toIntExact(responseCode))
- .setStatus(status)
- .setProtocol(protocol)
- // Can't parse it from tls properties, leave
- // it to Server side.
- .setTlsMode(NON_TLS)
- .setDetectPoint(DetectPoint.client);
-
- log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
- forward(outboundMetric);
- }
- }
- }
+ final ServiceMeshMetric.Builder outboundMetric =
+ newAdapter(entry, ingress, targetService)
+ .adaptToUpstreamMetrics()
+ // Can't parse it from tls properties, leave it to Server side.
+ .setTlsMode(NON_TLS);
- @Override
- public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) {
- if (alsIdentifier != null) {
- Node node = alsIdentifier.getNode();
- if (node != null) {
- String id = node.getId();
- if (id.startsWith("router~")) {
- return Role.PROXY;
- } else if (id.startsWith("sidecar~")) {
- return Role.SIDECAR;
- }
- }
- }
+ log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+ result.add(outboundMetric);
- return prev;
+ return result;
}
/**
@@ -320,15 +167,4 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
return serviceRegistry.findService(ip);
}
- protected void forward(ServiceMeshMetric.Builder metric) {
- TelemetryDataDispatcher.process(metric);
- }
-
- private long formatAsLong(Timestamp timestamp) {
- return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
- }
-
- private long formatAsLong(Duration duration) {
- return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli();
- }
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..1ef5c45510242664c82c7eb71393217133897c2e
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.reflect.Invokable;
+import com.google.common.reflect.TypeToken;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.yaml.snakeyaml.Yaml;
+
+@Slf4j
+@SuppressWarnings("UnstableApiUsage")
+enum FieldsHelper {
+ SINGLETON;
+
+ private boolean initialized = false;
+
+ /**
+ * The mappings from the field name of {@link ServiceMetaInfo} to the field name of {@code flatbuffers}.
+ */
+ private Map fieldNameMapping;
+
+ /**
+ * The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}.
+ */
+ private Map> fieldSetterMapping;
+
+ public void init(final String file) throws Exception {
+ init(ResourceUtils.readToStream(file));
+ }
+
+ @SuppressWarnings("unchecked")
+ void init(final InputStream inputStream) throws ModuleStartException {
+ if (initialized) {
+ return;
+ }
+ final Yaml yaml = new Yaml();
+ final Map config = (Map) yaml.load(inputStream);
+
+ fieldNameMapping = new HashMap<>(config.size());
+ fieldSetterMapping = new HashMap<>(config.size());
+
+ for (final Map.Entry entry : config.entrySet()) {
+ final String serviceMetaInfoFieldName = entry.getKey();
+ final String flatBuffersFieldName = entry.getValue();
+
+ final Pattern p = Pattern.compile("(\\$\\{(?.+?)})");
+ final Matcher m = p.matcher(flatBuffersFieldName);
+ final List> flatBuffersFieldNames = new ArrayList<>(m.groupCount());
+ final StringBuffer serviceNamePattern = new StringBuffer();
+ while (m.find()) {
+ final String property = m.group("property");
+ flatBuffersFieldNames.add(Splitter.on('.').omitEmptyStrings().splitToList(property));
+ m.appendReplacement(serviceNamePattern, "%s");
+ }
+
+ fieldNameMapping.put(
+ serviceMetaInfoFieldName,
+ new ServiceNameFormat(serviceNamePattern.toString(), flatBuffersFieldNames)
+ );
+
+ try {
+ final Method setterMethod = ServiceMetaInfo.class.getMethod("set" + StringUtils.capitalize(serviceMetaInfoFieldName), String.class);
+ final Invokable setter = new TypeToken() {
+ }.method(setterMethod);
+ setter.setAccessible(true);
+ fieldSetterMapping.put(serviceMetaInfoFieldName, setter);
+ } catch (final NoSuchMethodException e) {
+ throw new ModuleStartException("Initialize method error", e);
+ }
+ }
+ initialized = true;
+ }
+
+ /**
+ * Inflates the {@code serviceMetaInfo} with the given {@link Struct struct}.
+ *
+ * @param metadata the {@link Struct} metadata from where to retrieve and inflate the {@code serviceMetaInfo}.
+ * @param serviceMetaInfo the {@code serviceMetaInfo} to be inflated.
+ * @throws Exception if failed to inflate the {@code serviceMetaInfo}
+ */
+ public void inflate(final Struct metadata, final ServiceMetaInfo serviceMetaInfo) throws Exception {
+ final Value root = Value.newBuilder().setStructValue(metadata).build();
+ for (final Map.Entry entry : fieldNameMapping.entrySet()) {
+ final ServiceNameFormat serviceNameFormat = entry.getValue();
+ final Object[] values = new String[serviceNameFormat.properties.size()];
+ for (int i = 0; i < serviceNameFormat.properties.size(); i++) {
+ final List properties = serviceNameFormat.properties.get(i);
+ Value value = root;
+ for (final String property : properties) {
+ value = value.getStructValue().getFieldsOrThrow(property);
+ }
+ values[i] = value.getStringValue();
+ }
+ fieldSetterMapping.get(entry.getKey()).invoke(serviceMetaInfo, Strings.lenientFormat(serviceNameFormat.format, values));
+ }
+ }
+
+ @RequiredArgsConstructor
+ private static class ServiceNameFormat {
+ private final String format;
+
+ private final List> properties;
+ }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
new file mode 100644
index 0000000000000000000000000000000000000000..796c23e744409577d5dc84dd6640434148a391fc
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.TextFormat;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
+
+@Slf4j
+public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
+
+ private static final String UPSTREAM_KEY = "wasm.upstream_peer";
+
+ private static final String DOWNSTREAM_KEY = "wasm.downstream_peer";
+
+ @Override
+ public String name() {
+ return "mx-mesh";
+ }
+
+ @Override
+ public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
+ try {
+ FieldsHelper.SINGLETON.init("metadata-service-mapping.yaml");
+ } catch (final Exception e) {
+ throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
+ }
+ }
+
+ @Override
+ public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
+ final AccessLogCommon properties = entry.getCommonProperties();
+ if (properties == null) {
+ return Collections.emptyList();
+ }
+ final Map stateMap = properties.getFilterStateObjectsMap();
+ if (stateMap == null) {
+ return Collections.emptyList();
+ }
+ final ServiceMetaInfo currSvc;
+ try {
+ currSvc = new ServiceMetaInfoAdapter(identifier.getNode().getMetadata());
+ } catch (Exception e) {
+ log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
+ return Collections.emptyList();
+ }
+
+ final List result = new ArrayList<>();
+ final AtomicBoolean downstreamExists = new AtomicBoolean();
+ stateMap.forEach((key, value) -> {
+ if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
+ return;
+ }
+ final ServiceMetaInfo svc;
+ try {
+ svc = new ServiceMetaInfoAdapter(value);
+ } catch (Exception e) {
+ log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encode(value.toByteArray()));
+ return;
+ }
+ final ServiceMeshMetric.Builder metrics;
+ switch (key) {
+ case UPSTREAM_KEY:
+ metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+ }
+ result.add(metrics);
+ break;
+ case DOWNSTREAM_KEY:
+ metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+ }
+ result.add(metrics);
+ downstreamExists.set(true);
+ break;
+ }
+ });
+ if (role.equals(Role.PROXY) && !downstreamExists.get()) {
+ final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
+ if (log.isDebugEnabled()) {
+ log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
+ }
+ result.add(metric);
+ }
+ return result;
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java
new file mode 100644
index 0000000000000000000000000000000000000000..c8ca1acd771d9f7a963299a34190cf2412405c30
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Struct;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import Wasm.Common.FlatNode;
+import Wasm.Common.KeyVal;
+
+import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Adapter to {@link ServiceMetaInfo} from various of other datastructures.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class ServiceMetaInfoAdapter extends ServiceMetaInfo {
+
+ /**
+ * Try to adapt a {@link ByteString} to {@link ServiceMetaInfo} instance.
+ *
+ * @param bv the {@link ByteString byte string} to adapt from.
+ * @throws Exception if the {@link ByteString byte string} can not be adapted to a {@link ServiceMetaInfo}.
+ */
+ public ServiceMetaInfoAdapter(final ByteString bv) throws Exception {
+ final ByteBuffer buffer = ByteBuffer.wrap(BytesValue.parseFrom(bv).getValue().toByteArray());
+ final FlatNode flatNode = FlatNode.getRootAsFlatNode(buffer);
+ if (log.isDebugEnabled()) {
+ for (int i = 0; i < flatNode.labelsLength(); i++) {
+ final KeyVal kv = flatNode.labels(i);
+ if (nonNull(kv)) {
+ log.debug("wasm label: {} : {}", kv.key(), kv.value());
+ }
+ }
+ }
+
+ setServiceName(Optional.ofNullable(flatNode.labelsByKey("app")).map(KeyVal::value).orElse("-"));
+ setServiceInstanceName(flatNode.name());
+ }
+
+ /**
+ * The same functionality with {@link ServiceMetaInfoAdapter#ServiceMetaInfoAdapter(com.google.protobuf.ByteString)}.
+ *
+ * @param any {@link Any any object} to adapt from.
+ * @throws Exception if the {@link Any any object} can not be adapted to a {@link ServiceMetaInfo}.
+ */
+ public ServiceMetaInfoAdapter(final Any any) throws Exception {
+ this(any.getValue());
+ }
+
+ /**
+ * The same functionality with {@link ServiceMetaInfoAdapter#ServiceMetaInfoAdapter(com.google.protobuf.ByteString)}.
+ *
+ * @param metadata the {@link Struct struct} to adapt from.
+ * @throws Exception if the {@link Struct struct} can not be adapted to a {@link ServiceMetaInfo}.
+ */
+ public ServiceMetaInfoAdapter(final Struct metadata) throws Exception {
+ FieldsHelper.SINGLETON.inflate(requireNonNull(metadata), this);
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
index 215d7a04962b0266362c194afa96f22a66a05d15..bb5a7d97e26becce8ba81ca2fe5b4f6f9b4d914d 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
@@ -18,3 +18,4 @@
org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
similarity index 76%
rename from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
rename to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
index 7ad64a506039cacdf6114c71139769f907240be7..718996c2b7a5d84dd91bc66fe0f872b17a5dfc09 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
@@ -23,10 +23,10 @@ import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
@@ -39,14 +39,14 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class K8sHTTPAnalysisTest {
+public class K8SALSServiceMeshHTTPAnalysisTest {
- private MockK8sAnalysis analysis;
+ private MockK8SAnalysis analysis;
@Before
public void setUp() {
- analysis = new MockK8sAnalysis();
- analysis.init(null);
+ analysis = new MockK8SAnalysis();
+ analysis.init(null, null);
}
@Test
@@ -77,16 +77,16 @@ public class K8sHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+ List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
- Assert.assertEquals(2, analysis.metrics.size());
+ Assert.assertEquals(2, result.size());
- ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+ ServiceMeshMetric.Builder incoming = result.get(0);
Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
Assert.assertEquals("ingress", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
- ServiceMeshMetric.Builder outgoing = analysis.metrics.get(1);
+ ServiceMeshMetric.Builder outgoing = result.get(1);
Assert.assertEquals("ingress", outgoing.getSourceServiceName());
Assert.assertEquals("productpage", outgoing.getDestServiceName());
Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
@@ -99,12 +99,11 @@ public class K8sHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs()
- .getLogEntry(0), Role.SIDECAR);
+ List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, analysis.metrics.size());
+ Assert.assertEquals(1, result.size());
- ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+ ServiceMeshMetric.Builder incoming = result.get(0);
Assert.assertEquals("", incoming.getSourceServiceName());
Assert.assertEquals("productpage", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -117,12 +116,11 @@ public class K8sHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs()
- .getLogEntry(0), Role.SIDECAR);
+ List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, analysis.metrics.size());
+ Assert.assertEquals(1, result.size());
- ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+ ServiceMeshMetric.Builder incoming = result.get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("review", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -135,38 +133,32 @@ public class K8sHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs()
- .getLogEntry(0), Role.SIDECAR);
+ List result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
- Assert.assertEquals(1, analysis.metrics.size());
+ Assert.assertEquals(1, result.size());
- ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+ ServiceMeshMetric.Builder incoming = result.get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("detail", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());
}
}
- public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis {
- private List metrics = new ArrayList<>();
+ public static class MockK8SAnalysis extends K8sALSServiceMeshHTTPAnalysis {
@Override
- public void init(EnvoyMetricReceiverConfig config) {
+ public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
serviceRegistry = mock(K8SServiceRegistry.class);
when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN);
when(serviceRegistry.findService("10.44.2.56")).thenReturn(new ServiceMetaInfo("ingress", "ingress-Inst"));
when(serviceRegistry.findService("10.44.2.54")).thenReturn(new ServiceMetaInfo("productpage", "productpage-Inst"));
when(serviceRegistry.findService("10.44.6.66")).thenReturn(new ServiceMetaInfo("detail", "detail-Inst"));
- when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "detail-Inst"));
+ when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "review-Inst"));
}
- @Override
- protected void forward(ServiceMeshMetric.Builder metric) {
- metrics.add(metric);
- }
}
- private static InputStream getResourceAsStream(final String resource) {
+ public static InputStream getResourceAsStream(final String resource) {
final InputStream in = getContextClassLoader().getResourceAsStream(resource);
return in == null ? MetricServiceGRPCHandlerTestMain.class.getResourceAsStream(resource) : in;
}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..35e91f858380a163d0fa519363831fbcd592f41a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.util.JsonFormat;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.powermock.reflect.Whitebox;
+
+import static org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SALSServiceMeshHTTPAnalysisTest.getResourceAsStream;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class FieldsHelperTest {
+
+ @Parameterized.Parameter()
+ public String mapping;
+
+ @Parameterized.Parameter(1)
+ public String expectedServiceName;
+
+ @Parameterized.Parameter(2)
+ public String expectedServiceInstanceName;
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection data() {
+ return Arrays.asList(new Object[][] {
+ {
+ "serviceName: ${LABELS.app}\nserviceInstanceName: ${NAME}",
+ "productpage",
+ "productpage-v1-65576bb7bf-4mzsp"
+ },
+ {
+ "serviceName: ${LABELS.app}-${LABELS.version}\nserviceInstanceName: ${NAME}.${NAMESPACE}",
+ "productpage-v1",
+ "productpage-v1-65576bb7bf-4mzsp.default"
+ },
+ {
+ "serviceName: ${LABELS.app}-${CLUSTER_ID}\nserviceInstanceName: ${NAME}.${NAMESPACE}.${SERVICE_ACCOUNT}",
+ "productpage-Kubernetes",
+ "productpage-v1-65576bb7bf-4mzsp.default.bookinfo-productpage"
+ },
+ {
+ "serviceName: fixed-${LABELS.app}\nserviceInstanceName: yeah_${NAME}",
+ "fixed-productpage",
+ "yeah_productpage-v1-65576bb7bf-4mzsp"
+ }
+ });
+ }
+
+ @Before
+ public void setUp() {
+ Whitebox.setInternalState(FieldsHelper.SINGLETON, "initialized", false);
+ }
+
+ @Test
+ public void testFormat() throws Exception {
+ try (final InputStreamReader isr = new InputStreamReader(getResourceAsStream("field-helper.msg"))) {
+ final StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+ JsonFormat.parser().merge(isr, requestBuilder);
+ final ServiceMetaInfo info = new ServiceMetaInfo();
+ FieldsHelper.SINGLETON.init(new ByteArrayInputStream(mapping.getBytes()));
+ FieldsHelper.SINGLETON.inflate(
+ requestBuilder.getIdentifier().getNode().getMetadata(),
+ info
+ );
+ assertThat(info.getServiceName(), equalTo(expectedServiceName));
+ }
+ }
+}
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg
new file mode 100644
index 0000000000000000000000000000000000000000..9d589e8453a66428a182bd4aebf7fe3544bb0920
--- /dev/null
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "identifier": {
+ "node": {
+ "id": "sidecar~172.18.0.8~productpage-v1-65576bb7bf-4mzsp.default~default.svc.cluster.local",
+ "cluster": "productpage.default",
+ "metadata": {
+ "PROXY_CONFIG": {
+ "concurrency": 2.0,
+ "envoyAccessLogService": {
+ "address": "0.tcp.ngrok.io:13760"
+ },
+ "statNameLength": 189.0,
+ "configPath": "./etc/istio/proxy",
+ "parentShutdownDuration": "60s",
+ "proxyAdminPort": 15000.0,
+ "controlPlaneAuthPolicy": "MUTUAL_TLS",
+ "drainDuration": "45s",
+ "proxyMetadata": {
+ "DNS_AGENT": ""
+ },
+ "terminationDrainDuration": "5s",
+ "tracing": {
+ "zipkin": {
+ "address": "zipkin.istio-system:9411"
+ }
+ },
+ "statusPort": 15020.0,
+ "serviceCluster": "productpage.default",
+ "envoyMetricsService": {
+ },
+ "discoveryAddress": "istiod.istio-system.svc:15012",
+ "binaryPath": "/usr/local/bin/envoy"
+ },
+ "PLATFORM_METADATA": {
+ "gcp_location": "us-central1-a",
+ "gcp_gce_instance_id": "2148869885222929334",
+ "gcp_gce_instance": "zhenxu-test",
+ "gcp_project_number": "191872121544",
+ "gcp_project": "skywalking-live-demo"
+ },
+ "CLUSTER_ID": "Kubernetes",
+ "APP_CONTAINERS": "productpage",
+ "LABELS": {
+ "service.istio.io/canonical-name": "productpage",
+ "version": "v1",
+ "security.istio.io/tlsMode": "istio",
+ "app": "productpage",
+ "service.istio.io/canonical-revision": "v1",
+ "pod-template-hash": "65576bb7bf",
+ "istio.io/rev": "default"
+ },
+ "ISTIO_PROXY_SHA": "istio-proxy:262253d9d066f8ef7ed82fd175c28b8f95acbec0",
+ "NAME": "productpage-v1-65576bb7bf-4mzsp",
+ "NAMESPACE": "default",
+ "EXCHANGE_KEYS": "NAME,NAMESPACE,INSTANCE_IPS,LABELS,OWNER,PLATFORM_METADATA,WORKLOAD_NAME,MESH_ID,SERVICE_ACCOUNT,CLUSTER_ID",
+ "INSTANCE_IPS": "172.18.0.8",
+ "POD_PORTS": "[{\"containerPort\":9080,\"protocol\":\"TCP\"}]",
+ "INTERCEPTION_MODE": "REDIRECT",
+ "SERVICE_ACCOUNT": "bookinfo-productpage",
+ "MESH_ID": "cluster.local",
+ "SDS": "true",
+ "WORKLOAD_NAME": "productpage-v1",
+ "OWNER": "kubernetes://apis/apps/v1/namespaces/default/deployments/productpage-v1",
+ "ISTIO_VERSION": "1.7.1"
+ },
+ "locality": {
+ "region": "us-central1",
+ "zone": "us-central1-a"
+ },
+ "buildVersion": "262253d9d066f8ef7ed82fd175c28b8f95acbec0/1.15.0/Clean/RELEASE/BoringSSL"
+ },
+ "logName": "http_envoy_accesslog"
+ }
+}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/pom.xml b/oap-server/server-receiver-plugin/receiver-proto/pom.xml
index e7ca3bf7c41431a4ef6ad2ff87f58c6495d6868c..117bb1d42443e04c73a40c852d27a3c78cab4633 100644
--- a/oap-server/server-receiver-plugin/receiver-proto/pom.xml
+++ b/oap-server/server-receiver-plugin/receiver-proto/pom.xml
@@ -17,7 +17,8 @@
~
-->
-
+
server-receiver-plugin
org.apache.skywalking
@@ -28,9 +29,45 @@
receiver-proto
jar
+
+ ${basedir}/src/main/fbs
+ ${project.build.directory}/generated-sources/fbs/java
+ ${project.build.directory}/bin/flatc
+ tar.gz
+
+
+
+
+ com.google.flatbuffers
+ flatbuffers-java
+ provided
+
+
+
+
+
+ windows
+
+
+ Windows
+
+
+
+ zip
+
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ ${os-maven-plugin.version}
+
+
-
+
kr.motd.maven
os-maven-plugin
${os-maven-plugin.version}
@@ -53,10 +90,12 @@
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
- com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+
+ com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
grpc-java
- io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+
+ io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
@@ -68,6 +107,77 @@
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ ${maven-dependency-plugin.version}
+
+
+ unpack
+ generate-sources
+
+ unpack
+
+
+
+
+ com.github.davidmoten
+ flatbuffers-compiler
+ 1.12.0.1
+ ${fbs.compiler.artifact.type}
+ distribution-${os.detected.name}
+ true
+ ${project.build.directory}
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${exec-maven-plugin.version}
+
+
+
+ exec
+
+ generate-sources
+
+ ${fbs.compiler}
+ ${fbs.sources}
+
+ --java
+ --gen-mutable
+ -o
+ ${fbs.generated.sources}
+ istio/node-info.fbs
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ ${build-helper-maven-plugin.version}
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ ${fbs.generated.sources}
+
+
+
+
+
-
\ No newline at end of file
+
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs
new file mode 100644
index 0000000000000000000000000000000000000000..8b33c3a22404b348d8f080a9a55e4abcb2dafd15
--- /dev/null
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs
@@ -0,0 +1,51 @@
+/* Copyright 2020 Istio Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Original File Location:
+// https://github.com/istio/proxy/blob/bcdc1684df0839a612526f688ff7b475902f2feb/extensions/common/node_info.fbs
+
+namespace Wasm.Common;
+
+table KeyVal {
+ key:string (key);
+ value:string;
+}
+
+// NodeInfo represents the information extracted from proxy node metadata.
+table FlatNode {
+ // Name of the node. e.g. in k8s, name is the pod name.
+ name:string;
+ // Namespace that the node runs in.
+ namespace:string;
+ // K8s or vm workload attributes.
+ labels:[KeyVal];
+ owner:string;
+ workload_name:string;
+ // Platform metadata uses prefixed keys
+ // GCP uses gcp_* keys
+ platform_metadata:[KeyVal];
+ // Version identifier for the proxy.
+ istio_version:string;
+ // Unique identifier for the mesh. Taken from global mesh id parameter (or
+ // the configured trust domain when not specified).
+ mesh_id:string;
+ // List of short names for application containers that are using this proxy.
+ // This is only used for kubernetes, and is populated by the sidecar injector.
+ app_containers:[string];
+ // Identifier for the cluster to which this workload belongs (for k8s workloads).
+ cluster_id:string;
+}
+
+root_type FlatNode;
diff --git a/pom.xml b/pom.xml
index ad73b569bff1f52a5a05e37e3032af37d45db5da..815d3d2e53c6727f0c6927848b6cf6ae3b1456cc 100755
--- a/pom.xml
+++ b/pom.xml
@@ -207,9 +207,11 @@
0.6.1
1.6.0
1.8
+ 2.10
2.8.2
3.1.0
2.22.0
+ 3.2.0
2.22.0
3.1.0
3.1.1
@@ -225,6 +227,7 @@
1.5
2.7
true
+
@@ -498,6 +501,7 @@
skywalking-ui/package-lock.json
+ **/src/main/fbs/istio/**
**/src/main/proto/envoy/**
**/src/main/proto/gogoproto/gogo.proto
**/src/main/proto/google/**
diff --git a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
index f6218c8df9d30b9c52ad2f3392ef79e7df2a25ac..f6fdbb9358e1ef2c07f2ed6557df3481fbeb1050 100644
--- a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
+++ b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
@@ -49,7 +49,7 @@ public @interface RetryableTest {
/**
* @return maximum times to retry, or -1 for infinite retries. {@code -1} by default.
*/
- int value() default 120;
+ int value() default 60;
/**
* @return the interval between any two retries, in millisecond. {@code 1000} by default.
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java
deleted file mode 100644
index ce2fb0c15edc3049c48e57c1449c35535e6d1808..0000000000000000000000000000000000000000
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.e2e.mesh;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-public class IDManager {
- public static class ServiceID {
-
- public static ServiceIDDefinition analysisId(String id) {
- final String[] strings = id.split("\\.");
- if (strings.length != 2) {
- throw new RuntimeException("Can't split service id into 2 parts, " + id);
- }
- return new ServiceIDDefinition(
- decode(strings[0]),
- Integer.parseInt(strings[1]) == 1
- );
- }
-
- @RequiredArgsConstructor
- @Getter
- @EqualsAndHashCode
- public static class ServiceIDDefinition {
- private final String name;
-
- private final boolean isReal;
- }
- }
-
- /**
- * @param base64text Base64 encoded UTF-8 string
- * @return normal literal string
- */
- private static String decode(String base64text) {
- return new String(Base64.getDecoder().decode(base64text), StandardCharsets.UTF_8);
- }
-}
diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
index 0d5ee72907fcfeb10d280e3a8ea74190bdb81840..b60dcc055014b172c42cbb0fbcb13879ccb6bf18 100644
--- a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
+++ b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
@@ -47,6 +47,7 @@ calls:
target: cHJvZHVjdHBhZ2U=.1
detectPoints:
- CLIENT
+ - SERVER
- id: not null
source: cHJvZHVjdHBhZ2U=.1
target: cmV2aWV3cw==.1
diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt b/tools/dependencies/known-oap-backend-dependencies-es7.txt
index cbd448fd513ae44ef4f4bd0cbb7db74b634a7b9a..ea82e6d4420e7681512ad9e27e9fd4a139d790cf 100755
--- a/tools/dependencies/known-oap-backend-dependencies-es7.txt
+++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt
@@ -44,6 +44,7 @@ rank-eval-client-7.5.0.jar
error_prone_annotations-2.3.2.jar
etcd4j-2.17.0.jar
failureaccess-1.0.1.jar
+flatbuffers-java-1.12.0.jar
freemarker-2.3.28.jar
graphql-java-8.0.jar
graphql-java-tools-5.2.3.jar
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index 9c37c3113885b3254c7ad37789987a0b94d5edf0..12041d54bd1970bc75f86c240c6b7e80197a5301 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -41,6 +41,7 @@ elasticsearch-x-content-6.3.2.jar
error_prone_annotations-2.3.2.jar
etcd4j-2.17.0.jar
failureaccess-1.0.1.jar
+flatbuffers-java-1.12.0.jar
freemarker-2.3.28.jar
graphql-java-8.0.jar
graphql-java-tools-5.2.3.jar