未验证 提交 54efea65 编写于 作者: K kezhenxu94

Unify the Service & ServiceInstance names with ALS

上级 d9b1f2fb
......@@ -62,7 +62,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider {
.provider()
.getService(GRPCHandlerRegister.class);
if (config.isAcceptMetricsService()) {
final MetricServiceGRPCHandler handler = new MetricServiceGRPCHandler(getManager());
final MetricServiceGRPCHandler handler = new MetricServiceGRPCHandler(getManager(), config);
service.addHandler(handler);
service.addHandler(new MetricServiceGRPCHandlerV3(handler));
}
......
......@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.receiver.envoy;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.metrics.v3.MetricsServiceGrpc;
import io.envoyproxy.envoy.service.metrics.v2.MetricsServiceGrpc;
import io.envoyproxy.envoy.service.metrics.v3.StreamMetricsMessage;
import io.envoyproxy.envoy.service.metrics.v3.StreamMetricsResponse;
import io.grpc.stub.StreamObserver;
......@@ -27,6 +26,7 @@ import io.prometheus.client.Metrics;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
......@@ -40,6 +40,8 @@ import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.mx.ServiceMetaInfoAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters.ProtoMetricFamily2MetricsAdapter;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
......@@ -80,10 +82,10 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
public StreamObserver<StreamMetricsMessage> streamMetrics(StreamObserver<StreamMetricsResponse> responseObserver) {
return new StreamObserver<StreamMetricsMessage>() {
private volatile boolean isFirst = true;
private String serviceName = null;
private String serviceInstanceName = null;
private ServiceMetaInfo service;
@Override
@SneakyThrows
public void onNext(StreamMetricsMessage message) {
if (log.isDebugEnabled()) {
log.debug("Received msg {}", message);
......@@ -91,46 +93,27 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
if (isFirst) {
isFirst = false;
StreamMetricsMessage.Identifier identifier = message.getIdentifier();
Node node = identifier.getNode();
String nodeId = node.getId();
if (!StringUtil.isEmpty(nodeId)) {
serviceInstanceName = nodeId;
}
String cluster = node.getCluster();
if (!StringUtil.isEmpty(cluster)) {
serviceName = cluster;
if (serviceInstanceName == null) {
serviceInstanceName = serviceName;
}
}
if (serviceName == null) {
serviceName = serviceInstanceName;
}
service = new ServiceMetaInfoAdapter(message.getIdentifier().getNode().getMetadata());
}
if (log.isDebugEnabled()) {
log.debug(
"Envoy metrics reported from service[{}], service instance[{}]", serviceName,
serviceInstanceName
);
log.debug("Envoy metrics reported from service[{}]", service);
}
if (StringUtil.isNotEmpty(serviceName) && StringUtil.isNotEmpty(serviceInstanceName)) {
if (service != null && StringUtil.isNotEmpty(service.getServiceName()) && StringUtil.isNotEmpty(service.getServiceInstanceName())) {
List<Metrics.MetricFamily> list = message.getEnvoyMetricsList();
boolean needHeartbeatUpdate = true;
for (final Metrics.MetricFamily metricFamily : list) {
counter.inc();
final String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal);
final String serviceId = IDManager.ServiceID.buildId(service.getServiceName(), NodeType.Normal);
try (final HistogramMetrics.Timer ignored = histogram.createTimer()) {
final ProtoMetricFamily2MetricsAdapter adapter = new ProtoMetricFamily2MetricsAdapter(metricFamily);
final Stream<Metric> metrics = adapter.adapt().peek(it -> {
it.getLabels().putIfAbsent("cluster", serviceName);
it.getLabels().putIfAbsent("instance", serviceInstanceName);
it.getLabels().putIfAbsent("cluster", service.getServiceName());
it.getLabels().putIfAbsent("instance", service.getServiceInstanceName());
});
converters.forEach(converter -> converter.toMeter(metrics));
......@@ -139,7 +122,7 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI
// Send heartbeat
ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate();
serviceInstanceUpdate.setName(serviceInstanceName);
serviceInstanceUpdate.setName(service.getServiceInstanceName());
serviceInstanceUpdate.setServiceId(serviceId);
serviceInstanceUpdate.setTimeBucket(TimeBucket.getMinuteTimeBucket(timestamp));
sourceReceiver.receive(serviceInstanceUpdate);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册