diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index ad4a02c6c035da5182034a53b5fe6998bd52cb9f..f1b62de294f7f46f3334900417ffb349f19a479c 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit ad4a02c6c035da5182034a53b5fe6998bd52cb9f +Subproject commit f1b62de294f7f46f3334900417ffb349f19a479c diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java index 595977ed05f7c516ea0213466f1b574988157f8c..154c171ca8b43373ebb06c16407d99280d28e364 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.manual.process; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -36,6 +37,8 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.util.StringUtil; +import java.util.Map; + import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS; @Stream(name = ProcessTraffic.INDEX_NAME, scopeId = PROCESS, @@ -118,8 +121,12 @@ public class ProcessTraffic extends Metrics { if (StringUtil.isNotBlank(processTraffic.getAgentId())) { this.agentId = processTraffic.getAgentId(); } - if (processTraffic.getProperties() != null && processTraffic.getProperties().size() > 0) { + if (this.properties == null) { this.properties = processTraffic.getProperties(); + } else if (processTraffic.getProperties() != null) { + for (Map.Entry e : processTraffic.getProperties().entrySet()) { + this.properties.add(e.getKey(), e.getValue()); + } } if (processTraffic.getDetectType() > 0) { this.detectType = processTraffic.getDetectType(); diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java index 0137b3540a6b18b3ef93642aeaa0f3c91944fa09..690b24bdc763595121703c3150a8e27ed6b427d5 100644 --- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProcessServiceHandler.java @@ -35,7 +35,6 @@ import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessPr import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessReportList; import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessServiceGrpc; import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFReportProcessDownstream; -import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.IDManager; @@ -103,6 +102,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces @Override public void keepAlive(EBPFProcessPingPkgList request, StreamObserver responseObserver) { final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute); + final String agentID = request.getEbpfAgentID(); request.getProcessesList().forEach(p -> { final EBPFProcessEntityMetadata entity = p.getEntityMetadata(); @@ -117,8 +117,10 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces processUpdate.setServiceNormal(true); processUpdate.setName(entity.getProcessName()); processUpdate.setLabels(entity.getLabelsList()); + processUpdate.setProperties(convertProperties(p.getPropertiesList())); + processUpdate.setProfilingSupportStatus(getProfilingSupportStatus(p.getPropertiesList())); processUpdate.setTimeBucket(timeBucket); - processUpdate.setAgentId(Const.EMPTY_STRING); + processUpdate.setAgentId(agentID); sourceReceiver.receive(processUpdate); // instance @@ -155,11 +157,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces // metadata process.setDetectType(ProcessDetectType.VM); process.setAgentId(agentId); - final JsonObject properties = new JsonObject(); - for (KeyStringValuePair kv : hostProcess.getPropertiesList()) { - properties.addProperty(kv.getKey(), kv.getValue()); - } - process.setProperties(properties); + process.setProperties(convertProperties(hostProcess.getPropertiesList())); process.setLabels(hostProcess.getEntity().getLabelsList()); process.setProfilingSupportStatus(getProfilingSupportStatus(hostProcess.getPropertiesList())); @@ -173,6 +171,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces .setProcessId(processId) .setHostProcess(EBPFHostProcessDownstream.newBuilder() .setPid(hostProcess.getPid()) + .setEntityMetadata(hostProcess.getEntity()) .build()) .build(); return Tuple.of(process, downstream); @@ -190,11 +189,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces // metadata process.setDetectType(ProcessDetectType.KUBERNETES); process.setAgentId(agentId); - final JsonObject properties = new JsonObject(); - for (KeyStringValuePair kv : kubernetesProcessMetadata.getPropertiesList()) { - properties.addProperty(kv.getKey(), kv.getValue()); - } - process.setProperties(properties); + process.setProperties(convertProperties(kubernetesProcessMetadata.getPropertiesList())); process.setLabels(kubernetesProcessMetadata.getEntity().getLabelsList()); process.setProfilingSupportStatus(getProfilingSupportStatus(kubernetesProcessMetadata.getPropertiesList())); @@ -208,6 +203,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces .setProcessId(processId) .setK8SProcess(EBPFKubernetesProcessDownstream.newBuilder() .setPid(kubernetesProcessMetadata.getPid()) + .setEntityMetadata(kubernetesProcessMetadata.getEntity()) .build()) .build(); return Tuple.of(process, downstream); @@ -243,4 +239,15 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces } return ProfilingSupportStatus.NOT_SUPPORT; } + + /** + * Convert process properties to source data + */ + private JsonObject convertProperties(List properties) { + final JsonObject result = new JsonObject(); + for (KeyStringValuePair kv : properties) { + result.addProperty(kv.getKey(), kv.getValue()); + } + return result; + } } diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env index 754691e0fd2876def0f9741ef850273f54427233..9b2e91921f0075ae40b342977063521cb81cf7e4 100644 --- a/test/e2e-v2/script/env +++ b/test/e2e-v2/script/env @@ -22,6 +22,6 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016 SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5 -SW_ROVER_COMMIT=00b5150ec70197af13a9e5f1ffc203d433dc886b +SW_ROVER_COMMIT=90c93c706743aac1f5853b677730edae8cc32a2c SW_CTL_COMMIT=03dbdcf8cecc3abdef661efaa5734c01ac49adea