未验证 提交 91b94cfc 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Adapt the process procotol update (#9069)

上级 6b7d6038
Subproject commit ad4a02c6c035da5182034a53b5fe6998bd52cb9f Subproject commit f1b62de294f7f46f3334900417ffb349f19a479c
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.manual.process; package org.apache.skywalking.oap.server.core.analysis.manual.process;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.Getter; import lombok.Getter;
...@@ -36,6 +37,8 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; ...@@ -36,6 +37,8 @@ import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.library.util.StringUtil;
import java.util.Map;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS; import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROCESS;
@Stream(name = ProcessTraffic.INDEX_NAME, scopeId = PROCESS, @Stream(name = ProcessTraffic.INDEX_NAME, scopeId = PROCESS,
...@@ -118,8 +121,12 @@ public class ProcessTraffic extends Metrics { ...@@ -118,8 +121,12 @@ public class ProcessTraffic extends Metrics {
if (StringUtil.isNotBlank(processTraffic.getAgentId())) { if (StringUtil.isNotBlank(processTraffic.getAgentId())) {
this.agentId = processTraffic.getAgentId(); this.agentId = processTraffic.getAgentId();
} }
if (processTraffic.getProperties() != null && processTraffic.getProperties().size() > 0) { if (this.properties == null) {
this.properties = processTraffic.getProperties(); this.properties = processTraffic.getProperties();
} else if (processTraffic.getProperties() != null) {
for (Map.Entry<String, JsonElement> e : processTraffic.getProperties().entrySet()) {
this.properties.add(e.getKey(), e.getValue());
}
} }
if (processTraffic.getDetectType() > 0) { if (processTraffic.getDetectType() > 0) {
this.detectType = processTraffic.getDetectType(); this.detectType = processTraffic.getDetectType();
......
...@@ -35,7 +35,6 @@ import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessPr ...@@ -35,7 +35,6 @@ import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessPr
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessReportList; import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessReportList;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessServiceGrpc; import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFProcessServiceGrpc;
import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFReportProcessDownstream; import org.apache.skywalking.apm.network.ebpf.profiling.process.v3.EBPFReportProcessDownstream;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.IDManager;
...@@ -103,6 +102,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -103,6 +102,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
@Override @Override
public void keepAlive(EBPFProcessPingPkgList request, StreamObserver<Commands> responseObserver) { public void keepAlive(EBPFProcessPingPkgList request, StreamObserver<Commands> responseObserver) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute); final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
final String agentID = request.getEbpfAgentID();
request.getProcessesList().forEach(p -> { request.getProcessesList().forEach(p -> {
final EBPFProcessEntityMetadata entity = p.getEntityMetadata(); final EBPFProcessEntityMetadata entity = p.getEntityMetadata();
...@@ -117,8 +117,10 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -117,8 +117,10 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
processUpdate.setServiceNormal(true); processUpdate.setServiceNormal(true);
processUpdate.setName(entity.getProcessName()); processUpdate.setName(entity.getProcessName());
processUpdate.setLabels(entity.getLabelsList()); processUpdate.setLabels(entity.getLabelsList());
processUpdate.setProperties(convertProperties(p.getPropertiesList()));
processUpdate.setProfilingSupportStatus(getProfilingSupportStatus(p.getPropertiesList()));
processUpdate.setTimeBucket(timeBucket); processUpdate.setTimeBucket(timeBucket);
processUpdate.setAgentId(Const.EMPTY_STRING); processUpdate.setAgentId(agentID);
sourceReceiver.receive(processUpdate); sourceReceiver.receive(processUpdate);
// instance // instance
...@@ -155,11 +157,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -155,11 +157,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
// metadata // metadata
process.setDetectType(ProcessDetectType.VM); process.setDetectType(ProcessDetectType.VM);
process.setAgentId(agentId); process.setAgentId(agentId);
final JsonObject properties = new JsonObject(); process.setProperties(convertProperties(hostProcess.getPropertiesList()));
for (KeyStringValuePair kv : hostProcess.getPropertiesList()) {
properties.addProperty(kv.getKey(), kv.getValue());
}
process.setProperties(properties);
process.setLabels(hostProcess.getEntity().getLabelsList()); process.setLabels(hostProcess.getEntity().getLabelsList());
process.setProfilingSupportStatus(getProfilingSupportStatus(hostProcess.getPropertiesList())); process.setProfilingSupportStatus(getProfilingSupportStatus(hostProcess.getPropertiesList()));
...@@ -173,6 +171,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -173,6 +171,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
.setProcessId(processId) .setProcessId(processId)
.setHostProcess(EBPFHostProcessDownstream.newBuilder() .setHostProcess(EBPFHostProcessDownstream.newBuilder()
.setPid(hostProcess.getPid()) .setPid(hostProcess.getPid())
.setEntityMetadata(hostProcess.getEntity())
.build()) .build())
.build(); .build();
return Tuple.of(process, downstream); return Tuple.of(process, downstream);
...@@ -190,11 +189,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -190,11 +189,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
// metadata // metadata
process.setDetectType(ProcessDetectType.KUBERNETES); process.setDetectType(ProcessDetectType.KUBERNETES);
process.setAgentId(agentId); process.setAgentId(agentId);
final JsonObject properties = new JsonObject(); process.setProperties(convertProperties(kubernetesProcessMetadata.getPropertiesList()));
for (KeyStringValuePair kv : kubernetesProcessMetadata.getPropertiesList()) {
properties.addProperty(kv.getKey(), kv.getValue());
}
process.setProperties(properties);
process.setLabels(kubernetesProcessMetadata.getEntity().getLabelsList()); process.setLabels(kubernetesProcessMetadata.getEntity().getLabelsList());
process.setProfilingSupportStatus(getProfilingSupportStatus(kubernetesProcessMetadata.getPropertiesList())); process.setProfilingSupportStatus(getProfilingSupportStatus(kubernetesProcessMetadata.getPropertiesList()));
...@@ -208,6 +203,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -208,6 +203,7 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
.setProcessId(processId) .setProcessId(processId)
.setK8SProcess(EBPFKubernetesProcessDownstream.newBuilder() .setK8SProcess(EBPFKubernetesProcessDownstream.newBuilder()
.setPid(kubernetesProcessMetadata.getPid()) .setPid(kubernetesProcessMetadata.getPid())
.setEntityMetadata(kubernetesProcessMetadata.getEntity())
.build()) .build())
.build(); .build();
return Tuple.of(process, downstream); return Tuple.of(process, downstream);
...@@ -243,4 +239,15 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces ...@@ -243,4 +239,15 @@ public class EBPFProcessServiceHandler extends EBPFProcessServiceGrpc.EBPFProces
} }
return ProfilingSupportStatus.NOT_SUPPORT; return ProfilingSupportStatus.NOT_SUPPORT;
} }
/**
* Convert process properties to source data
*/
private JsonObject convertProperties(List<KeyStringValuePair> properties) {
final JsonObject result = new JsonObject();
for (KeyStringValuePair kv : properties) {
result.addProperty(kv.getKey(), kv.getValue());
}
return result;
}
} }
...@@ -22,6 +22,6 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58 ...@@ -22,6 +22,6 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5 SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
SW_ROVER_COMMIT=00b5150ec70197af13a9e5f1ffc203d433dc886b SW_ROVER_COMMIT=90c93c706743aac1f5853b677730edae8cc32a2c
SW_CTL_COMMIT=03dbdcf8cecc3abdef661efaa5734c01ac49adea SW_CTL_COMMIT=03dbdcf8cecc3abdef661efaa5734c01ac49adea
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册