From 92bb474c2d2dc196e96f635709acac287daae953 Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Thu, 29 Oct 2020 18:22:38 +0800 Subject: [PATCH] Improve Kubernetes service registry for ALS analysis (#5722) The current implementation of envoy ALS K8S analysis is based on the hierarchy, pod -> StatefulSet -> deployment, StatefulSet, or others. It's freaky and different from the Istio Kubernetes registry. The new path is pod -> endpoint -> service, and we should leverage Informer API instead of raw Kubernetes API. --- .github/workflows/e2e.istio.yaml | 11 +- CHANGES.md | 5 +- dist-material/release-docs/LICENSE | 1 + docker/oap/log4j2.xml | 1 + .../setup/backend/configuration-vocabulary.md | 1 + oap-server/pom.xml | 17 + .../src/main/resources/application.yml | 5 + .../server-library/library-util/pom.xml | 4 - .../envoy-metrics-receiver-plugin/pom.xml | 8 +- .../envoy/EnvoyMetricReceiverConfig.java | 4 +- .../envoy/als/DependencyResource.java | 62 --- .../server/receiver/envoy/als/Fetcher.java | 47 -- .../als/K8sALSServiceMeshHTTPAnalysis.java | 462 ------------------ .../envoy/als/k8s/K8SServiceRegistry.java | 302 ++++++++++++ .../k8s/K8sALSServiceMeshHTTPAnalysis.java | 334 +++++++++++++ .../envoy/als/k8s/ServiceNameFormatter.java | 61 +++ ....server.receiver.envoy.als.ALSHTTPAnalysis | 2 +- .../envoy/als/DependencyResourceTest.java | 118 ----- .../als/{ => k8s}/K8sHTTPAnalysisTest.java | 21 +- .../als/k8s/ServiceNameFormatterTest.java | 105 ++++ test/e2e-mesh/e2e-istio/scripts/istio.sh | 3 +- .../e2e/retryable/RetryableTest.java | 4 +- .../apache/skywalking/e2e/mesh/ALSE2E.java | 18 +- .../expected/als/instances-reviews.yml | 22 + .../test/resources/expected/als/services.yml | 12 +- .../src/test/resources/expected/als/topo.yml | 56 +-- .../known-oap-backend-dependencies-es7.txt | 1 + .../known-oap-backend-dependencies.txt | 1 + 28 files changed, 920 insertions(+), 768 deletions(-) delete mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java delete mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java delete mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatter.java delete mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java rename oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/{ => k8s}/K8sHTTPAnalysisTest.java (88%) create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatterTest.java create mode 100644 test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 28db870268..cb785159ab 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -28,7 +28,7 @@ env: ISTIO_VERSION: 1.7.1 TAG: ${{ github.sha }} SCRIPTS_DIR: test/e2e-mesh/e2e-istio/scripts - SW_OAP_BASE_IMAGE: openjdk:8-jre-alpine + SW_OAP_BASE_IMAGE: openjdk:11-jdk jobs: als: @@ -62,17 +62,22 @@ jobs: run: | git clone https://github.com/apache/skywalking-kubernetes.git cd skywalking-kubernetes - git reset --hard 419cd1aed8bb4ad972208e5a031527a25d2ae690 + git reset --hard dd749f25913830c47a97430618cefc4167612e75 cd chart helm dep up skywalking helm -n istio-system install skywalking skywalking \ --set fullnameOverride=skywalking \ --set elasticsearch.replicas=1 \ + --set elasticsearch.minimumMasterNodes=1 \ + --set elasticsearch.imageTag=7.5.1 \ --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \ --set oap.envoy.als.enabled=true \ --set oap.replicas=1 \ + --set ui.image.repository=skywalking/ui \ + --set ui.image.tag=$TAG \ --set oap.image.tag=$TAG \ - --set oap.image.repository=skywalking/oap + --set oap.image.repository=skywalking/oap \ + --set oap.storageType=elasticsearch7 kubectl -n istio-system get pods sleep 3 diff --git a/CHANGES.md b/CHANGES.md index 6dd064aeec..e9a0e0addb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,11 +7,12 @@ Release Notes. #### Project #### Java Agent -Make the Feign plugin to support Java 14 -Make the okhttp3 plugin to support Java 14 +* Make the Feign plugin to support Java 14 +* Make the okhttp3 plugin to support Java 14 #### OAP-Backend * Add the `@SuperDataset` annotation for BrowserErrorLog. +* Improve Kubernetes service registry for ALS analysis. #### UI diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE index a2628e6d1e..cec10ba0e7 100755 --- a/dist-material/release-docs/LICENSE +++ b/dist-material/release-docs/LICENSE @@ -256,6 +256,7 @@ The text of each license is the standard Apache 2.0 license. Apache: commons-pool 2.4.2: https://github.com/apache/commons-pool, Apache 2.0 Apache: commons-lang 3.6: https://github.com/apache/commons-lang, Apache 2.0 Apache: commons-text 1.8: https://github.com/apache/commons-text, Apache 2.0 + Apache: commons-beanutils 1.9.4: https://github.com/apache/commons-beanutils, Apache 2.0 Apache: lucene 6.6.0: https://github.com/apache/lucene-solr/tree/master/lucene, Apache 2.0 Apache: httpasyncclient 4.1.2: https://github.com/apache/httpasyncclient/tree/4.1.2, Apache 2.0 Apache: log4j 1.2.16: http://logging.apache.org/log4j/1.2/, Apache 2.0 diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml index eb69a897dc..fadfaa1a21 100644 --- a/docker/oap/log4j2.xml +++ b/docker/oap/log4j2.xml @@ -29,6 +29,7 @@ + diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index a28340fea7..2562fb7be0 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -168,6 +168,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | envoy-metric| default| Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | acceptMetricsService | Open Envoy Metrics Service analysis | SW_ENVOY_METRIC_SERVICE | true| | - | - | alsHTTPAnalysis | Open Envoy Access Log Service analysis. Value = `k8s-mesh` means open the analysis | SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS | - | +| - | - | k8sServiceNameRule | `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, the available variables are `pod`, `service`, e.g., you can use `${service.metadata.name}-${pod.metadata.labels.version}` to append the version number to the service name. Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. | - | | receiver-oc | default | Read [receiver doc](backend-receivers.md) for more details | - | - | | - | - | gRPCHost|Binding IP of gRPC service. Services include gRPC data report and internal communication among OAP nodes| SW_OC_RECEIVER_GRPC_HOST | - | | - | - | gRPCPort| Binding port of gRPC service | SW_OC_RECEIVER_GRPC_PORT | - | diff --git a/oap-server/pom.xml b/oap-server/pom.xml index b8ab2e097b..90185e9c76 100755 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -99,6 +99,7 @@ 3.5 2.4.1 2.4.6.RELEASE + 1.9.4 @@ -537,6 +538,22 @@ mvel2 ${mvel.version} + + + commons-beanutils + commons-beanutils + ${commons-beanutils.version} + + + commons-collections + commons-collections + + + commons-logging + commons-logging + + + diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index dcc4227ab7..eaa2e4adbb 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -247,6 +247,11 @@ envoy-metric: default: acceptMetricsService: ${SW_ENVOY_METRIC_SERVICE:true} alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:""} + # `k8sServiceNameRule` allows you to customize the service name in ALS via Kubernetes metadata, + # the available variables are `pod`, `service`, f.e., you can use `${service.metadata.name}-${pod.metadata.labels.version}` + # to append the version number to the service name. + # Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. + k8sServiceNameRule: ${K8S_SERVICE_NAME_RULE:"${service.metadata.name}"} prometheus-fetcher: selector: ${SW_PROMETHEUS_FETCHER:default} diff --git a/oap-server/server-library/library-util/pom.xml b/oap-server/server-library/library-util/pom.xml index 5a87917b11..47b720f93f 100644 --- a/oap-server/server-library/library-util/pom.xml +++ b/oap-server/server-library/library-util/pom.xml @@ -37,10 +37,6 @@ joda-time joda-time - - com.google.guava - guava - org.yaml snakeyaml diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml index b94cae1863..5e2428fd0d 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml @@ -43,6 +43,12 @@ io.kubernetes client-java + + + commons-beanutils + commons-beanutils + + com.google.protobuf protobuf-java-util @@ -56,4 +62,4 @@ provided - \ No newline at end of file + diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java index 67e1071dab..5819d8093e 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java @@ -30,10 +30,12 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { @Getter private boolean acceptMetricsService = false; private String alsHTTPAnalysis; + @Getter + private String k8sServiceNameRule; public List getAlsHTTPAnalysis() { if (Strings.isNullOrEmpty(alsHTTPAnalysis)) { - return Collections.EMPTY_LIST; + return Collections.emptyList(); } return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java deleted file mode 100644 index 39b076f2ff..0000000000 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package org.apache.skywalking.oap.server.receiver.envoy.als; - -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; -import java.util.Optional; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -@RequiredArgsConstructor -class DependencyResource { - @Getter(AccessLevel.PACKAGE) - private final V1ObjectMeta metadata; - - private boolean stop; - - DependencyResource getOwnerResource(final String kind, final Fetcher transform) { - if (stop) { - return this; - } - if (metadata.getOwnerReferences() == null) { - stop = true; - return this; - } - V1OwnerReference ownerReference = null; - for (V1OwnerReference each : metadata.getOwnerReferences()) { - if (each.getKind().equals(kind)) { - ownerReference = each; - break; - } - } - if (ownerReference == null) { - stop = true; - return this; - } - Optional metaOptional = transform.apply(ownerReference); - if (!metaOptional.isPresent()) { - stop = true; - return this; - } - return new DependencyResource(metaOptional.get()); - } -} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java deleted file mode 100644 index 94a27d00af..0000000000 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package org.apache.skywalking.oap.server.receiver.envoy.als; - -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; -import java.util.Optional; -import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -interface Fetcher extends Function> { - - Logger LOGGER = LoggerFactory.getLogger(Fetcher.class); - - V1ObjectMeta go(V1OwnerReference ownerReference) throws ApiException; - - default Optional apply(V1OwnerReference ownerReference) { - try { - return Optional.ofNullable(go(ownerReference)); - } catch (final ApiException e) { - LOGGER.error("code:{} header:{} body:{}", e.getCode(), e.getResponseHeaders(), e.getResponseBody()); - return Optional.empty(); - } catch (final Throwable th) { - LOGGER.error("other errors", th); - return Optional.empty(); - } - } -} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java deleted file mode 100644 index 6d1e3dc61c..0000000000 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.receiver.envoy.als; - -import com.google.common.base.Strings; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Duration; -import com.google.protobuf.Timestamp; -import io.envoyproxy.envoy.api.v2.core.Address; -import io.envoyproxy.envoy.api.v2.core.Node; -import io.envoyproxy.envoy.api.v2.core.SocketAddress; -import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; -import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; -import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.apis.CoreV1Api; -import io.kubernetes.client.openapi.apis.ExtensionsV1beta1Api; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; -import io.kubernetes.client.openapi.models.V1Pod; -import io.kubernetes.client.openapi.models.V1PodList; -import io.kubernetes.client.util.Config; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import lombok.AccessLevel; -import lombok.Getter; -import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; -import org.apache.skywalking.apm.network.common.v3.DetectPoint; -import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; -import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; -import org.apache.skywalking.oap.server.core.source.Source; -import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Analysis log based on ingress and mesh scenarios. - */ -public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { - private static final Logger LOGGER = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class); - - private static final String VALID_PHASE = "Running"; - - private static final String NON_TLS = "NONE"; - - private static final String M_TLS = "mTLS"; - - private static final String TLS = "TLS"; - - @Getter(AccessLevel.PROTECTED) - private final AtomicReference> ipServiceMap = new AtomicReference<>(); - - private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool( - 1, new ThreadFactoryBuilder() - .setNameFormat("load-pod-%d") - .setDaemon(true) - .build()); - - @Override - public String name() { - return "k8s-mesh"; - } - - @Override - public void init(EnvoyMetricReceiverConfig config) { - executorService.scheduleAtFixedRate(this::loadPodInfo, 0, 15, TimeUnit.SECONDS); - } - - private boolean invalidPodList() { - Map map = ipServiceMap.get(); - return map == null || map.isEmpty(); - } - - private void loadPodInfo() { - try { - ApiClient client = Config.defaultClient(); - CoreV1Api api = new CoreV1Api(client); - - V1PodList list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null); - Map ipMap = new HashMap<>(list.getItems().size()); - long startTime = System.nanoTime(); - for (V1Pod item : list.getItems()) { - if (!item.getStatus().getPhase().equals(VALID_PHASE)) { - LOGGER.debug("Invalid pod {} is not in a valid phase {}", item.getMetadata() - .getName(), item.getStatus() - .getPhase()); - continue; - } - if (item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) { - LOGGER.debug( - "Pod {}.{} is removed because hostIP and podIP are identical ", item.getMetadata() - .getName(), - item.getMetadata() - .getNamespace() - ); - continue; - } - ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata())); - } - LOGGER.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000); - ipServiceMap.set(ipMap); - } catch (Throwable th) { - LOGGER.error("run load pod error", th); - } - } - - private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) { - ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api(); - DependencyResource dr = new DependencyResource(podMeta); - DependencyResource meta = dr.getOwnerResource( - "ReplicaSet", ownerReference -> extensionsApi.readNamespacedReplicaSet( - ownerReference - .getName(), podMeta.getNamespace(), "", true, true) - .getMetadata()); - ServiceMetaInfo result = new ServiceMetaInfo(); - if (meta.getMetadata().getOwnerReferences() != null && meta.getMetadata().getOwnerReferences().size() > 0) { - V1OwnerReference owner = meta.getMetadata().getOwnerReferences().get(0); - result.setServiceName(String.format("%s.%s", owner.getName(), meta.getMetadata().getNamespace())); - } else { - result.setServiceName(String.format("%s.%s", meta.getMetadata().getName(), meta.getMetadata() - .getNamespace())); - } - result.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace())); - result.setTags(transformLabelsToTags(podMeta.getLabels())); - return result; - } - - private List transformLabelsToTags(final Map labels) { - if (labels == null || labels.size() < 1) { - return Collections.emptyList(); - } - List result = new ArrayList<>(labels.size()); - for (Map.Entry each : labels.entrySet()) { - result.add(new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())); - } - return result; - } - - @Override - public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { - if (invalidPodList()) { - return Collections.emptyList(); - } - switch (role) { - case PROXY: - analysisProxy(identifier, entry); - break; - case SIDECAR: - return analysisSideCar(identifier, entry); - } - - return Collections.emptyList(); - } - - protected List analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - List sources = new ArrayList<>(); - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - String cluster = properties.getUpstreamCluster(); - if (cluster != null) { - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); - - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - ServiceMetaInfo downstreamService = find( - downstreamRemoteAddress.getSocketAddress() - .getAddress(), downstreamRemoteAddress.getSocketAddress() - .getPortValue()); - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - ServiceMetaInfo localService = find( - downstreamLocalAddress.getSocketAddress() - .getAddress(), downstreamLocalAddress.getSocketAddress() - .getPortValue()); - String tlsMode = parseTLS(properties.getTlsProperties()); - if (cluster.startsWith("inbound|")) { - // Server side - if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { - // Ingress -> sidecar(server side) - // Mesh telemetry without source, the relation would be generated. - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setDestServiceName( - localService.getServiceName()) - .setDestServiceInstance( - localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - LOGGER.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); - forward(metric); - } else { - // sidecar -> sidecar(server side) - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName( - downstreamService.getServiceName()) - .setSourceServiceInstance( - downstreamService.getServiceInstanceName()) - .setDestServiceName( - localService.getServiceName()) - .setDestServiceInstance( - localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - LOGGER.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - forward(metric); - } - } else if (cluster.startsWith("outbound|")) { - // sidecar(client side) -> sidecar - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - ServiceMetaInfo destService = find( - upstreamRemoteAddress.getSocketAddress() - .getAddress(), upstreamRemoteAddress.getSocketAddress() - .getPortValue()); - - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName( - downstreamService.getServiceName()) - .setSourceServiceInstance( - downstreamService.getServiceInstanceName()) - .setDestServiceName( - destService.getServiceName()) - .setDestServiceInstance( - destService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.client); - - LOGGER.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - forward(metric); - - } - } - } - return sources; - } - - private String parseTLS(TLSProperties properties) { - if (properties == null) { - return NON_TLS; - } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return NON_TLS; - } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return TLS; - } - return M_TLS; - } - - protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { - SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo outside = find( - downstreamRemoteAddressSocketAddress.getAddress(), downstreamRemoteAddressSocketAddress - .getPortValue()); - - SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); - ServiceMetaInfo ingress = find( - downstreamLocalAddressSocketAddress.getAddress(), downstreamLocalAddressSocketAddress - .getPortValue()); - - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); - - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - String tlsMode = parseTLS(properties.getTlsProperties()); - - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(outside.getServiceName()) - .setSourceServiceInstance( - outside.getServiceInstanceName()) - .setDestServiceName(ingress.getServiceName()) - .setDestServiceInstance( - ingress.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - LOGGER.debug("Transformed ingress inbound mesh metric {}", metric); - forward(metric); - - SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo targetService = find( - upstreamRemoteAddressSocketAddress.getAddress(), upstreamRemoteAddressSocketAddress - .getPortValue()); - - long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); - long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); - - ServiceMeshMetric.Builder outboundMetric = ServiceMeshMetric.newBuilder() - .setStartTime(outboundStartTime) - .setEndTime(outboundEndTime) - .setSourceServiceName( - ingress.getServiceName()) - .setSourceServiceInstance( - ingress.getServiceInstanceName()) - .setDestServiceName( - targetService.getServiceName()) - .setDestServiceInstance( - targetService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency( - (int) (outboundEndTime - outboundStartTime)) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - // Can't parse it from tls properties, leave - // it to Server side. - .setTlsMode(NON_TLS) - .setDetectPoint(DetectPoint.client); - - LOGGER.debug("Transformed ingress outbound mesh metric {}", outboundMetric); - forward(outboundMetric); - } - } - } - - @Override - public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) { - if (alsIdentifier != null) { - Node node = alsIdentifier.getNode(); - if (node != null) { - String id = node.getId(); - if (id.startsWith("router~")) { - return Role.PROXY; - } else if (id.startsWith("sidecar~")) { - return Role.SIDECAR; - } - } - } - - return prev; - } - - /** - * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. - */ - protected ServiceMetaInfo find(String ip, int port) { - Map map = ipServiceMap.get(); - if (map == null) { - LOGGER.debug("Unknown ip {}, ip -> service is null", ip); - return ServiceMetaInfo.UNKNOWN; - } - if (map.containsKey(ip)) { - return map.get(ip); - } - LOGGER.debug("Unknown ip {}, ip -> service is {}", ip, map); - return ServiceMetaInfo.UNKNOWN; - } - - protected void forward(ServiceMeshMetric.Builder metric) { - TelemetryDataDispatcher.process(metric); - } - - private long formatAsLong(Timestamp timestamp) { - return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); - } - - private long formatAsLong(Duration duration) { - return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); - } -} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java new file mode 100644 index 0000000000..90a1ee62e9 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.kubernetes.client.informer.ResourceEventHandler; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Endpoints; +import io.kubernetes.client.openapi.models.V1EndpointsList; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.openapi.models.V1Service; +import io.kubernetes.client.openapi.models.V1ServiceList; +import io.kubernetes.client.util.Config; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.isNull; +import static java.util.Objects.requireNonNull; + +@Slf4j +class K8SServiceRegistry { + final Map ipServiceMetaInfoMap; + + final Map idServiceMap; + + final Map ipPodMap; + + final Map ipServiceMap; + + final ExecutorService executor; + + final ServiceNameFormatter serviceNameFormatter; + + K8SServiceRegistry(final EnvoyMetricReceiverConfig config) { + serviceNameFormatter = new ServiceNameFormatter(config.getK8sServiceNameRule()); + ipServiceMetaInfoMap = new ConcurrentHashMap<>(); + idServiceMap = new ConcurrentHashMap<>(); + ipPodMap = new ConcurrentHashMap<>(); + ipServiceMap = new ConcurrentHashMap<>(); + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("K8SServiceRegistry-%d") + .setDaemon(true) + .build() + ); + } + + void start() throws IOException { + final ApiClient apiClient = Config.defaultClient(); + apiClient.setHttpClient(apiClient.getHttpClient() + .newBuilder() + .readTimeout(0, TimeUnit.SECONDS) + .build()); + Configuration.setDefaultApiClient(apiClient); + + final CoreV1Api coreV1Api = new CoreV1Api(); + final SharedInformerFactory factory = new SharedInformerFactory(executor); + + // TODO: also listen to the EndpointSlice event after the client supports us to do so + listenServiceEvents(coreV1Api, factory); + listenEndpointsEvents(coreV1Api, factory); + listenPodEvents(coreV1Api, factory); + + factory.startAllRegisteredInformers(); + } + + private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listServiceForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + params.timeoutSeconds, + params.watch, + null + ), + V1Service.class, + V1ServiceList.class + ).addEventHandler(new ResourceEventHandler() { + @Override + public void onAdd(final V1Service service) { + addService(service); + } + + @Override + public void onUpdate(final V1Service oldService, final V1Service newService) { + addService(newService); + } + + @Override + public void onDelete(final V1Service service, final boolean deletedFinalStateUnknown) { + removeService(service); + } + }); + } + + private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listEndpointsForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + params.timeoutSeconds, + params.watch, + null + ), + V1Endpoints.class, + V1EndpointsList.class + ).addEventHandler(new ResourceEventHandler() { + @Override + public void onAdd(final V1Endpoints endpoints) { + addEndpoints(endpoints); + } + + @Override + public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) { + addEndpoints(newEndpoints); + } + + @Override + public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) { + removeEndpoints(endpoints); + } + }); + } + + private void listenPodEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listPodForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + params.timeoutSeconds, + params.watch, + null + ), + V1Pod.class, + V1PodList.class + ).addEventHandler(new ResourceEventHandler() { + @Override + public void onAdd(final V1Pod pod) { + addPod(pod); + } + + @Override + public void onUpdate(final V1Pod oldPod, final V1Pod newPod) { + addPod(newPod); + } + + @Override + public void onDelete(final V1Pod pod, final boolean deletedFinalStateUnknown) { + removePod(pod); + } + }); + } + + private void addService(final V1Service service) { + Optional.ofNullable(service.getMetadata()).ifPresent( + metadata -> idServiceMap.put(metadata.getNamespace() + ":" + metadata.getName(), service) + ); + + recompose(); + } + + private void removeService(final V1Service service) { + Optional.ofNullable(service.getMetadata()).ifPresent( + metadata -> idServiceMap.remove(metadata.getUid()) + ); + } + + private void addPod(final V1Pod pod) { + Optional.ofNullable(pod.getStatus()).ifPresent( + status -> ipPodMap.put(status.getPodIP(), pod) + ); + + recompose(); + } + + private void removePod(final V1Pod pod) { + Optional.ofNullable(pod.getStatus()).ifPresent( + status -> ipPodMap.remove(status.getPodIP()) + ); + } + + private void addEndpoints(final V1Endpoints endpoints) { + final String namespace = requireNonNull(endpoints.getMetadata()).getNamespace(); + final String name = requireNonNull(endpoints.getMetadata()).getName(); + + requireNonNull(endpoints.getSubsets()).forEach( + subset -> requireNonNull(subset.getAddresses()).forEach( + address -> ipServiceMap.put(address.getIp(), namespace + ":" + name) + ) + ); + + recompose(); + } + + private void removeEndpoints(final V1Endpoints endpoints) { + requireNonNull(endpoints.getSubsets()).forEach( + subset -> requireNonNull(subset.getAddresses()).forEach( + address -> ipServiceMap.remove(address.getIp()) + ) + ); + } + + private List transformLabelsToTags(final Map labels) { + if (isNull(labels)) { + return Collections.emptyList(); + } + return labels.entrySet() + .stream() + .map(each -> new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())) + .collect(Collectors.toList()); + } + + ServiceMetaInfo findService(final String ip) { + final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); + if (isNull(service)) { + log.debug("Unknown ip {}, ip -> service is null", ip); + return ServiceMetaInfo.UNKNOWN; + } + return service; + } + + private void recompose() { + ipPodMap.forEach((ip, pod) -> { + final String namespaceService = ipServiceMap.get(ip); + final V1Service service; + if (isNullOrEmpty(namespaceService) || isNull(service = idServiceMap.get(namespaceService))) { + return; + } + + final Map context = ImmutableMap.of("service", service, "pod", pod); + final V1ObjectMeta podMetadata = requireNonNull(pod.getMetadata()); + + ipServiceMetaInfoMap.computeIfAbsent(ip, unused -> { + final ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo(); + + try { + serviceMetaInfo.setServiceName(serviceNameFormatter.format(context)); + } catch (Exception e) { + log.error("Failed to evaluate service name.", e); + serviceMetaInfo.setServiceName(requireNonNull(service.getMetadata()).getName()); + } + serviceMetaInfo.setServiceInstanceName(String.format("%s.%s", podMetadata.getName(), podMetadata.getNamespace())); + serviceMetaInfo.setTags(transformLabelsToTags(podMetadata.getLabels())); + + return serviceMetaInfo; + }); + }); + } + + boolean isEmpty() { + return ipServiceMetaInfoMap.isEmpty(); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java new file mode 100644 index 0000000000..f06d941578 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import com.google.common.base.Strings; +import com.google.protobuf.Duration; +import com.google.protobuf.Timestamp; +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; +import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; +import org.apache.skywalking.apm.network.common.v3.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +/** + * Analysis log based on ingress and mesh scenarios. + */ +@Slf4j +public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { + private static final String NON_TLS = "NONE"; + + private static final String M_TLS = "mTLS"; + + private static final String TLS = "TLS"; + + protected K8SServiceRegistry serviceRegistry; + + @Override + public String name() { + return "k8s-mesh"; + } + + @Override + @SneakyThrows + public void init(EnvoyMetricReceiverConfig config) { + serviceRegistry = new K8SServiceRegistry(config); + serviceRegistry.start(); + } + + @Override + public List analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { + if (serviceRegistry.isEmpty()) { + return Collections.emptyList(); + } + switch (role) { + case PROXY: + analysisProxy(identifier, entry); + break; + case SIDECAR: + return analysisSideCar(identifier, entry); + } + + return Collections.emptyList(); + } + + protected List analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { + List sources = new ArrayList<>(); + AccessLogCommon properties = entry.getCommonProperties(); + if (properties != null) { + String cluster = properties.getUpstreamCluster(); + if (cluster != null) { + long startTime = formatAsLong(properties.getStartTime()); + long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + HTTPRequestProperties request = entry.getRequest(); + String endpoint = "/"; + Protocol protocol = Protocol.HTTP; + if (request != null) { + endpoint = request.getPath(); + String schema = request.getScheme(); + if ("http".equals(schema) || "https".equals(schema)) { + protocol = Protocol.HTTP; + } else { + protocol = Protocol.gRPC; + } + } + HTTPResponseProperties response = entry.getResponse(); + int responseCode = 200; + if (response != null) { + responseCode = response.getResponseCode().getValue(); + } + boolean status = responseCode >= 200 && responseCode < 400; + + Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); + Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); + String tlsMode = parseTLS(properties.getTlsProperties()); + + ServiceMeshMetric.Builder metric = null; + if (cluster.startsWith("inbound|")) { + // Server side + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); + } else { + // sidecar -> sidecar(server side) + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + } + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(destService.getServiceName()) + .setDestServiceInstance(destService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.client); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + } + + Optional.ofNullable(metric).ifPresent(this::forward); + } + } + return sources; + } + + private String parseTLS(TLSProperties properties) { + if (properties == null) { + return NON_TLS; + } + if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { + return NON_TLS; + } + if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { + return TLS; + } + return M_TLS; + } + + protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { + AccessLogCommon properties = entry.getCommonProperties(); + if (properties != null) { + Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { + SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); + + SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); + + long startTime = formatAsLong(properties.getStartTime()); + long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + + HTTPRequestProperties request = entry.getRequest(); + String endpoint = "/"; + Protocol protocol = Protocol.HTTP; + if (request != null) { + endpoint = request.getPath(); + String schema = request.getScheme(); + if ("http".equals(schema) || "https".equals(schema)) { + protocol = Protocol.HTTP; + } else { + protocol = Protocol.gRPC; + } + } + HTTPResponseProperties response = entry.getResponse(); + int responseCode = 200; + if (response != null) { + responseCode = response.getResponseCode().getValue(); + } + boolean status = responseCode >= 200 && responseCode < 400; + String tlsMode = parseTLS(properties.getTlsProperties()); + + ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(outside.getServiceName()) + .setSourceServiceInstance( + outside.getServiceInstanceName()) + .setDestServiceName(ingress.getServiceName()) + .setDestServiceInstance( + ingress.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed ingress inbound mesh metric {}", metric); + forward(metric); + + SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); + + long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); + + ServiceMeshMetric.Builder outboundMetric = ServiceMeshMetric.newBuilder() + .setStartTime(outboundStartTime) + .setEndTime(outboundEndTime) + .setSourceServiceName( + ingress.getServiceName()) + .setSourceServiceInstance( + ingress.getServiceInstanceName()) + .setDestServiceName( + targetService.getServiceName()) + .setDestServiceInstance( + targetService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency( + (int) (outboundEndTime - outboundStartTime)) + .setResponseCode( + Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + // Can't parse it from tls properties, leave + // it to Server side. + .setTlsMode(NON_TLS) + .setDetectPoint(DetectPoint.client); + + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + forward(outboundMetric); + } + } + } + + @Override + public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) { + if (alsIdentifier != null) { + Node node = alsIdentifier.getNode(); + if (node != null) { + String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + } + } + + return prev; + } + + /** + * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. + */ + protected ServiceMetaInfo find(String ip) { + return serviceRegistry.findService(ip); + } + + protected void forward(ServiceMeshMetric.Builder metric) { + TelemetryDataDispatcher.process(metric); + } + + private long formatAsLong(Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); + } + + private long formatAsLong(Duration duration) { + return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatter.java new file mode 100644 index 0000000000..5e2e375085 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import com.google.common.base.Strings; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.beanutils.PropertyUtils; +import org.apache.commons.lang3.StringUtils; + +class ServiceNameFormatter { + + private final List properties; + + private final StringBuffer serviceNamePattern; + + ServiceNameFormatter(String rule) { + rule = StringUtils.defaultIfBlank(rule, "${service.metadata.name}"); + + this.properties = new ArrayList<>(); + this.serviceNamePattern = new StringBuffer(); + + final Pattern variablePattern = Pattern.compile("(\\$\\{(?.+?)})"); + final Matcher matcher = variablePattern.matcher(rule); + + while (matcher.find()) { + properties.add(matcher.group("property")); + matcher.appendReplacement(serviceNamePattern, "%s"); + } + } + + String format(final Map context) throws Exception { + final Object[] values = new Object[properties.size()]; + + for (int i = 0; i < properties.size(); i++) { + final Object value = PropertyUtils.getProperty(context, properties.get(i)); + values[i] = value; + } + + return Strings.lenientFormat(serviceNamePattern.toString(), values); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis index c744de58e1..215d7a0496 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis @@ -17,4 +17,4 @@ # -org.apache.skywalking.oap.server.receiver.envoy.als.K8sALSServiceMeshHTTPAnalysis \ No newline at end of file +org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java deleted file mode 100644 index 9a4f551ca3..0000000000 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package org.apache.skywalking.oap.server.receiver.envoy.als; - -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -@RunWith(Parameterized.class) -public class DependencyResourceTest { - - @Parameterized.Parameter - public String resourceName; - - @Parameterized.Parameter(1) - public ThrowableFunction function; - - @Parameterized.Parameters(name = "{index}: {0}") - public static Collection data() { - return Arrays.asList(new Object[][] { - { - "deploy1", - (ThrowableFunction) result -> result - }, - { - "pod1", - (ThrowableFunction) result -> { - throw new RuntimeException(); - } - }, - { - "pod1", - (ThrowableFunction) result -> { - throw new ApiException(); - } - }, - { - "pod1", - (ThrowableFunction) result -> null - }, - { - "rs1", - (ThrowableFunction) result -> { - result.setOwnerReferences(null); - return result; - } - }, - { - "rs1", - (ThrowableFunction) result -> { - V1OwnerReference reference1 = new V1OwnerReference(); - reference1.setKind("StatefulSet"); - reference1.setName("ss1"); - result.setOwnerReferences(Collections.singletonList(reference1)); - return result; - } - }, - }); - } - - @Test - public void test() { - V1ObjectMeta meta = new V1ObjectMeta(); - meta.setName("pod1"); - V1OwnerReference reference = new V1OwnerReference(); - reference.setKind("ReplicaSet"); - reference.setName("rs1"); - meta.addOwnerReferencesItem(reference); - DependencyResource dr = new DependencyResource(meta); - DependencyResource drr = dr.getOwnerResource("ReplicaSet", ownerReference -> { - assertThat(ownerReference.getName(), is("rs1")); - V1ObjectMeta result = new V1ObjectMeta(); - result.setName("rs1"); - V1OwnerReference reference1 = new V1OwnerReference(); - reference1.setKind("Deployment"); - reference1.setName("deploy1"); - result.addOwnerReferencesItem(reference1); - return function.go(result); - }).getOwnerResource("Deployment", ownerReference -> { - assertThat(ownerReference.getName(), is("deploy1")); - V1ObjectMeta result = new V1ObjectMeta(); - result.setName("deploy1"); - return result; - }); - assertThat(drr.getMetadata().getName(), is(resourceName)); - } - - interface ThrowableFunction { - V1ObjectMeta go(final V1ObjectMeta result) throws ApiException; - } - -} \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java similarity index 88% rename from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java rename to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java index e6c797f7a1..7ad64a5060 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java @@ -16,9 +16,8 @@ * */ -package org.apache.skywalking.oap.server.receiver.envoy.als; +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; -import com.google.common.collect.ImmutableMap; import com.google.protobuf.util.JsonFormat; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.io.IOException; @@ -30,10 +29,16 @@ import org.apache.skywalking.apm.network.common.v3.DetectPoint; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class K8sHTTPAnalysisTest { private MockK8sAnalysis analysis; @@ -147,12 +152,12 @@ public class K8sHTTPAnalysisTest { @Override public void init(EnvoyMetricReceiverConfig config) { - getIpServiceMap().set( - ImmutableMap.of("10.44.2.56", new ServiceMetaInfo("ingress", "ingress-Inst"), "10.44.2.54", - new ServiceMetaInfo("productpage", "productpage-Inst"), "10.44.6.66", - new ServiceMetaInfo("detail", "detail-Inst"), "10.44.2.55", - new ServiceMetaInfo("review", "detail-Inst") - )); + serviceRegistry = mock(K8SServiceRegistry.class); + when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN); + when(serviceRegistry.findService("10.44.2.56")).thenReturn(new ServiceMetaInfo("ingress", "ingress-Inst")); + when(serviceRegistry.findService("10.44.2.54")).thenReturn(new ServiceMetaInfo("productpage", "productpage-Inst")); + when(serviceRegistry.findService("10.44.6.66")).thenReturn(new ServiceMetaInfo("detail", "detail-Inst")); + when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "detail-Inst")); } @Override diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatterTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatterTest.java new file mode 100644 index 0000000000..581f5c14a8 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/ServiceNameFormatterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import com.google.common.collect.ImmutableMap; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1Service; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static junit.framework.TestCase.assertEquals; + +@RequiredArgsConstructor +@RunWith(Parameterized.class) +public class ServiceNameFormatterTest { + private final Case kase; + + @Parameterized.Parameters + public static Case[] parameters() { + return new Case[] { + new Case( + null, + ImmutableMap.of("service", service("Clash")), + "Clash" + ), + new Case( + null, + ImmutableMap.of("service", service("ClashX"), "pod", pod("version", "v1")), + "ClashX" + ), + new Case( + "${service.metadata.name}-${pod.metadata.labels.version}", + ImmutableMap.of("service", service("Clash"), "pod", pod("version", "v1beta")), + "Clash-v1beta" + ), + new Case( + "${pod.metadata.labels.app}", + ImmutableMap.of("service", service("Clash"), "pod", pod("app", "ClashX-alpha")), + "ClashX-alpha" + ) + }; + } + + @Test + public void testFormatDefaultRule() throws Exception { + assertEquals(new ServiceNameFormatter(kase.format).format(kase.context), kase.result); + } + + static V1Service service(final String name) { + return new V1Service() { + @Override + public V1ObjectMeta getMetadata() { + return new V1ObjectMeta() { + @Override + public String getName() { + return name; + } + }; + } + }; + } + + static V1Pod pod(final String label, final String value) { + return new V1Pod() { + @Override + public V1ObjectMeta getMetadata() { + return new V1ObjectMeta() { + @Override + public Map getLabels() { + return ImmutableMap.of(label, value); + } + }; + } + }; + } + + @RequiredArgsConstructor + static class Case { + private final String format; + + private final Map context; + + private final String result; + } +} diff --git a/test/e2e-mesh/e2e-istio/scripts/istio.sh b/test/e2e-mesh/e2e-istio/scripts/istio.sh index ffd432c302..2578818bca 100644 --- a/test/e2e-mesh/e2e-istio/scripts/istio.sh +++ b/test/e2e-mesh/e2e-istio/scripts/istio.sh @@ -21,7 +21,6 @@ set -ex -curl -L https://istio.io/downloadIstio | sh - -sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/ +istioctl version || (curl -L https://istio.io/downloadIstio | sh - && sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/) istioctl install $@ kubectl label namespace default istio-injection=enabled diff --git a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java index 0dc440a9fc..f6218c8df9 100644 --- a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java +++ b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java @@ -49,10 +49,10 @@ public @interface RetryableTest { /** * @return maximum times to retry, or -1 for infinite retries. {@code -1} by default. */ - int value() default 300; + int value() default 120; /** * @return the interval between any two retries, in millisecond. {@code 1000} by default. */ - long interval() default 1000; + long interval() default 10000; } diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java index 3442ee1753..b989eb3168 100644 --- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java +++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/ALSE2E.java @@ -66,7 +66,9 @@ import static org.apache.skywalking.e2e.utils.Yamls.load; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class ALSE2E extends SkyWalkingTestAdapter { private final String swWebappHost = Optional.ofNullable(Strings.emptyToNull(System.getenv("WEBAPP_HOST"))).orElse("127.0.0.1"); - private final String swWebappPort = Optional.ofNullable(Strings.emptyToNull(System.getenv("WEBAPP_PORT"))).orElse("8080"); + + private final String swWebappPort = Optional.ofNullable(Strings.emptyToNull(System.getenv("WEBAPP_PORT"))).orElse("12800"); + protected HostAndPort swWebappHostPort = HostAndPort.builder() .host(swWebappHost) .port(Integer.parseInt(swWebappPort)) @@ -74,10 +76,10 @@ public class ALSE2E extends SkyWalkingTestAdapter { private final Map, String> serviceEndpointExpectedDataFiles = ImmutableMap., String>builder() - .put(service -> IDManager.ServiceID.analysisId(service.getKey()).getName().startsWith("ratings-v1"), "expected/als/endpoints-ratings.yml") - .put(service -> IDManager.ServiceID.analysisId(service.getKey()).getName().startsWith("details-v1"), "expected/als/endpoints-details.yml") - .put(service -> IDManager.ServiceID.analysisId(service.getKey()).getName().startsWith("reviews-v"), "expected/als/endpoints-reviews.yml") - .put(service -> IDManager.ServiceID.analysisId(service.getKey()).getName().startsWith("productpage-v1"), "expected/als/endpoints-productpage.yml") + .put(service -> service.getLabel().startsWith("ratings"), "expected/als/endpoints-ratings.yml") + .put(service -> service.getLabel().startsWith("details"), "expected/als/endpoints-details.yml") + .put(service -> service.getLabel().startsWith("reviews"), "expected/als/endpoints-reviews.yml") + .put(service -> service.getLabel().startsWith("productpage"), "expected/als/endpoints-productpage.yml") .build(); @BeforeAll @@ -151,7 +153,11 @@ public class ALSE2E extends SkyWalkingTestAdapter { LOGGER.info("instances: {}", instances); - load("expected/als/instances.yml").as(InstancesMatcher.class).verify(instances); + String file = "expected/als/instances.yml"; + if (service.getLabel().equals("reviews")) { + file = "expected/als/instances-reviews.yml"; + } + load(file).as(InstancesMatcher.class).verify(instances); return instances; } diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml b/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml new file mode 100644 index 0000000000..1abf02cff8 --- /dev/null +++ b/test/e2e/e2e-test/src/test/resources/expected/als/instances-reviews.yml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +instances: + - key: not null + label: not null + - key: not null + label: not null + - key: not null + label: not null diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml index bd79611af5..006a060b0b 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/services.yml +++ b/test/e2e/e2e-test/src/test/resources/expected/als/services.yml @@ -15,16 +15,12 @@ services: - key: not null - label: re(ratings-v1.*) + label: re(ratings.*) - key: not null - label: re(reviews-v1.*) + label: re(reviews.*) - key: not null - label: re(reviews-v2.*) + label: re(productpage.*) - key: not null - label: re(reviews-v3.*) - - key: not null - label: re(productpage-v1.*) - - key: not null - label: re(details-v1.*) + label: re(details.*) - key: not null label: re(istio-ingressgateway.*) diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml index c369efd746..0d5ee72907 100644 --- a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml +++ b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml @@ -15,30 +15,22 @@ nodes: - id: not null - name: re(ratings-v1.*) + name: ratings type: http isReal: true - id: not null - name: re(reviews-v1.*) + name: reviews type: http isReal: true - id: not null - name: re(reviews-v2.*) - type: http + name: productpage isReal: true - id: not null - name: re(reviews-v3.*) + name: details type: http isReal: true - id: not null - name: re(productpage-v1.*) - isReal: true - - id: not null - name: re(details-v1.*) - type: http - isReal: true - - id: not null - name: re(istio-ingressgateway.*) + name: istio-ingressgateway type: http isReal: true - id: not null @@ -46,48 +38,30 @@ nodes: isReal: true calls: - id: not null - source: re(VU5LTk9XTg.*) - target: re(aXN0aW8taW5ncmVzc2dhdGV3YXkt.*) - detectPoints: - - SERVER - - id: not null - source: re(cHJvZHVjdHBhZ2UtdjEt.*) - target: re(cmV2aWV3cy12MS0.*) + source: VU5LTk9XTg==.1 + target: aXN0aW8taW5ncmVzc2dhdGV3YXk=.1 detectPoints: - - CLIENT - SERVER - id: not null - source: re(cHJvZHVjdHBhZ2UtdjEt.*) - target: re(ZGV0YWlscy12MS.*) + source: aXN0aW8taW5ncmVzc2dhdGV3YXk=.1 + target: cHJvZHVjdHBhZ2U=.1 detectPoints: - CLIENT - - SERVER - id: not null - source: re(cHJvZHVjdHBhZ2UtdjEt.*) - target: re(cmV2aWV3cy12Mi.*) + source: cHJvZHVjdHBhZ2U=.1 + target: cmV2aWV3cw==.1 detectPoints: - CLIENT - SERVER - id: not null - source: re(cmV2aWV3cy12Mi.*) - target: re(cmF0aW5ncy12MS.*) + source: cHJvZHVjdHBhZ2U=.1 + target: ZGV0YWlscw==.1 detectPoints: - CLIENT - SERVER - id: not null - source: re(cmV2aWV3cy12My.*) - target: re(cmF0aW5ncy12MS.*) - detectPoints: - - CLIENT - - SERVER - - id: not null - source: re(aXN0aW8taW5ncmVzc2dhdGV3YXkt.*) - target: re(cHJvZHVjdHBhZ2UtdjEt.*) - detectPoints: - - CLIENT - - id: not null - source: re(cHJvZHVjdHBhZ2UtdjEt.*) - target: re(cmV2aWV3cy12My.*) + source: cmV2aWV3cw==.1 + target: cmF0aW5ncw==.1 detectPoints: - CLIENT - SERVER diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt b/tools/dependencies/known-oap-backend-dependencies-es7.txt index 1a7b918d9d..e987be5083 100755 --- a/tools/dependencies/known-oap-backend-dependencies-es7.txt +++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt @@ -169,3 +169,4 @@ lz4-java-1.6.0.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.3-1.jar mvel2-2.4.8.Final.jar +commons-beanutils-1.9.4.jar diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt index 6b0470f9b9..9c37c31138 100755 --- a/tools/dependencies/known-oap-backend-dependencies.txt +++ b/tools/dependencies/known-oap-backend-dependencies.txt @@ -168,3 +168,4 @@ lz4-java-1.6.0.jar snappy-java-1.1.7.3.jar zstd-jni-1.4.3-1.jar mvel2-2.4.8.Final.jar +commons-beanutils-1.9.4.jar -- GitLab