未验证 提交 777c670c 编写于 作者: W wankai123 提交者: GitHub

Support the identification of the source host of the metric data in otel-receiver-plugin (#6243)

上级 100823cb
......@@ -74,6 +74,7 @@ Release Notes.
* Add the implementation of ConfigurationDiscovery on the OAP side.
* Fix bug in `parseInternalErrorCode` where some error codes are never reached.
* OAL supports multiple values when as numeric
* Add node information from the Openensus proto to the labels of the samples, to support the identification of the source of the Metric data.
#### UI
* Fix un-removed tags in trace query.
......
......@@ -182,6 +182,9 @@ receiver-otel:
enabledHandlers: ${SW_OTEL_RECEIVER_ENABLED_HANDLERS:"oc"}
enabledOcRules: ${SW_OTEL_RECEIVER_ENABLED_OC_RULES:"istio-controlplane"}
```
The receiver adds labels with `key = node_identifier_host_name` and `key = node_identifier_pid` to the collected data samples,
and values from `Node.identifier.host_name` and `Node.identifier.pid` defined in opencensus agent proto,
to be the identification of the metric data.
## Meter receiver
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.receiver.otel.oc;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import io.opencensus.proto.agent.common.v1.Node;
import io.opencensus.proto.agent.metrics.v1.ExportMetricsServiceRequest;
import io.opencensus.proto.agent.metrics.v1.ExportMetricsServiceResponse;
import io.opencensus.proto.agent.metrics.v1.MetricsServiceGrpc;
......@@ -34,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
......@@ -57,11 +59,32 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
@Override public StreamObserver<ExportMetricsServiceRequest> export(
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
return new StreamObserver<ExportMetricsServiceRequest>() {
@Override public void onNext(ExportMetricsServiceRequest request) {
private Node node;
private Map<String, String> nodeLabels = new HashMap<>();
@Override
public void onNext(ExportMetricsServiceRequest request) {
if (request.hasNode()) {
node = request.getNode();
nodeLabels.clear();
if (node.hasIdentifier()) {
if (StringUtil.isNotBlank(node.getIdentifier().getHostName())) {
nodeLabels.put("node_identifier_host_name", node.getIdentifier().getHostName());
}
if (node.getIdentifier().getPid() > 0) {
nodeLabels.put("node_identifier_pid", String.valueOf(node.getIdentifier().getPid()));
}
}
}
metrics.forEach(m -> m.toMeter(request.getMetricsList().stream()
.flatMap(metric -> metric.getTimeseriesList().stream().map(timeSeries ->
Tuple.of(metric.getMetricDescriptor(),
buildLabels(metric.getMetricDescriptor().getLabelKeysList(), timeSeries.getLabelValuesList()),
buildLabelsFromNodeInfo(
nodeLabels, buildLabels(
metric.getMetricDescriptor().getLabelKeysList(),
timeSeries.getLabelValuesList()
)
),
timeSeries)))
.flatMap(t -> t._3.getPointsList().stream().map(point -> Tuple.of(t._1, t._2, point)))
.map(Function1.liftTry(t -> {
......@@ -107,6 +130,12 @@ public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase i
return result;
}
private static Map<String, String> buildLabelsFromNodeInfo(Map<String, String> nodeLabels,
Map<String, String> buildLabelsResult) {
buildLabelsResult.putAll(nodeLabels);
return buildLabelsResult;
}
private static Map<Double, Long> buildBuckets(DistributionValue distributionValue) {
Map<Double, Long> result = new HashMap<>();
List<Double> bounds = distributionValue.getBucketOptions().getExplicit().getBoundsList();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册