From f06c220e50d6d08f00153923f657688d84159aa0 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Thu, 3 Mar 2022 20:16:02 +0800 Subject: [PATCH] Introduce the entity of Process Type (#8617) --- CHANGES.md | 1 + apm-protocol/apm-network/src/main/proto | 2 +- docs/en/concepts-and-designs/overview.md | 4 +- .../exporter/provider/MetricFormatter.java | 4 + .../core/alarm/provider/NotifyHandler.java | 15 +- .../alarm/provider/NotifyHandlerTest.java | 35 +++ .../server/core/alarm/ProcessMetaInAlarm.java | 53 +++++ .../oap/server/core/analysis/IDManager.java | 18 ++ .../manual/process/ProcessDetectType.java | 71 ++++++ .../manual/process/ProcessDispatcher.java | 44 ++++ .../manual/process/ProcessTraffic.java | 223 ++++++++++++++++++ .../metrics/MetricsEntityMetaInfo.java | 80 +++++++ .../analysis/metrics/MetricsMetaInfo.java | 13 +- .../core/source/DefaultScopeDefine.java | 32 ++- .../oap/server/core/source/Process.java | 82 +++++++ oap-server/server-receiver-plugin/pom.xml | 1 + .../skywalking-ebpf-receiver-plugin/pom.xml | 38 +++ .../ebpf/module/EBPFReceiverModule.java | 35 +++ .../ebpf/provider/EBPFReceiverProvider.java | 73 ++++++ .../handler/EBPFProcessServiceHandler.java | 166 +++++++++++++ ...ing.oap.server.library.module.ModuleDefine | 19 ++ ...g.oap.server.library.module.ModuleProvider | 19 ++ oap-server/server-starter/pom.xml | 5 + .../src/main/resources/application.yml | 4 + 24 files changed, 1027 insertions(+), 10 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/ProcessMetaInAlarm.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDetectType.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsEntityMetaInfo.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/pom.xml create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/module/EBPFReceiverModule.java create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine create mode 100644 oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider diff --git a/CHANGES.md b/CHANGES.md index 212d0d99e1..f96cbe4a69 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,6 +78,7 @@ Release Notes. * Fix Zipkin receiver wrong condition for decoding `gzip`. * Add a new sampler (`possibility`) in LAL. * Unify module name `receiver_zipkin` to `receiver-zipkin`, remove `receiver_jaeger` from `application.yaml`. +* Introduce the entity of Process type. #### UI diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index 667134d08f..b7548df896 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit 667134d08ffd63122062971d320c34e7e10a7c9e +Subproject commit b7548df896330bd4cd0ae42d95df9b9cc511f139 diff --git a/docs/en/concepts-and-designs/overview.md b/docs/en/concepts-and-designs/overview.md index 42808f2e05..df413375c4 100644 --- a/docs/en/concepts-and-designs/overview.md +++ b/docs/en/concepts-and-designs/overview.md @@ -11,7 +11,7 @@ In multi-language, continuously deployed environments, cloud native infrastructu SkyWalking's service mesh receiver allows SkyWalking to receive telemetry data from service mesh frameworks such as Istio/Envoy and Linkerd, allowing users to understand the entire distributed system. -SkyWalking provides observability capabilities for **service**(s), **service instance**(s), **endpoint**(s). The terms Service, +SkyWalking provides observability capabilities for **service**(s), **service instance**(s), **endpoint**(s), **process**(s). The terms Service, Instance and Endpoint are used everywhere today, so it is worth defining their specific meanings in the context of SkyWalking: - **Service**. Represents a set/group of workloads which provide the same behaviours for incoming requests. You can define the service @@ -19,6 +19,8 @@ Instance and Endpoint are used everywhere today, so it is worth defining their s - **Service Instance**. Each individual workload in the Service group is known as an instance. Like `pods` in Kubernetes, it doesn't need to be a single OS process, however, if you are using instrument agents, an instance is actually a real OS process. - **Endpoint**. A path in a service for incoming requests, such as an HTTP URI path or a gRPC service class + method signature. +- **Process**. An operating system process. In some scenarios, a Service Instance is + not a process, such as a pod Kubernetes could contain multiple processes. SkyWalking allows users to understand the topology relationship between Services and Endpoints, to view the metrics of every Service/Service Instance/Endpoint and to set alarm rules. diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java index bd281d9a5a..2319a8dbc0 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.exporter.provider; import lombok.Setter; import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; @@ -75,6 +76,9 @@ public class MetricFormatter { endpointRelationDefine.getDestServiceId()); return endpointRelationDefine.getSource() + " in " + sourceService.getName() + " to " + endpointRelationDefine.getDest() + " in " + destService.getName(); + } else if (DefaultScopeDefine.inProcessCatalog(scope)) { + final MetricsEntityMetaInfo entity = meta.getEntity(); + return entity.getProcessName() + " in " + entity.getInstanceName() + " of " + entity.getServiceName(); } else if (scope == DefaultScopeDefine.ALL) { return ""; } else { diff --git a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java index 932a192955..38d6f7624b 100644 --- a/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java +++ b/oap-server/server-alarm-plugin/src/main/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandler.java @@ -24,6 +24,7 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm; +import org.apache.skywalking.oap.server.core.alarm.ProcessMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm; @@ -39,6 +40,7 @@ import org.apache.skywalking.oap.server.core.alarm.provider.wechat.WechatHookCal import org.apache.skywalking.oap.server.core.alarm.provider.welink.WeLinkHookCallback; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; @@ -64,7 +66,8 @@ public class NotifyHandler implements MetricsNotify { if (!DefaultScopeDefine.inServiceCatalog(scope) && !DefaultScopeDefine.inServiceInstanceCatalog(scope) && !DefaultScopeDefine.inEndpointCatalog(scope) && !DefaultScopeDefine.inServiceRelationCatalog(scope) - && !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointRelationCatalog(scope)) { + && !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointRelationCatalog(scope) + && !DefaultScopeDefine.inProcessCatalog(scope)) { return; } @@ -150,6 +153,16 @@ public class NotifyHandler implements MetricsNotify { endpointRelationMetaInAlarm.setName(endpointRelationDefine.getSource() + " in " + sourceService.getName() + " to " + endpointRelationDefine.getDest() + " in " + destService.getName()); metaInAlarm = endpointRelationMetaInAlarm; + } else if (DefaultScopeDefine.inProcessCatalog(scope)) { + final String processId = meta.getId(); + final MetricsEntityMetaInfo entity = meta.getEntity(); + + ProcessMetaInAlarm processMetaInAlarm = new ProcessMetaInAlarm(); + processMetaInAlarm.setMetricsName(meta.getMetricsName()); + processMetaInAlarm.setId(processId); + processMetaInAlarm.setName(entity.getProcessName() + " in " + entity.getInstanceName() + + " of " + entity.getServiceName()); + metaInAlarm = processMetaInAlarm; } else { return; } diff --git a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandlerTest.java b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandlerTest.java index 81ef533c64..e28fc21a76 100644 --- a/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandlerTest.java +++ b/oap-server/server-alarm-plugin/src/test/java/org/apache/skywalking/oap/server/core/alarm/provider/NotifyHandlerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm; +import org.apache.skywalking.oap.server.core.alarm.ProcessMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm; @@ -31,6 +32,7 @@ import org.apache.skywalking.oap.server.core.alarm.ServiceMetaInAlarm; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; @@ -252,6 +254,39 @@ public class NotifyHandlerTest { assertEquals(DefaultScopeDefine.ENDPOINT_RELATION, metaInAlarm.getScopeId()); } + @Test + public void testNotifyWithProcessCatalog() { + prepareNotify(); + + String metricsName = "process-metrics"; + String serviceName = "test-service"; + String instanceName = "test-instance"; + String processName = "test-process"; + when(metadata.getMetricsName()).thenReturn(metricsName); + when(DefaultScopeDefine.inProcessCatalog(0)).thenReturn(true); + final String processId = IDManager.ProcessID.buildId( + IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId(serviceName, true), instanceName), + processName + ); + when(metadata.getId()).thenReturn(processId); + when(metadata.getEntity()).thenReturn(MetricsEntityMetaInfo.buildProcess(serviceName, instanceName, processName)); + + ArgumentCaptor metaCaptor = ArgumentCaptor.forClass(MetaInAlarm.class); + + notifyHandler.notify(metrics); + verify(rule).in(metaCaptor.capture(), any()); + + MetaInAlarm metaInAlarm = metaCaptor.getValue(); + + assertTrue(metaInAlarm instanceof ProcessMetaInAlarm); + assertEquals(metricsName, metaInAlarm.getMetricsName()); + assertEquals("cf4be92893026c77c539266f47f2aa22f1e53ff64fc64803f5814293ff10a56f", metaInAlarm.getId0()); + assertEquals("", metaInAlarm.getId1()); + assertEquals(DefaultScopeDefine.PROCESS_CATALOG_NAME, metaInAlarm.getScope()); + assertEquals("test-process in test-instance of test-service", metaInAlarm.getName()); + assertEquals(DefaultScopeDefine.PROCESS, metaInAlarm.getScopeId()); + } + private void prepareNotify() { metadata = mock(MetricsMetaInfo.class); when(metadata.getScope()).thenReturn(DefaultScopeDefine.ALL); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/ProcessMetaInAlarm.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/ProcessMetaInAlarm.java new file mode 100644 index 0000000000..435a6eeb84 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/ProcessMetaInAlarm.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.alarm; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; + +@Setter +@Getter +public class ProcessMetaInAlarm extends MetaInAlarm { + private String metricsName; + + private String id; + private String name; + + @Override + public String getScope() { + return DefaultScopeDefine.PROCESS_CATALOG_NAME; + } + + @Override + public int getScopeId() { + return DefaultScopeDefine.PROCESS; + } + + @Override + public String getId0() { + return id; + } + + @Override + public String getId1() { + return Const.EMPTY_STRING; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java index f7c596afda..dc09772c56 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/IDManager.java @@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.analysis; import java.nio.charset.StandardCharsets; import java.util.Base64; + +import com.google.common.base.Charsets; +import com.google.common.hash.Hashing; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -252,6 +255,21 @@ public class IDManager { } } + /** + * Process ID related functions. + */ + public static class ProcessID { + /** + * @param instanceId built by {@link ServiceInstanceID#buildId(String, String)} + * @param name process name + * @return process id + */ + public static String buildId(String instanceId, String name) { + return Hashing.sha256().newHasher().putString(String.format("%s_%s", + name, instanceId), Charsets.UTF_8).hash().toString(); + } + } + /** * Network Address Alias ID related functions. */ diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDetectType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDetectType.java new file mode 100644 index 0000000000..70b88270b2 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDetectType.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.process; + +import org.apache.skywalking.oap.server.core.UnexpectedException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Process Detect Type is used to describe how the process was found + */ +public enum ProcessDetectType { + + /** + * Not set + */ + UNDEFINED(0), + + /** + * Detect by VM process + */ + VM(1), + + /** + * Detect by kubernetes platform + */ + KUBERNETES(2) + ; + + private final int value; + private static final Map DICTIONARY = new HashMap<>(); + + static { + Arrays.stream(ProcessDetectType.values()).collect(Collectors.toMap(ProcessDetectType::value, type -> type)).forEach(DICTIONARY::put); + } + + ProcessDetectType(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static ProcessDetectType valueOf(int value) { + ProcessDetectType type = DICTIONARY.get(value); + if (type == null) { + throw new UnexpectedException("Unknown ProcessDetectType value"); + } + return type; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java new file mode 100644 index 0000000000..043338fa3b --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessDispatcher.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.process; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.source.Process; + +public class ProcessDispatcher implements SourceDispatcher { + @Override + public void dispatch(Process source) { + final ProcessTraffic traffic = new ProcessTraffic(); + traffic.setServiceId(source.getServiceId()); + traffic.setInstanceId(source.getInstanceId()); + traffic.setName(source.getName()); + traffic.setLayer(source.getLayer().value()); + + traffic.setAgentId(source.getAgentId()); + traffic.setProperties(source.getProperties()); + if (source.getDetectType() != null) { + traffic.setDetectType(source.getDetectType().value()); + } + + traffic.setTimeBucket(source.getTimeBucket()); + traffic.setLastPingTimestamp(source.getTimeBucket()); + MetricsStreamProcessor.getInstance().in(traffic); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java new file mode 100644 index 0000000000..5e5ca29082 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.process; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.Layer; +import org.apache.skywalking.oap.server.core.analysis.MetricsExtension; +import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; +import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; +import org.apache.skywalking.oap.server.library.util.StringUtil; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS; + +@Stream(name = ProcessTraffic.INDEX_NAME, scopeId = PROCESS, + builder = ProcessTraffic.Builder.class, processor = MetricsStreamProcessor.class) +@MetricsExtension(supportDownSampling = false, supportUpdate = true) +@EqualsAndHashCode(of = { + "instanceId", + "name", +}) +public class ProcessTraffic extends Metrics { + public static final String INDEX_NAME = "process_traffic"; + public static final String SERVICE_ID = "service_id"; + public static final String INSTANCE_ID = "instance_id"; + public static final String NAME = "name"; + public static final String LAYER = "layer"; + public static final String AGENT_ID = "agent_id"; + public static final String PROPERTIES = "properties"; + public static final String LAST_PING_TIME_BUCKET = "last_ping"; + public static final String DETECT_TYPE = "detect_type"; + + private static final Gson GSON = new Gson(); + + @Setter + @Getter + @Column(columnName = SERVICE_ID) + private String serviceId; + + @Setter + @Getter + @Column(columnName = INSTANCE_ID, length = 600) + private String instanceId; + + @Setter + @Getter + @Column(columnName = NAME, length = 500) + private String name; + + @Setter + @Getter + @Column(columnName = LAYER) + private int layer = Layer.UNDEFINED.value(); + + @Setter + @Getter + @Column(columnName = LAST_PING_TIME_BUCKET) + private long lastPingTimestamp; + + @Setter + @Getter + @Column(columnName = DETECT_TYPE) + private int detectType = ProcessDetectType.UNDEFINED.value(); + + @Setter + @Getter + @Column(columnName = AGENT_ID, length = 500) + private String agentId; + + @Setter + @Getter + @Column(columnName = PROPERTIES, storageOnly = true, length = 50000) + private JsonObject properties; + + @Override + public boolean combine(Metrics metrics) { + final ProcessTraffic processTraffic = (ProcessTraffic) metrics; + this.lastPingTimestamp = processTraffic.getLastPingTimestamp(); + if (StringUtil.isNotBlank(processTraffic.getAgentId())) { + this.agentId = processTraffic.getAgentId(); + } + if (processTraffic.getProperties() != null && processTraffic.getProperties().size() > 0) { + this.properties = processTraffic.getProperties(); + } + if (processTraffic.getDetectType() > 0) { + this.detectType = processTraffic.getDetectType(); + } + return true; + } + + @Override + public int remoteHashCode() { + return this.hashCode(); + } + + @Override + public void deserialize(RemoteData remoteData) { + setServiceId(remoteData.getDataStrings(0)); + setInstanceId(remoteData.getDataStrings(1)); + setName(remoteData.getDataStrings(2)); + setLayer(remoteData.getDataIntegers(0)); + setAgentId(remoteData.getDataStrings(3)); + final String propString = remoteData.getDataStrings(4); + if (StringUtil.isNotEmpty(propString)) { + setProperties(GSON.fromJson(propString, JsonObject.class)); + } + setLastPingTimestamp(remoteData.getDataLongs(0)); + setDetectType(remoteData.getDataIntegers(1)); + setTimeBucket(remoteData.getDataLongs(1)); + } + + @Override + public RemoteData.Builder serialize() { + final RemoteData.Builder builder = RemoteData.newBuilder(); + builder.addDataStrings(serviceId); + builder.addDataStrings(instanceId); + builder.addDataStrings(name); + builder.addDataIntegers(layer); + builder.addDataStrings(agentId); + if (properties == null) { + builder.addDataStrings(Const.EMPTY_STRING); + } else { + builder.addDataStrings(GSON.toJson(properties)); + } + builder.addDataLongs(lastPingTimestamp); + builder.addDataIntegers(detectType); + builder.addDataLongs(getTimeBucket()); + return builder; + } + + @Override + protected String id0() { + return IDManager.ProcessID.buildId(instanceId, name); + } + + public static class Builder implements StorageHashMapBuilder { + + @Override + public ProcessTraffic storage2Entity(Map dbMap) { + final ProcessTraffic processTraffic = new ProcessTraffic(); + processTraffic.setServiceId((String) dbMap.get(SERVICE_ID)); + processTraffic.setInstanceId((String) dbMap.get(INSTANCE_ID)); + processTraffic.setName((String) dbMap.get(NAME)); + processTraffic.setLayer(((Number) dbMap.get(LAYER)).intValue()); + processTraffic.setAgentId((String) dbMap.get(AGENT_ID)); + final String propString = (String) dbMap.get(PROPERTIES); + if (StringUtil.isNotEmpty(propString)) { + processTraffic.setProperties(GSON.fromJson(propString, JsonObject.class)); + } + processTraffic.setLastPingTimestamp(((Number) dbMap.get(LAST_PING_TIME_BUCKET)).longValue()); + processTraffic.setDetectType(((Number) dbMap.get(DETECT_TYPE)).intValue()); + processTraffic.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue()); + return processTraffic; + } + + @Override + public Map entity2Storage(ProcessTraffic storageData) { + final HashMap map = new HashMap<>(); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(INSTANCE_ID, storageData.getInstanceId()); + map.put(NAME, storageData.getName()); + map.put(LAYER, storageData.getLayer()); + map.put(AGENT_ID, storageData.getAgentId()); + if (storageData.getProperties() != null) { + map.put(PROPERTIES, GSON.toJson(storageData.getProperties())); + } else { + map.put(PROPERTIES, Const.EMPTY_STRING); + } + map.put(LAST_PING_TIME_BUCKET, storageData.getLastPingTimestamp()); + map.put(DETECT_TYPE, storageData.getDetectType()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + } + + @Override + public void calculate() { + } + + @Override + public Metrics toHour() { + return null; + } + + @Override + public Metrics toDay() { + return null; + } + + public static class PropertyUtil { + public static final String HOST_IP = "host_ip"; + public static final String PID = "pid"; + public static final String COMMAND_LINE = "command_line"; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsEntityMetaInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsEntityMetaInfo.java new file mode 100644 index 0000000000..e924a6e7ec --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsEntityMetaInfo.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.metrics; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Setter +@Getter +@AllArgsConstructor +@ToString +public class MetricsEntityMetaInfo { + + /** + * The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic} + */ + private String serviceName; + + /** + * The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic} + */ + private String instanceName; + + /** + * The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic} + */ + private String endpointName; + + /** + * The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic} + */ + private String processName; + + /** + * Build Service Entity + */ + public static MetricsEntityMetaInfo buildService(String serviceName) { + return new MetricsEntityMetaInfo(serviceName, "", "", ""); + } + + /** + * Build Service Instance Entity + */ + public static MetricsEntityMetaInfo buildServiceInstance(String serviceName, String instanceName) { + return new MetricsEntityMetaInfo(serviceName, instanceName, "", ""); + } + + /** + * Build Endpoint Entity + */ + public static MetricsEntityMetaInfo buildEndpoint(String serviceName, String endpointName) { + return new MetricsEntityMetaInfo(serviceName, "", endpointName, ""); + } + + /** + * Build Process Entity + */ + public static MetricsEntityMetaInfo buildProcess(String serviceName, String instanceName, String processName) { + return new MetricsEntityMetaInfo(serviceName, instanceName, "", processName); + } + +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsMetaInfo.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsMetaInfo.java index 6540204baa..c655366890 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsMetaInfo.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/MetricsMetaInfo.java @@ -32,6 +32,9 @@ public class MetricsMetaInfo { @Setter @Getter private String id; + @Setter + @Getter + private MetricsEntityMetaInfo entity; public MetricsMetaInfo(String metricsName, int scope) { this.metricsName = metricsName; @@ -45,8 +48,16 @@ public class MetricsMetaInfo { this.id = id; } + public MetricsMetaInfo(String metricsName, int scope, String id, MetricsEntityMetaInfo entity) { + this.metricsName = metricsName; + this.scope = scope; + this.id = id; + this.entity = entity; + } + @Override public String toString() { - return "MetricsMetaInfo{" + "metricsName='" + metricsName + '\'' + ", scope=" + scope + ", id='" + id + '\'' + '}'; + return "MetricsMetaInfo{" + "metricsName='" + metricsName + '\'' + ", scope=" + scope + ", id='" + id + '\'' + + ", entity=" + entity + '}'; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 443bbd9d86..59a07295ba 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -92,6 +92,8 @@ public class DefaultScopeDefine { public static final int SERVICE_INSTANCE_JVM_CLASS = 44; + public static final int PROCESS = 45; + /** * Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt. */ @@ -101,6 +103,7 @@ public class DefaultScopeDefine { public static final String SERVICE_RELATION_CATALOG_NAME = "SERVICE_RELATION"; public static final String SERVICE_INSTANCE_RELATION_CATALOG_NAME = "SERVICE_INSTANCE_RELATION"; public static final String ENDPOINT_RELATION_CATALOG_NAME = "ENDPOINT_RELATION"; + public static final String PROCESS_CATALOG_NAME = "PROCESS"; private static final Map SERVICE_CATALOG = new HashMap<>(); private static final Map SERVICE_INSTANCE_CATALOG = new HashMap<>(); @@ -108,6 +111,7 @@ public class DefaultScopeDefine { private static final Map SERVICE_RELATION_CATALOG = new HashMap<>(); private static final Map SERVICE_INSTANCE_RELATION_CATALOG = new HashMap<>(); private static final Map ENDPOINT_RELATION_CATALOG = new HashMap<>(); + private static final Map PROCESS_CATALOG = new HashMap<>(); @Setter private static boolean ACTIVE_EXTRA_MODEL_COLUMNS = false; @@ -209,6 +213,9 @@ public class DefaultScopeDefine { case ENDPOINT_RELATION_CATALOG_NAME: ENDPOINT_RELATION_CATALOG.put(id, Boolean.TRUE); break; + case PROCESS_CATALOG_NAME: + PROCESS_CATALOG.put(id, Boolean.TRUE); + break; } } @@ -250,7 +257,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs service catalog + * Check whether the given scope ID belongs service catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME} @@ -260,7 +267,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs service instance catalog + * Check whether the given scope ID belongs service instance catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME} @@ -270,7 +277,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs endpoint catalog + * Check whether the given scope ID belongs endpoint catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME} @@ -280,7 +287,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs service relation catalog + * Check whether the given scope ID belongs service relation catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_RELATION_CATALOG_NAME} @@ -290,7 +297,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs service instance relation catalog + * Check whether the given scope ID belongs service instance relation catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_RELATION_CATALOG_NAME} @@ -300,7 +307,7 @@ public class DefaultScopeDefine { } /** - * Check whether current service belongs endpoint relation catalog + * Check whether the given scope ID belongs endpoint relation catalog * * @param scopeId represents an existing scope id. * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_RELATION_CATALOG_NAME} @@ -309,6 +316,16 @@ public class DefaultScopeDefine { return ENDPOINT_RELATION_CATALOG.containsKey(scopeId); } + /** + * Check whether the given scope ID belongs process catalog + * + * @param scopeId represents an existing scope id. + * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #PROCESS_CATALOG_NAME} + */ + public static boolean inProcessCatalog(int scopeId) { + return PROCESS_CATALOG.containsKey(scopeId); + } + /** * Get the catalog string name of the given scope * @@ -334,6 +351,9 @@ public class DefaultScopeDefine { if (inEndpointRelationCatalog(scope)) { return ENDPOINT_RELATION_CATALOG_NAME; } + if (inProcessCatalog(scope)) { + return PROCESS_CATALOG_NAME; + } return "ALL"; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java new file mode 100644 index 0000000000..e0513fdc09 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Process.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import com.google.gson.JsonObject; +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.Layer; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS; + +@ScopeDeclaration(id = PROCESS, name = "Process") +@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class) +public class Process extends Source { + private volatile String entityId; + + @Override + public int scope() { + return PROCESS; + } + + @Override + public String getEntityId() { + if (entityId == null) { + entityId = IDManager.ProcessID.buildId(instanceId, name); + } + return entityId; + } + + @Getter + private String instanceId; + @Getter + private String serviceId; + @Getter + @Setter + private String name; + @Getter + @Setter + private String serviceName; + @Getter + @Setter + private String instanceName; + @Getter + @Setter + private Layer layer; + @Getter + @Setter + private boolean isServiceNormal; + @Getter + @Setter + private String agentId; + @Getter + @Setter + private ProcessDetectType detectType; + @Getter + @Setter + private JsonObject properties; + + @Override + public void prepare() { + serviceId = IDManager.ServiceID.buildId(serviceName, isServiceNormal); + instanceId = IDManager.ServiceInstanceID.buildId(serviceId, instanceName); + } +} diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml index e31b61023f..4e36b47ea1 100644 --- a/oap-server/server-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/pom.xml @@ -45,6 +45,7 @@ configuration-discovery-receiver-plugin skywalking-event-receiver-plugin skywalking-zabbix-receiver-plugin + skywalking-ebpf-receiver-plugin diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/pom.xml new file mode 100644 index 0000000000..d7497e158b --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/pom.xml @@ -0,0 +1,38 @@ + + + + + + server-receiver-plugin + org.apache.skywalking + 9.0.0-SNAPSHOT + + 4.0.0 + + skywalking-ebpf-receiver-plugin + + + + org.apache.skywalking + skywalking-sharing-server-plugin + ${project.version} + + + + \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/module/EBPFReceiverModule.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/module/EBPFReceiverModule.java new file mode 100644 index 0000000000..8ecd65c8f8 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/module/EBPFReceiverModule.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.ebpf.module; + +import org.apache.skywalking.oap.server.library.module.ModuleDefine; + +public class EBPFReceiverModule extends ModuleDefine { + + public static final String NAME = "receiver-ebpf"; + + public EBPFReceiverModule() { + super(NAME); + } + + @Override + public Class[] services() { + return new Class[0]; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java new file mode 100644 index 0000000000..7a4c791a89 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.ebpf.provider; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.receiver.ebpf.module.EBPFReceiverModule; +import org.apache.skywalking.oap.server.receiver.ebpf.provider.handler.EBPFProcessServiceHandler; +import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; + +public class EBPFReceiverProvider extends ModuleProvider { + + @Override + public String name() { + return "default"; + } + + @Override + public Class module() { + return EBPFReceiverModule.class; + } + + @Override + public ModuleConfig createConfigBeanIfAbsent() { + return null; + } + + @Override + public void prepare() throws ServiceNotProvidedException, ModuleStartException { + } + + @Override + public void start() throws ServiceNotProvidedException, ModuleStartException { + final GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME) + .provider() + .getService(GRPCHandlerRegister.class); + grpcHandlerRegister.addHandler(new EBPFProcessServiceHandler(getManager())); + } + + @Override + public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + + } + + @Override + public String[] requiredModules() { + return new String[] { + CoreModule.NAME, + SharingServerModule.NAME + }; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java new file mode 100644 index 0000000000..3fb8d9c846 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.ebpf.provider.handler; + +import com.google.gson.JsonObject; +import io.grpc.stub.StreamObserver; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFHostProcessDownstream; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFHostProcessMetadata; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessDownstream; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessEntityMetadata; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessPingPkgList; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessProperties; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessReportList; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessServiceGrpc; +import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFReportProcessDownstream; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.Layer; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic; +import org.apache.skywalking.oap.server.core.config.NamingControl; +import org.apache.skywalking.oap.server.core.source.Process; +import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate; +import org.apache.skywalking.oap.server.core.source.ServiceMeta; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; + +import java.util.ArrayList; + +public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProcessServiceImplBase implements GRPCHandler { + + private final SourceReceiver sourceReceiver; + private final NamingControl namingControl; + + public EBPFProcessServiceHandler(ModuleManager moduleManager) { + this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + this.namingControl = moduleManager.find(CoreModule.NAME) + .provider() + .getService(NamingControl.class); + } + + @Override + public void reportProcesses(EBPFProcessReportList request, StreamObserver responseObserver) { + final String agentId = request.getEbpfAgentID(); + + // build per process data + final ArrayList> processes = new ArrayList<>(); + for (EBPFProcessProperties ebpfProcessProperties : request.getProcessesList()) { + Tuple2 processData = null; + if (ebpfProcessProperties.hasHostProcess()) { + processData = prepareReportHostProcess(ebpfProcessProperties.getHostProcess(), agentId); + } + + if (processData != null) { + processes.add(processData); + } + } + + // report process and downstream the process id data + final EBPFReportProcessDownstream.Builder builder = EBPFReportProcessDownstream.newBuilder(); + processes.stream().forEach(e -> { + sourceReceiver.receive(e._1); + builder.addProcesses(e._2); + }); + + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + } + + @Override + public void keepAlive(EBPFProcessPingPkgList request, StreamObserver responseObserver) { + final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute); + + request.getProcessesList().forEach(p -> { + final EBPFProcessEntityMetadata entity = p.getEntityMetadata(); + final String serviceName = namingControl.formatServiceName(entity.getServiceName()); + final String instanceName = namingControl.formatInstanceName(entity.getInstanceName()); + final Layer layer = Layer.valueOf(entity.getLayer()); + + // process + final Process processUpdate = new Process(); + processUpdate.setServiceName(serviceName); + processUpdate.setInstanceName(instanceName); + processUpdate.setLayer(layer); + processUpdate.setServiceNormal(true); + processUpdate.setName(entity.getProcessName()); + processUpdate.setTimeBucket(timeBucket); + sourceReceiver.receive(processUpdate); + + // instance + final ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate(); + serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, true)); + serviceInstanceUpdate.setName(instanceName); + serviceInstanceUpdate.setTimeBucket(timeBucket); + serviceInstanceUpdate.setLayer(layer); + sourceReceiver.receive(serviceInstanceUpdate); + + // service + final ServiceMeta serviceMeta = new ServiceMeta(); + serviceMeta.setName(serviceName); + serviceMeta.setNormal(true); + serviceMeta.setTimeBucket(timeBucket); + serviceMeta.setLayer(layer); + sourceReceiver.receive(serviceMeta); + }); + + responseObserver.onNext(Commands.newBuilder().build()); + responseObserver.onCompleted(); + } + + private Tuple2 prepareReportHostProcess(EBPFHostProcessMetadata hostProcess, String agentId) { + final Process process = new Process(); + + // entity + process.setServiceName(namingControl.formatServiceName(hostProcess.getEntity().getServiceName())); + process.setServiceNormal(true); + process.setLayer(Layer.valueOf(hostProcess.getEntity().getLayer())); + process.setInstanceName(namingControl.formatInstanceName(hostProcess.getEntity().getInstanceName())); + process.setName(hostProcess.getEntity().getProcessName()); + + // metadata + process.setDetectType(ProcessDetectType.VM); + process.setAgentId(agentId); + final JsonObject properties = new JsonObject(); + properties.addProperty(ProcessTraffic.PropertyUtil.HOST_IP, hostProcess.getHostIP()); + properties.addProperty(ProcessTraffic.PropertyUtil.PID, hostProcess.getPid()); + properties.addProperty(ProcessTraffic.PropertyUtil.COMMAND_LINE, hostProcess.getCmd()); + process.setProperties(properties); + + // timestamp + process.setTimeBucket( + TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute)); + + process.prepare(); + final String processId = process.getEntityId(); + final EBPFProcessDownstream downstream = EBPFProcessDownstream.newBuilder() + .setProcessId(processId) + .setHostProcess(EBPFHostProcessDownstream.newBuilder() + .setPid(hostProcess.getPid()) + .build()) + .build(); + return Tuple.of(process, downstream); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine new file mode 100644 index 0000000000..5e0eb1f079 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.receiver.ebpf.module.EBPFReceiverModule \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 0000000000..efb6f51d48 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.receiver.ebpf.provider.EBPFReceiverProvider \ No newline at end of file diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml index 25f248835d..c80a88aa6f 100644 --- a/oap-server/server-starter/pom.xml +++ b/oap-server/server-starter/pom.xml @@ -156,6 +156,11 @@ skywalking-zabbix-receiver-plugin ${project.version} + + org.apache.skywalking + skywalking-ebpf-receiver-plugin + ${project.version} + diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index b50c829d22..8cd990922b 100755 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -506,3 +506,7 @@ configuration-discovery: receiver-event: selector: ${SW_RECEIVER_EVENT:default} default: + +receiver-ebpf: + selector: ${SW_RECEIVER_EBPF:default} + default: \ No newline at end of file -- GitLab