未验证 提交 d36b3501 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Fix the priority setting doesn't work of the ALS analyzers (#6121)

上级 f54f639c
...@@ -43,6 +43,7 @@ Release Notes. ...@@ -43,6 +43,7 @@ Release Notes.
* Fix the uuid field in GRPCConfigWatcherRegister is not updated. * Fix the uuid field in GRPCConfigWatcherRegister is not updated.
* Support Envoy {AccessLog,Metrics}Service API V3. * Support Envoy {AccessLog,Metrics}Service API V3.
* Adopt the [MAL](docs/en/concepts-and-designs/mal.md) in Envoy metrics service analyzer. * Adopt the [MAL](docs/en/concepts-and-designs/mal.md) in Envoy metrics service analyzer.
* Fix the priority setting doesn't work of the ALS analyzers.
#### UI #### UI
* Fix un-removed tags in trace query. * Fix un-removed tags in trace query.
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.receiver.envoy; package org.apache.skywalking.oap.server.receiver.envoy;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc; import io.envoyproxy.envoy.service.accesslog.v2.AccessLogServiceGrpc;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse;
...@@ -29,6 +30,7 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; ...@@ -29,6 +30,7 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
...@@ -47,7 +49,8 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -47,7 +49,8 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
private final HistogramMetrics histogram; private final HistogramMetrics histogram;
private final CounterMetrics sourceDispatcherCounter; private final CounterMetrics sourceDispatcherCounter;
public AccessLogServiceGRPCHandler(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException { public AccessLogServiceGRPCHandler(ModuleManager manager,
EnvoyMetricReceiverConfig config) throws ModuleStartException {
ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class); ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = ServiceLoader.load(ALSHTTPAnalysis.class);
envoyHTTPAnalysisList = new ArrayList<>(); envoyHTTPAnalysisList = new ArrayList<>();
for (String httpAnalysisName : config.getAlsHTTPAnalysis()) { for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
...@@ -62,9 +65,18 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -62,9 +65,18 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); counter = metricCreator.createCounter(
histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); "envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY,
sourceDispatcherCounter = metricCreator.createCounter("envoy_als_source_dispatch_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); MetricsTag.EMPTY_VALUE
);
histogram = metricCreator.createHistogramMetric(
"envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);
sourceDispatcherCounter = metricCreator.createCounter(
"envoy_als_source_dispatch_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);
} }
@Override @Override
...@@ -93,9 +105,10 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -93,9 +105,10 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase(); StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", identifier LOGGER.debug(
.getNode() "Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}", identifier
.getId(), role, logCase, message); .getNode()
.getId(), role, logCase, message);
} }
switch (logCase) { switch (logCase) {
...@@ -103,10 +116,16 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -103,10 +116,16 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>(); List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
logs.getLogEntryList().forEach(log -> { for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
sourceResult.addAll(analysis.analysis(identifier, log, role)); final List<ServiceMeshMetric.Builder> result =
}); analysis.analysis(identifier, log, role);
if (CollectionUtils.isNotEmpty(result)) {
// Once the analysis has results, don't need to continue analysis in lower priority analyzers.
sourceResult.addAll(result);
break;
}
}
} }
sourceDispatcherCounter.inc(sourceResult.size()); sourceDispatcherCounter.inc(sourceResult.size());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册