未验证 提交 41035510 编写于 作者: Z Zhenxu Ke 提交者: GitHub

bugfix: Envoy error logs are not persisted when no metrics are generated (#6911)

上级 667f3be3
...@@ -36,7 +36,8 @@ Release Notes. ...@@ -36,7 +36,8 @@ Release Notes.
* CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638 * CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638
* Fix: MAL function would miss samples name after creating new samples. * Fix: MAL function would miss samples name after creating new samples.
* perf: use iterator.remove() to remove modulesWithoutProvider * perf: use iterator.remove() to remove modulesWithoutProvider
* Support analyzing Envoy TCP access logs. * Support analyzing Envoy TCP access logs and persist error TCP logs.
* Fix: Envoy error logs are not persisted when no metrics are generated
#### UI #### UI
* Add logo for kong plugin. * Add logo for kong plugin.
......
...@@ -24,7 +24,9 @@ rules: ...@@ -24,7 +24,9 @@ rules:
abort {} abort {}
} }
extractor { extractor {
if (parsed?.response?.responseCode) {
tag 'status.code': parsed?.response?.responseCode as int tag 'status.code': parsed?.response?.responseCode as int
}
tag 'response.flag': parsed?.commonProperties?.responseFlags?.keySet() tag 'response.flag': parsed?.commonProperties?.responseFlags?.keySet()
} }
sink { sink {
......
...@@ -31,7 +31,9 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; ...@@ -31,7 +31,9 @@ import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer; import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
...@@ -129,11 +131,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -129,11 +131,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs();
for (final HTTPAccessLogEntry log : logs.getLogEntryList()) { for (final HTTPAccessLogEntry log : logs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>(); AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
result = analysis.analysis(result, identifier, log, role); result = analysis.analysis(result, identifier, log, role);
} }
sourceResult.addAll(result); if (CollectionUtils.isNotEmpty(result.getMetrics())) {
sourceResult.addAll(result.getMetrics());
}
} }
break; break;
...@@ -141,11 +145,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS ...@@ -141,11 +145,13 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs(); StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs = message.getTcpLogs();
for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) { for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
List<ServiceMeshMetric.Builder> result = new ArrayList<>(); AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) { for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
result = analyzer.analysis(result, identifier, tcpLog, role); result = analyzer.analysis(result, identifier, tcpLog, role);
} }
sourceResult.addAll(result); if (CollectionUtils.isNotEmpty(result.getMetrics())) {
sourceResult.addAll(result.getMetrics());
}
} }
break; break;
......
...@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; ...@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List; import java.util.List;
import lombok.Builder;
import lombok.Data;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ModuleStartException;
...@@ -42,8 +44,8 @@ public interface AccessLogAnalyzer<E> { ...@@ -42,8 +44,8 @@ public interface AccessLogAnalyzer<E> {
* @param role the role of the Envoy node where the logs are emitted. * @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results. * @return the analysis results.
*/ */
List<ServiceMeshMetric.Builder> analysis( Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result result,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final E entry, final E entry,
final Role role final Role role
...@@ -65,4 +67,18 @@ public interface AccessLogAnalyzer<E> { ...@@ -65,4 +67,18 @@ public interface AccessLogAnalyzer<E> {
} }
return defaultRole; return defaultRole;
} }
@Data
@Builder
class Result {
/**
* The service representing the Envoy node.
*/
private ServiceMetaInfo service;
/**
* The analyzed metrics result.
*/
private List<ServiceMeshMetric.Builder> metrics;
}
} }
...@@ -91,7 +91,7 @@ public class LogEntry2MetricsAdapter { ...@@ -91,7 +91,7 @@ public class LogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client); .setDetectPoint(DetectPoint.client);
} }
protected ServiceMeshMetric.Builder adaptCommonPart() { public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
final String endpoint = endpoint(); final String endpoint = endpoint();
int responseCode = entry.getResponse().getResponseCode().getValue(); int responseCode = entry.getResponse().getResponseCode().getValue();
......
...@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; ...@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -60,17 +59,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -60,17 +59,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
} }
@Override @Override
public List<ServiceMeshMetric.Builder> analysis( public Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result result,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry, final HTTPAccessLogEntry entry,
final Role role final Role role
) { ) {
if (isNotEmpty(result)) { if (isNotEmpty(result.getMetrics())) {
return result; return result;
} }
if (serviceRegistry.isEmpty()) { if (serviceRegistry.isEmpty()) {
return Collections.emptyList(); return Result.builder().build();
} }
switch (role) { switch (role) {
case PROXY: case PROXY:
...@@ -79,17 +78,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -79,17 +78,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
return analyzeSideCar(entry); return analyzeSideCar(entry);
} }
return Collections.emptyList(); return Result.builder().build();
} }
protected List<ServiceMeshMetric.Builder> analyzeSideCar(final HTTPAccessLogEntry entry) { protected Result analyzeSideCar(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) { if (!entry.hasCommonProperties()) {
return Collections.emptyList(); return Result.builder().build();
} }
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
final String cluster = properties.getUpstreamCluster(); final String cluster = properties.getUpstreamCluster();
if (isBlank(cluster)) { if (isBlank(cluster)) {
return Collections.emptyList(); return Result.builder().build();
} }
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>(); final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
...@@ -101,7 +100,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -101,7 +100,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress());
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
if (!isValid(downstreamRemoteAddress) || !isValid(downstreamLocalAddress)) { if (!isValid(downstreamRemoteAddress) || !isValid(downstreamLocalAddress)) {
return Collections.emptyList(); return Result.builder().build();
} }
final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress());
...@@ -125,7 +124,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -125,7 +124,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
// sidecar(client side) -> sidecar // sidecar(client side) -> sidecar
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (!isValid(upstreamRemoteAddress)) { if (!isValid(upstreamRemoteAddress)) {
return sources; return Result.builder().metrics(sources).service(localService).build();
} }
final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); final ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress());
...@@ -135,12 +134,12 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -135,12 +134,12 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
sources.add(metric); sources.add(metric);
} }
return sources; return Result.builder().metrics(sources).service(localService).build();
} }
protected List<ServiceMeshMetric.Builder> analyzeProxy(final HTTPAccessLogEntry entry) { protected Result analyzeProxy(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) { if (!entry.hasCommonProperties()) {
return Collections.emptyList(); return Result.builder().build();
} }
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
...@@ -148,7 +147,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -148,7 +147,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress(); properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (!isValid(downstreamLocalAddress) || !isValid(downstreamRemoteAddress) || !isValid(upstreamRemoteAddress)) { if (!isValid(downstreamLocalAddress) || !isValid(downstreamRemoteAddress) || !isValid(upstreamRemoteAddress)) {
return Collections.emptyList(); return Result.builder().build();
} }
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2); final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
...@@ -175,7 +174,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer { ...@@ -175,7 +174,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
result.add(outboundMetric); result.add(outboundMetric);
return result; return Result.builder().metrics(result).service(ingress).build();
} }
/** /**
......
...@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat; ...@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
import java.util.Base64; import java.util.Base64;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -68,31 +68,32 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -68,31 +68,32 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
} }
@Override @Override
public List<ServiceMeshMetric.Builder> analysis( public Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry, final HTTPAccessLogEntry entry,
final Role role final Role role
) { ) {
if (isNotEmpty(result)) { if (isNotEmpty(previousResult.getMetrics())) {
return result; return previousResult;
} }
if (!entry.hasCommonProperties()) { if (!entry.hasCommonProperties()) {
return Collections.emptyList(); return previousResult;
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap.isEmpty()) {
return Collections.emptyList();
} }
final ServiceMetaInfo currSvc; final ServiceMetaInfo currSvc;
try { try {
currSvc = adaptToServiceMetaInfo(identifier); currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
return Collections.emptyList(); return previousResult;
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap.isEmpty()) {
return Result.builder().service(currSvc).build();
} }
final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean(); final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> { stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) { if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
...@@ -131,7 +132,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer { ...@@ -131,7 +132,7 @@ public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
} }
result.add(metric); result.add(metric);
} }
return result; return Result.builder().metrics(result).service(currSvc).build();
} }
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception { protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
......
...@@ -85,7 +85,7 @@ public class TCPLogEntry2MetricsAdapter { ...@@ -85,7 +85,7 @@ public class TCPLogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client); .setDetectPoint(DetectPoint.client);
} }
protected ServiceMeshMetric.Builder adaptCommonPart() { public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
final ConnectionProperties connectionProperties = entry.getConnectionProperties(); final ConnectionProperties connectionProperties = entry.getConnectionProperties();
final String tlsMode = parseTLS(properties.getTlsProperties()); final String tlsMode = parseTLS(properties.getTlsProperties());
......
...@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; ...@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -59,36 +58,36 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer { ...@@ -59,36 +58,36 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
} }
@Override @Override
public List<ServiceMeshMetric.Builder> analysis( public Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry, final TCPAccessLogEntry entry,
final Role role final Role role
) { ) {
if (isNotEmpty(result)) { if (isNotEmpty(previousResult.getMetrics())) {
return result; return previousResult;
} }
if (serviceRegistry.isEmpty()) { if (serviceRegistry.isEmpty()) {
return Collections.emptyList(); return previousResult;
} }
switch (role) { switch (role) {
case PROXY: case PROXY:
return analyzeProxy(entry); return analyzeProxy(previousResult, entry);
case SIDECAR: case SIDECAR:
return analyzeSideCar(entry); return analyzeSideCar(previousResult, entry);
} }
return Collections.emptyList(); return previousResult;
} }
protected List<ServiceMeshMetric.Builder> analyzeSideCar(final TCPAccessLogEntry entry) { protected Result analyzeSideCar(final Result previousResult, final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) { if (properties == null) {
return Collections.emptyList(); return previousResult;
} }
final String cluster = properties.getUpstreamCluster(); final String cluster = properties.getUpstreamCluster();
if (cluster == null) { if (cluster == null) {
return Collections.emptyList(); return previousResult;
} }
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>(); final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
...@@ -128,20 +127,20 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer { ...@@ -128,20 +127,20 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
sources.add(metric); sources.add(metric);
} }
return sources; return Result.builder().metrics(sources).service(localService).build();
} }
protected List<ServiceMeshMetric.Builder> analyzeProxy(final TCPAccessLogEntry entry) { protected Result analyzeProxy(final Result previousResult, final TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties(); final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) { if (properties == null) {
return Collections.emptyList(); return previousResult;
} }
final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); final Address downstreamLocalAddress = properties.getDownstreamLocalAddress();
final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ? final Address downstreamRemoteAddress = properties.hasDownstreamDirectRemoteAddress() ?
properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress(); properties.getDownstreamDirectRemoteAddress() : properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress();
if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) {
return Collections.emptyList(); return previousResult;
} }
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2); final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
...@@ -168,7 +167,7 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer { ...@@ -168,7 +167,7 @@ public class K8sALSServiceMeshTCPAnalysis extends AbstractTCPAccessLogAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
result.add(outboundMetric); result.add(outboundMetric);
return result; return Result.builder().metrics(result).service(ingress).build();
} }
/** /**
......
...@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat; ...@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon; import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry; import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
import java.util.Base64; import java.util.Base64;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -67,31 +67,32 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz ...@@ -67,31 +67,32 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz
} }
@Override @Override
public List<ServiceMeshMetric.Builder> analysis( public Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry, final TCPAccessLogEntry entry,
final Role role final Role role
) { ) {
if (isNotEmpty(result)) { if (isNotEmpty(previousResult.getMetrics())) {
return result; return previousResult;
} }
if (!entry.hasCommonProperties()) { if (!entry.hasCommonProperties()) {
return Collections.emptyList(); return previousResult;
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap.isEmpty()) {
return Collections.emptyList();
} }
final ServiceMetaInfo currSvc; final ServiceMetaInfo currSvc;
try { try {
currSvc = adaptToServiceMetaInfo(identifier); currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e); log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
return Collections.emptyList(); return previousResult;
}
final AccessLogCommon properties = entry.getCommonProperties();
final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
if (stateMap.isEmpty()) {
return Result.builder().service(currSvc).build();
} }
final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean(); final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> { stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) { if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
...@@ -130,7 +131,7 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz ...@@ -130,7 +131,7 @@ public class MetaExchangeTCPAccessLogAnalyzer extends AbstractTCPAccessLogAnalyz
} }
result.add(metric); result.add(metric);
} }
return result; return Result.builder().metrics(result).service(currSvc).build();
} }
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception { protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws Exception {
......
...@@ -18,13 +18,9 @@ ...@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.persistence; package org.apache.skywalking.oap.server.receiver.envoy.persistence;
import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry; import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog; import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData; import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody; import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
...@@ -35,7 +31,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager; ...@@ -35,7 +31,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON; import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
...@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis { ...@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis {
} }
@Override @Override
public List<ServiceMeshMetric.Builder> analysis( public Result analysis(
final List<ServiceMeshMetric.Builder> result, final Result result,
final StreamAccessLogsMessage.Identifier identifier, final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry, final HTTPAccessLogEntry entry,
final Role role final Role role
) { ) {
try { try {
result.stream() final LogData logData = convertToLogData(entry, result);
.findFirst()
.ifPresent(metrics -> {
try {
final LogData logData = convertToLogData(entry, metrics);
logAnalyzerService.doAnalysis(logData); logAnalyzerService.doAnalysis(logData);
} catch (IOException e) {
log.error(
"Failed to parse error log entry to log data: {}",
TextFormat.shortDebugString(entry),
e
);
}
});
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to persist Envoy access log", e); log.error("Failed to persist Envoy access log", e);
} }
...@@ -92,16 +78,16 @@ public class LogsPersistence implements ALSHTTPAnalysis { ...@@ -92,16 +78,16 @@ public class LogsPersistence implements ALSHTTPAnalysis {
} }
public LogData convertToLogData(final HTTPAccessLogEntry logEntry, public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
final ServiceMeshMetric.Builder metrics) throws IOException { final Result result) throws Exception {
final boolean isServerSide = metrics.getDetectPoint() == DetectPoint.server;
final String svc = isServerSide ? metrics.getDestServiceName() : metrics.getSourceServiceName(); final ServiceMetaInfo service = result.getService();
final String svcInst = isServerSide ? metrics.getDestServiceInstance() : metrics.getSourceServiceInstance();
final ServiceMeshMetric.Builder metrics = new LogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData return LogData
.newBuilder() .newBuilder()
.setService(svc) .setService(service.getServiceName())
.setServiceInstance(svcInst) .setServiceInstance(service.getServiceInstanceName())
.setEndpoint(metrics.getEndpoint())
.setTimestamp(metrics.getEndTime()) .setTimestamp(metrics.getEndTime())
.setBody( .setBody(
LogDataBody LogDataBody
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPLogEntry2MetricsAdapter;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* {@code LogsPersistence} analyzes the error logs and persists them to the log system.
*/
@Slf4j
public class TCPLogsPersistence implements TCPAccessLogAnalyzer {
private ILogAnalyzerService logAnalyzerService;
@Override
public String name() {
return "persistence";
}
@Override
public void init(final ModuleManager manager, final EnvoyMetricReceiverConfig config) throws ModuleStartException {
logAnalyzerService = manager.find(LogAnalyzerModule.NAME)
.provider()
.getService(ILogAnalyzerService.class);
}
@Override
public Result analysis(
final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
try {
final LogData logData = convertToLogData(entry, result);
logAnalyzerService.doAnalysis(logData);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
return result;
}
@Override
public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role prev) {
return prev;
}
public LogData convertToLogData(final TCPAccessLogEntry logEntry,
final Result result) throws Exception {
final ServiceMetaInfo service = result.getService();
final ServiceMeshMetric.Builder metrics = new TCPLogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
.setService(service.getServiceName())
.setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
.newBuilder()
.setJson(
JSONLog
.newBuilder()
.setJson(toJSON(logEntry))
)
)
.build();
}
}
...@@ -18,3 +18,4 @@ ...@@ -18,3 +18,4 @@
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
org.apache.skywalking.oap.server.receiver.envoy.persistence.TCPLogsPersistence
...@@ -23,13 +23,12 @@ import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; ...@@ -23,13 +23,12 @@ import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.network.common.v3.DetectPoint; import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain; import org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role; import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.junit.Assert; import org.junit.Assert;
...@@ -78,16 +77,16 @@ public class K8SALSServiceMeshHTTPAnalysisTest { ...@@ -78,16 +77,16 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder); JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
Assert.assertEquals(2, result.size()); Assert.assertEquals(2, result.getMetrics().size());
ServiceMeshMetric.Builder incoming = result.get(0); ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName()); Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
Assert.assertEquals("ingress", incoming.getDestServiceName()); Assert.assertEquals("ingress", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
ServiceMeshMetric.Builder outgoing = result.get(1); ServiceMeshMetric.Builder outgoing = result.getMetrics().get(1);
Assert.assertEquals("ingress", outgoing.getSourceServiceName()); Assert.assertEquals("ingress", outgoing.getSourceServiceName());
Assert.assertEquals("productpage", outgoing.getDestServiceName()); Assert.assertEquals("productpage", outgoing.getDestServiceName());
Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint()); Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
...@@ -100,11 +99,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest { ...@@ -100,11 +99,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder); JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.getMetrics().size());
ServiceMeshMetric.Builder incoming = result.get(0); ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("", incoming.getSourceServiceName()); Assert.assertEquals("", incoming.getSourceServiceName());
Assert.assertEquals("productpage", incoming.getDestServiceName()); Assert.assertEquals("productpage", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
...@@ -117,11 +116,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest { ...@@ -117,11 +116,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder); JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.getMetrics().size());
ServiceMeshMetric.Builder incoming = result.get(0); ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("review", incoming.getDestServiceName()); Assert.assertEquals("review", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
...@@ -134,11 +133,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest { ...@@ -134,11 +133,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder); JsonFormat.parser().merge(isr, requestBuilder);
List<ServiceMeshMetric.Builder> result = this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); AccessLogAnalyzer.Result result = this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(), requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
Assert.assertEquals(1, result.size()); Assert.assertEquals(1, result.getMetrics().size());
ServiceMeshMetric.Builder incoming = result.get(0); ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("productpage", incoming.getSourceServiceName());
Assert.assertEquals("detail", incoming.getDestServiceName()); Assert.assertEquals("detail", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint()); Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册