未验证 提交 f06c220e 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Introduce the entity of Process Type (#8617)

上级 ff1103ec
...@@ -78,6 +78,7 @@ Release Notes. ...@@ -78,6 +78,7 @@ Release Notes.
* Fix Zipkin receiver wrong condition for decoding `gzip`. * Fix Zipkin receiver wrong condition for decoding `gzip`.
* Add a new sampler (`possibility`) in LAL. * Add a new sampler (`possibility`) in LAL.
* Unify module name `receiver_zipkin` to `receiver-zipkin`, remove `receiver_jaeger` from `application.yaml`. * Unify module name `receiver_zipkin` to `receiver-zipkin`, remove `receiver_jaeger` from `application.yaml`.
* Introduce the entity of Process type.
#### UI #### UI
......
Subproject commit 667134d08ffd63122062971d320c34e7e10a7c9e Subproject commit b7548df896330bd4cd0ae42d95df9b9cc511f139
...@@ -11,7 +11,7 @@ In multi-language, continuously deployed environments, cloud native infrastructu ...@@ -11,7 +11,7 @@ In multi-language, continuously deployed environments, cloud native infrastructu
SkyWalking's service mesh receiver allows SkyWalking to receive telemetry data from service mesh frameworks SkyWalking's service mesh receiver allows SkyWalking to receive telemetry data from service mesh frameworks
such as Istio/Envoy and Linkerd, allowing users to understand the entire distributed system. such as Istio/Envoy and Linkerd, allowing users to understand the entire distributed system.
SkyWalking provides observability capabilities for **service**(s), **service instance**(s), **endpoint**(s). The terms Service, SkyWalking provides observability capabilities for **service**(s), **service instance**(s), **endpoint**(s), **process**(s). The terms Service,
Instance and Endpoint are used everywhere today, so it is worth defining their specific meanings in the context of SkyWalking: Instance and Endpoint are used everywhere today, so it is worth defining their specific meanings in the context of SkyWalking:
- **Service**. Represents a set/group of workloads which provide the same behaviours for incoming requests. You can define the service - **Service**. Represents a set/group of workloads which provide the same behaviours for incoming requests. You can define the service
...@@ -19,6 +19,8 @@ Instance and Endpoint are used everywhere today, so it is worth defining their s ...@@ -19,6 +19,8 @@ Instance and Endpoint are used everywhere today, so it is worth defining their s
- **Service Instance**. Each individual workload in the Service group is known as an instance. Like `pods` in Kubernetes, it - **Service Instance**. Each individual workload in the Service group is known as an instance. Like `pods` in Kubernetes, it
doesn't need to be a single OS process, however, if you are using instrument agents, an instance is actually a real OS process. doesn't need to be a single OS process, however, if you are using instrument agents, an instance is actually a real OS process.
- **Endpoint**. A path in a service for incoming requests, such as an HTTP URI path or a gRPC service class + method signature. - **Endpoint**. A path in a service for incoming requests, such as an HTTP URI path or a gRPC service class + method signature.
- **Process**. An operating system process. In some scenarios, a Service Instance is
not a process, such as a pod Kubernetes could contain multiple processes.
SkyWalking allows users to understand the topology relationship between Services and Endpoints, to view the metrics of every SkyWalking allows users to understand the topology relationship between Services and Endpoints, to view the metrics of every
Service/Service Instance/Endpoint and to set alarm rules. Service/Service Instance/Endpoint and to set alarm rules.
......
...@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.exporter.provider; ...@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.exporter.provider;
import lombok.Setter; import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
...@@ -75,6 +76,9 @@ public class MetricFormatter { ...@@ -75,6 +76,9 @@ public class MetricFormatter {
endpointRelationDefine.getDestServiceId()); endpointRelationDefine.getDestServiceId());
return endpointRelationDefine.getSource() + " in " + sourceService.getName() return endpointRelationDefine.getSource() + " in " + sourceService.getName()
+ " to " + endpointRelationDefine.getDest() + " in " + destService.getName(); + " to " + endpointRelationDefine.getDest() + " in " + destService.getName();
} else if (DefaultScopeDefine.inProcessCatalog(scope)) {
final MetricsEntityMetaInfo entity = meta.getEntity();
return entity.getProcessName() + " in " + entity.getInstanceName() + " of " + entity.getServiceName();
} else if (scope == DefaultScopeDefine.ALL) { } else if (scope == DefaultScopeDefine.ALL) {
return ""; return "";
} else { } else {
......
...@@ -24,6 +24,7 @@ import java.util.List; ...@@ -24,6 +24,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback; import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ProcessMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm;
...@@ -39,6 +40,7 @@ import org.apache.skywalking.oap.server.core.alarm.provider.wechat.WechatHookCal ...@@ -39,6 +40,7 @@ import org.apache.skywalking.oap.server.core.alarm.provider.wechat.WechatHookCal
import org.apache.skywalking.oap.server.core.alarm.provider.welink.WeLinkHookCallback; import org.apache.skywalking.oap.server.core.alarm.provider.welink.WeLinkHookCallback;
import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
...@@ -64,7 +66,8 @@ public class NotifyHandler implements MetricsNotify { ...@@ -64,7 +66,8 @@ public class NotifyHandler implements MetricsNotify {
if (!DefaultScopeDefine.inServiceCatalog(scope) && !DefaultScopeDefine.inServiceInstanceCatalog(scope) if (!DefaultScopeDefine.inServiceCatalog(scope) && !DefaultScopeDefine.inServiceInstanceCatalog(scope)
&& !DefaultScopeDefine.inEndpointCatalog(scope) && !DefaultScopeDefine.inServiceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointCatalog(scope) && !DefaultScopeDefine.inServiceRelationCatalog(scope)
&& !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointRelationCatalog(scope)) { && !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope) && !DefaultScopeDefine.inEndpointRelationCatalog(scope)
&& !DefaultScopeDefine.inProcessCatalog(scope)) {
return; return;
} }
...@@ -150,6 +153,16 @@ public class NotifyHandler implements MetricsNotify { ...@@ -150,6 +153,16 @@ public class NotifyHandler implements MetricsNotify {
endpointRelationMetaInAlarm.setName(endpointRelationDefine.getSource() + " in " + sourceService.getName() endpointRelationMetaInAlarm.setName(endpointRelationDefine.getSource() + " in " + sourceService.getName()
+ " to " + endpointRelationDefine.getDest() + " in " + destService.getName()); + " to " + endpointRelationDefine.getDest() + " in " + destService.getName());
metaInAlarm = endpointRelationMetaInAlarm; metaInAlarm = endpointRelationMetaInAlarm;
} else if (DefaultScopeDefine.inProcessCatalog(scope)) {
final String processId = meta.getId();
final MetricsEntityMetaInfo entity = meta.getEntity();
ProcessMetaInAlarm processMetaInAlarm = new ProcessMetaInAlarm();
processMetaInAlarm.setMetricsName(meta.getMetricsName());
processMetaInAlarm.setId(processId);
processMetaInAlarm.setName(entity.getProcessName() + " in " + entity.getInstanceName()
+ " of " + entity.getServiceName());
metaInAlarm = processMetaInAlarm;
} else { } else {
return; return;
} }
......
...@@ -22,6 +22,7 @@ import com.google.common.collect.Lists; ...@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage; import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ProcessMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceInstanceRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.ServiceRelationMetaInAlarm;
import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm; import org.apache.skywalking.oap.server.core.alarm.EndpointMetaInAlarm;
...@@ -31,6 +32,7 @@ import org.apache.skywalking.oap.server.core.alarm.ServiceMetaInAlarm; ...@@ -31,6 +32,7 @@ import org.apache.skywalking.oap.server.core.alarm.ServiceMetaInAlarm;
import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsEntityMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo; import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata; import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
...@@ -252,6 +254,39 @@ public class NotifyHandlerTest { ...@@ -252,6 +254,39 @@ public class NotifyHandlerTest {
assertEquals(DefaultScopeDefine.ENDPOINT_RELATION, metaInAlarm.getScopeId()); assertEquals(DefaultScopeDefine.ENDPOINT_RELATION, metaInAlarm.getScopeId());
} }
@Test
public void testNotifyWithProcessCatalog() {
prepareNotify();
String metricsName = "process-metrics";
String serviceName = "test-service";
String instanceName = "test-instance";
String processName = "test-process";
when(metadata.getMetricsName()).thenReturn(metricsName);
when(DefaultScopeDefine.inProcessCatalog(0)).thenReturn(true);
final String processId = IDManager.ProcessID.buildId(
IDManager.ServiceInstanceID.buildId(IDManager.ServiceID.buildId(serviceName, true), instanceName),
processName
);
when(metadata.getId()).thenReturn(processId);
when(metadata.getEntity()).thenReturn(MetricsEntityMetaInfo.buildProcess(serviceName, instanceName, processName));
ArgumentCaptor<MetaInAlarm> metaCaptor = ArgumentCaptor.forClass(MetaInAlarm.class);
notifyHandler.notify(metrics);
verify(rule).in(metaCaptor.capture(), any());
MetaInAlarm metaInAlarm = metaCaptor.getValue();
assertTrue(metaInAlarm instanceof ProcessMetaInAlarm);
assertEquals(metricsName, metaInAlarm.getMetricsName());
assertEquals("cf4be92893026c77c539266f47f2aa22f1e53ff64fc64803f5814293ff10a56f", metaInAlarm.getId0());
assertEquals("", metaInAlarm.getId1());
assertEquals(DefaultScopeDefine.PROCESS_CATALOG_NAME, metaInAlarm.getScope());
assertEquals("test-process in test-instance of test-service", metaInAlarm.getName());
assertEquals(DefaultScopeDefine.PROCESS, metaInAlarm.getScopeId());
}
private void prepareNotify() { private void prepareNotify() {
metadata = mock(MetricsMetaInfo.class); metadata = mock(MetricsMetaInfo.class);
when(metadata.getScope()).thenReturn(DefaultScopeDefine.ALL); when(metadata.getScope()).thenReturn(DefaultScopeDefine.ALL);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
@Setter
@Getter
public class ProcessMetaInAlarm extends MetaInAlarm {
private String metricsName;
private String id;
private String name;
@Override
public String getScope() {
return DefaultScopeDefine.PROCESS_CATALOG_NAME;
}
@Override
public int getScopeId() {
return DefaultScopeDefine.PROCESS;
}
@Override
public String getId0() {
return id;
}
@Override
public String getId1() {
return Const.EMPTY_STRING;
}
}
...@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.analysis; ...@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.core.analysis;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
...@@ -252,6 +255,21 @@ public class IDManager { ...@@ -252,6 +255,21 @@ public class IDManager {
} }
} }
/**
* Process ID related functions.
*/
public static class ProcessID {
/**
* @param instanceId built by {@link ServiceInstanceID#buildId(String, String)}
* @param name process name
* @return process id
*/
public static String buildId(String instanceId, String name) {
return Hashing.sha256().newHasher().putString(String.format("%s_%s",
name, instanceId), Charsets.UTF_8).hash().toString();
}
}
/** /**
* Network Address Alias ID related functions. * Network Address Alias ID related functions.
*/ */
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.process;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Process Detect Type is used to describe how the process was found
*/
public enum ProcessDetectType {
/**
* Not set
*/
UNDEFINED(0),
/**
* Detect by VM process
*/
VM(1),
/**
* Detect by kubernetes platform
*/
KUBERNETES(2)
;
private final int value;
private static final Map<Integer, ProcessDetectType> DICTIONARY = new HashMap<>();
static {
Arrays.stream(ProcessDetectType.values()).collect(Collectors.toMap(ProcessDetectType::value, type -> type)).forEach(DICTIONARY::put);
}
ProcessDetectType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static ProcessDetectType valueOf(int value) {
ProcessDetectType type = DICTIONARY.get(value);
if (type == null) {
throw new UnexpectedException("Unknown ProcessDetectType value");
}
return type;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.process;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.source.Process;
public class ProcessDispatcher implements SourceDispatcher<Process> {
@Override
public void dispatch(Process source) {
final ProcessTraffic traffic = new ProcessTraffic();
traffic.setServiceId(source.getServiceId());
traffic.setInstanceId(source.getInstanceId());
traffic.setName(source.getName());
traffic.setLayer(source.getLayer().value());
traffic.setAgentId(source.getAgentId());
traffic.setProperties(source.getProperties());
if (source.getDetectType() != null) {
traffic.setDetectType(source.getDetectType().value());
}
traffic.setTimeBucket(source.getTimeBucket());
traffic.setLastPingTimestamp(source.getTimeBucket());
MetricsStreamProcessor.getInstance().in(traffic);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.process;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.util.HashMap;
import java.util.Map;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS;
@Stream(name = ProcessTraffic.INDEX_NAME, scopeId = PROCESS,
builder = ProcessTraffic.Builder.class, processor = MetricsStreamProcessor.class)
@MetricsExtension(supportDownSampling = false, supportUpdate = true)
@EqualsAndHashCode(of = {
"instanceId",
"name",
})
public class ProcessTraffic extends Metrics {
public static final String INDEX_NAME = "process_traffic";
public static final String SERVICE_ID = "service_id";
public static final String INSTANCE_ID = "instance_id";
public static final String NAME = "name";
public static final String LAYER = "layer";
public static final String AGENT_ID = "agent_id";
public static final String PROPERTIES = "properties";
public static final String LAST_PING_TIME_BUCKET = "last_ping";
public static final String DETECT_TYPE = "detect_type";
private static final Gson GSON = new Gson();
@Setter
@Getter
@Column(columnName = SERVICE_ID)
private String serviceId;
@Setter
@Getter
@Column(columnName = INSTANCE_ID, length = 600)
private String instanceId;
@Setter
@Getter
@Column(columnName = NAME, length = 500)
private String name;
@Setter
@Getter
@Column(columnName = LAYER)
private int layer = Layer.UNDEFINED.value();
@Setter
@Getter
@Column(columnName = LAST_PING_TIME_BUCKET)
private long lastPingTimestamp;
@Setter
@Getter
@Column(columnName = DETECT_TYPE)
private int detectType = ProcessDetectType.UNDEFINED.value();
@Setter
@Getter
@Column(columnName = AGENT_ID, length = 500)
private String agentId;
@Setter
@Getter
@Column(columnName = PROPERTIES, storageOnly = true, length = 50000)
private JsonObject properties;
@Override
public boolean combine(Metrics metrics) {
final ProcessTraffic processTraffic = (ProcessTraffic) metrics;
this.lastPingTimestamp = processTraffic.getLastPingTimestamp();
if (StringUtil.isNotBlank(processTraffic.getAgentId())) {
this.agentId = processTraffic.getAgentId();
}
if (processTraffic.getProperties() != null && processTraffic.getProperties().size() > 0) {
this.properties = processTraffic.getProperties();
}
if (processTraffic.getDetectType() > 0) {
this.detectType = processTraffic.getDetectType();
}
return true;
}
@Override
public int remoteHashCode() {
return this.hashCode();
}
@Override
public void deserialize(RemoteData remoteData) {
setServiceId(remoteData.getDataStrings(0));
setInstanceId(remoteData.getDataStrings(1));
setName(remoteData.getDataStrings(2));
setLayer(remoteData.getDataIntegers(0));
setAgentId(remoteData.getDataStrings(3));
final String propString = remoteData.getDataStrings(4);
if (StringUtil.isNotEmpty(propString)) {
setProperties(GSON.fromJson(propString, JsonObject.class));
}
setLastPingTimestamp(remoteData.getDataLongs(0));
setDetectType(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(1));
}
@Override
public RemoteData.Builder serialize() {
final RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceId);
builder.addDataStrings(instanceId);
builder.addDataStrings(name);
builder.addDataIntegers(layer);
builder.addDataStrings(agentId);
if (properties == null) {
builder.addDataStrings(Const.EMPTY_STRING);
} else {
builder.addDataStrings(GSON.toJson(properties));
}
builder.addDataLongs(lastPingTimestamp);
builder.addDataIntegers(detectType);
builder.addDataLongs(getTimeBucket());
return builder;
}
@Override
protected String id0() {
return IDManager.ProcessID.buildId(instanceId, name);
}
public static class Builder implements StorageHashMapBuilder<ProcessTraffic> {
@Override
public ProcessTraffic storage2Entity(Map<String, Object> dbMap) {
final ProcessTraffic processTraffic = new ProcessTraffic();
processTraffic.setServiceId((String) dbMap.get(SERVICE_ID));
processTraffic.setInstanceId((String) dbMap.get(INSTANCE_ID));
processTraffic.setName((String) dbMap.get(NAME));
processTraffic.setLayer(((Number) dbMap.get(LAYER)).intValue());
processTraffic.setAgentId((String) dbMap.get(AGENT_ID));
final String propString = (String) dbMap.get(PROPERTIES);
if (StringUtil.isNotEmpty(propString)) {
processTraffic.setProperties(GSON.fromJson(propString, JsonObject.class));
}
processTraffic.setLastPingTimestamp(((Number) dbMap.get(LAST_PING_TIME_BUCKET)).longValue());
processTraffic.setDetectType(((Number) dbMap.get(DETECT_TYPE)).intValue());
processTraffic.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
return processTraffic;
}
@Override
public Map<String, Object> entity2Storage(ProcessTraffic storageData) {
final HashMap<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, storageData.getServiceId());
map.put(INSTANCE_ID, storageData.getInstanceId());
map.put(NAME, storageData.getName());
map.put(LAYER, storageData.getLayer());
map.put(AGENT_ID, storageData.getAgentId());
if (storageData.getProperties() != null) {
map.put(PROPERTIES, GSON.toJson(storageData.getProperties()));
} else {
map.put(PROPERTIES, Const.EMPTY_STRING);
}
map.put(LAST_PING_TIME_BUCKET, storageData.getLastPingTimestamp());
map.put(DETECT_TYPE, storageData.getDetectType());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
}
@Override
public void calculate() {
}
@Override
public Metrics toHour() {
return null;
}
@Override
public Metrics toDay() {
return null;
}
public static class PropertyUtil {
public static final String HOST_IP = "host_ip";
public static final String PID = "pid";
public static final String COMMAND_LINE = "command_line";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Setter
@Getter
@AllArgsConstructor
@ToString
public class MetricsEntityMetaInfo {
/**
* The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic}
*/
private String serviceName;
/**
* The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
*/
private String instanceName;
/**
* The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic}
*/
private String endpointName;
/**
* The name of {@link org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic}
*/
private String processName;
/**
* Build Service Entity
*/
public static MetricsEntityMetaInfo buildService(String serviceName) {
return new MetricsEntityMetaInfo(serviceName, "", "", "");
}
/**
* Build Service Instance Entity
*/
public static MetricsEntityMetaInfo buildServiceInstance(String serviceName, String instanceName) {
return new MetricsEntityMetaInfo(serviceName, instanceName, "", "");
}
/**
* Build Endpoint Entity
*/
public static MetricsEntityMetaInfo buildEndpoint(String serviceName, String endpointName) {
return new MetricsEntityMetaInfo(serviceName, "", endpointName, "");
}
/**
* Build Process Entity
*/
public static MetricsEntityMetaInfo buildProcess(String serviceName, String instanceName, String processName) {
return new MetricsEntityMetaInfo(serviceName, instanceName, "", processName);
}
}
...@@ -32,6 +32,9 @@ public class MetricsMetaInfo { ...@@ -32,6 +32,9 @@ public class MetricsMetaInfo {
@Setter @Setter
@Getter @Getter
private String id; private String id;
@Setter
@Getter
private MetricsEntityMetaInfo entity;
public MetricsMetaInfo(String metricsName, int scope) { public MetricsMetaInfo(String metricsName, int scope) {
this.metricsName = metricsName; this.metricsName = metricsName;
...@@ -45,8 +48,16 @@ public class MetricsMetaInfo { ...@@ -45,8 +48,16 @@ public class MetricsMetaInfo {
this.id = id; this.id = id;
} }
public MetricsMetaInfo(String metricsName, int scope, String id, MetricsEntityMetaInfo entity) {
this.metricsName = metricsName;
this.scope = scope;
this.id = id;
this.entity = entity;
}
@Override @Override
public String toString() { public String toString() {
return "MetricsMetaInfo{" + "metricsName='" + metricsName + '\'' + ", scope=" + scope + ", id='" + id + '\'' + '}'; return "MetricsMetaInfo{" + "metricsName='" + metricsName + '\'' + ", scope=" + scope + ", id='" + id + '\''
+ ", entity=" + entity + '}';
} }
} }
...@@ -92,6 +92,8 @@ public class DefaultScopeDefine { ...@@ -92,6 +92,8 @@ public class DefaultScopeDefine {
public static final int SERVICE_INSTANCE_JVM_CLASS = 44; public static final int SERVICE_INSTANCE_JVM_CLASS = 44;
public static final int PROCESS = 45;
/** /**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt. * Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
*/ */
...@@ -101,6 +103,7 @@ public class DefaultScopeDefine { ...@@ -101,6 +103,7 @@ public class DefaultScopeDefine {
public static final String SERVICE_RELATION_CATALOG_NAME = "SERVICE_RELATION"; public static final String SERVICE_RELATION_CATALOG_NAME = "SERVICE_RELATION";
public static final String SERVICE_INSTANCE_RELATION_CATALOG_NAME = "SERVICE_INSTANCE_RELATION"; public static final String SERVICE_INSTANCE_RELATION_CATALOG_NAME = "SERVICE_INSTANCE_RELATION";
public static final String ENDPOINT_RELATION_CATALOG_NAME = "ENDPOINT_RELATION"; public static final String ENDPOINT_RELATION_CATALOG_NAME = "ENDPOINT_RELATION";
public static final String PROCESS_CATALOG_NAME = "PROCESS";
private static final Map<Integer, Boolean> SERVICE_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> SERVICE_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> SERVICE_INSTANCE_CATALOG = new HashMap<>();
...@@ -108,6 +111,7 @@ public class DefaultScopeDefine { ...@@ -108,6 +111,7 @@ public class DefaultScopeDefine {
private static final Map<Integer, Boolean> SERVICE_RELATION_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> SERVICE_RELATION_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> SERVICE_INSTANCE_RELATION_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> SERVICE_INSTANCE_RELATION_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> ENDPOINT_RELATION_CATALOG = new HashMap<>(); private static final Map<Integer, Boolean> ENDPOINT_RELATION_CATALOG = new HashMap<>();
private static final Map<Integer, Boolean> PROCESS_CATALOG = new HashMap<>();
@Setter @Setter
private static boolean ACTIVE_EXTRA_MODEL_COLUMNS = false; private static boolean ACTIVE_EXTRA_MODEL_COLUMNS = false;
...@@ -209,6 +213,9 @@ public class DefaultScopeDefine { ...@@ -209,6 +213,9 @@ public class DefaultScopeDefine {
case ENDPOINT_RELATION_CATALOG_NAME: case ENDPOINT_RELATION_CATALOG_NAME:
ENDPOINT_RELATION_CATALOG.put(id, Boolean.TRUE); ENDPOINT_RELATION_CATALOG.put(id, Boolean.TRUE);
break; break;
case PROCESS_CATALOG_NAME:
PROCESS_CATALOG.put(id, Boolean.TRUE);
break;
} }
} }
...@@ -250,7 +257,7 @@ public class DefaultScopeDefine { ...@@ -250,7 +257,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs service catalog * Check whether the given scope ID belongs service catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_CATALOG_NAME}
...@@ -260,7 +267,7 @@ public class DefaultScopeDefine { ...@@ -260,7 +267,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs service instance catalog * Check whether the given scope ID belongs service instance catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_CATALOG_NAME}
...@@ -270,7 +277,7 @@ public class DefaultScopeDefine { ...@@ -270,7 +277,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs endpoint catalog * Check whether the given scope ID belongs endpoint catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_CATALOG_NAME}
...@@ -280,7 +287,7 @@ public class DefaultScopeDefine { ...@@ -280,7 +287,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs service relation catalog * Check whether the given scope ID belongs service relation catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_RELATION_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_RELATION_CATALOG_NAME}
...@@ -290,7 +297,7 @@ public class DefaultScopeDefine { ...@@ -290,7 +297,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs service instance relation catalog * Check whether the given scope ID belongs service instance relation catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_RELATION_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #SERVICE_INSTANCE_RELATION_CATALOG_NAME}
...@@ -300,7 +307,7 @@ public class DefaultScopeDefine { ...@@ -300,7 +307,7 @@ public class DefaultScopeDefine {
} }
/** /**
* Check whether current service belongs endpoint relation catalog * Check whether the given scope ID belongs endpoint relation catalog
* *
* @param scopeId represents an existing scope id. * @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_RELATION_CATALOG_NAME} * @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #ENDPOINT_RELATION_CATALOG_NAME}
...@@ -309,6 +316,16 @@ public class DefaultScopeDefine { ...@@ -309,6 +316,16 @@ public class DefaultScopeDefine {
return ENDPOINT_RELATION_CATALOG.containsKey(scopeId); return ENDPOINT_RELATION_CATALOG.containsKey(scopeId);
} }
/**
* Check whether the given scope ID belongs process catalog
*
* @param scopeId represents an existing scope id.
* @return true is current scope set {@link ScopeDeclaration#catalog()} == {@link #PROCESS_CATALOG_NAME}
*/
public static boolean inProcessCatalog(int scopeId) {
return PROCESS_CATALOG.containsKey(scopeId);
}
/** /**
* Get the catalog string name of the given scope * Get the catalog string name of the given scope
* *
...@@ -334,6 +351,9 @@ public class DefaultScopeDefine { ...@@ -334,6 +351,9 @@ public class DefaultScopeDefine {
if (inEndpointRelationCatalog(scope)) { if (inEndpointRelationCatalog(scope)) {
return ENDPOINT_RELATION_CATALOG_NAME; return ENDPOINT_RELATION_CATALOG_NAME;
} }
if (inProcessCatalog(scope)) {
return PROCESS_CATALOG_NAME;
}
return "ALL"; return "ALL";
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import com.google.gson.JsonObject;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS;
@ScopeDeclaration(id = PROCESS, name = "Process")
@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
public class Process extends Source {
private volatile String entityId;
@Override
public int scope() {
return PROCESS;
}
@Override
public String getEntityId() {
if (entityId == null) {
entityId = IDManager.ProcessID.buildId(instanceId, name);
}
return entityId;
}
@Getter
private String instanceId;
@Getter
private String serviceId;
@Getter
@Setter
private String name;
@Getter
@Setter
private String serviceName;
@Getter
@Setter
private String instanceName;
@Getter
@Setter
private Layer layer;
@Getter
@Setter
private boolean isServiceNormal;
@Getter
@Setter
private String agentId;
@Getter
@Setter
private ProcessDetectType detectType;
@Getter
@Setter
private JsonObject properties;
@Override
public void prepare() {
serviceId = IDManager.ServiceID.buildId(serviceName, isServiceNormal);
instanceId = IDManager.ServiceInstanceID.buildId(serviceId, instanceName);
}
}
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
<module>configuration-discovery-receiver-plugin</module> <module>configuration-discovery-receiver-plugin</module>
<module>skywalking-event-receiver-plugin</module> <module>skywalking-event-receiver-plugin</module>
<module>skywalking-zabbix-receiver-plugin</module> <module>skywalking-zabbix-receiver-plugin</module>
<module>skywalking-ebpf-receiver-plugin</module>
</modules> </modules>
<dependencies> <dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-ebpf-receiver-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.ebpf.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class EBPFReceiverModule extends ModuleDefine {
public static final String NAME = "receiver-ebpf";
public EBPFReceiverModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.ebpf.provider;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.ebpf.module.EBPFReceiverModule;
import org.apache.skywalking.oap.server.receiver.ebpf.provider.handler.EBPFProcessServiceHandler;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
public class EBPFReceiverProvider extends ModuleProvider {
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return EBPFReceiverModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new EBPFProcessServiceHandler(getManager()));
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {
CoreModule.NAME,
SharingServerModule.NAME
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.ebpf.provider.handler;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFHostProcessDownstream;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFHostProcessMetadata;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessDownstream;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessEntityMetadata;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessPingPkgList;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessProperties;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessReportList;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessServiceGrpc;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFReportProcessDownstream;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.Process;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import java.util.ArrayList;
public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProcessServiceImplBase implements GRPCHandler {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
public EBPFProcessServiceHandler(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
}
@Override
public void reportProcesses(EBPFProcessReportList request, StreamObserver<EBPFReportProcessDownstream> responseObserver) {
final String agentId = request.getEbpfAgentID();
// build per process data
final ArrayList<Tuple2<Process, EBPFProcessDownstream>> processes = new ArrayList<>();
for (EBPFProcessProperties ebpfProcessProperties : request.getProcessesList()) {
Tuple2<Process, EBPFProcessDownstream> processData = null;
if (ebpfProcessProperties.hasHostProcess()) {
processData = prepareReportHostProcess(ebpfProcessProperties.getHostProcess(), agentId);
}
if (processData != null) {
processes.add(processData);
}
}
// report process and downstream the process id data
final EBPFReportProcessDownstream.Builder builder = EBPFReportProcessDownstream.newBuilder();
processes.stream().forEach(e -> {
sourceReceiver.receive(e._1);
builder.addProcesses(e._2);
});
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override
public void keepAlive(EBPFProcessPingPkgList request, StreamObserver<Commands> responseObserver) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
request.getProcessesList().forEach(p -> {
final EBPFProcessEntityMetadata entity = p.getEntityMetadata();
final String serviceName = namingControl.formatServiceName(entity.getServiceName());
final String instanceName = namingControl.formatInstanceName(entity.getInstanceName());
final Layer layer = Layer.valueOf(entity.getLayer());
// process
final Process processUpdate = new Process();
processUpdate.setServiceName(serviceName);
processUpdate.setInstanceName(instanceName);
processUpdate.setLayer(layer);
processUpdate.setServiceNormal(true);
processUpdate.setName(entity.getProcessName());
processUpdate.setTimeBucket(timeBucket);
sourceReceiver.receive(processUpdate);
// instance
final ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
serviceInstanceUpdate.setServiceId(IDManager.ServiceID.buildId(serviceName, true));
serviceInstanceUpdate.setName(instanceName);
serviceInstanceUpdate.setTimeBucket(timeBucket);
serviceInstanceUpdate.setLayer(layer);
sourceReceiver.receive(serviceInstanceUpdate);
// service
final ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setName(serviceName);
serviceMeta.setNormal(true);
serviceMeta.setTimeBucket(timeBucket);
serviceMeta.setLayer(layer);
sourceReceiver.receive(serviceMeta);
});
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
private Tuple2<Process, EBPFProcessDownstream> prepareReportHostProcess(EBPFHostProcessMetadata hostProcess, String agentId) {
final Process process = new Process();
// entity
process.setServiceName(namingControl.formatServiceName(hostProcess.getEntity().getServiceName()));
process.setServiceNormal(true);
process.setLayer(Layer.valueOf(hostProcess.getEntity().getLayer()));
process.setInstanceName(namingControl.formatInstanceName(hostProcess.getEntity().getInstanceName()));
process.setName(hostProcess.getEntity().getProcessName());
// metadata
process.setDetectType(ProcessDetectType.VM);
process.setAgentId(agentId);
final JsonObject properties = new JsonObject();
properties.addProperty(ProcessTraffic.PropertyUtil.HOST_IP, hostProcess.getHostIP());
properties.addProperty(ProcessTraffic.PropertyUtil.PID, hostProcess.getPid());
properties.addProperty(ProcessTraffic.PropertyUtil.COMMAND_LINE, hostProcess.getCmd());
process.setProperties(properties);
// timestamp
process.setTimeBucket(
TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute));
process.prepare();
final String processId = process.getEntityId();
final EBPFProcessDownstream downstream = EBPFProcessDownstream.newBuilder()
.setProcessId(processId)
.setHostProcess(EBPFHostProcessDownstream.newBuilder()
.setPid(hostProcess.getPid())
.build())
.build();
return Tuple.of(process, downstream);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.ebpf.module.EBPFReceiverModule
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.receiver.ebpf.provider.EBPFReceiverProvider
\ No newline at end of file
...@@ -156,6 +156,11 @@ ...@@ -156,6 +156,11 @@
<artifactId>skywalking-zabbix-receiver-plugin</artifactId> <artifactId>skywalking-zabbix-receiver-plugin</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-ebpf-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module --> <!-- receiver module -->
<!-- fetcher module --> <!-- fetcher module -->
......
...@@ -506,3 +506,7 @@ configuration-discovery: ...@@ -506,3 +506,7 @@ configuration-discovery:
receiver-event: receiver-event:
selector: ${SW_RECEIVER_EVENT:default} selector: ${SW_RECEIVER_EVENT:default}
default: default:
receiver-ebpf:
selector: ${SW_RECEIVER_EBPF:default}
default:
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册