未验证 提交 982001c6 编写于 作者: K kezhenxu94 提交者: GitHub

perf: optimize Envoy access log persistence performance (#7290)

上级 6228d450
......@@ -80,6 +80,7 @@ Release Notes.
* Add thread state metric and class loaded info metric to JVMMetric.
* Performance: compile LAL DSL statically and run with type checked.
* Add pagination to event query protocol.
* Performance: optimize Envoy error logs persistence performance.
#### UI
......
......@@ -18,6 +18,10 @@
package org.apache.skywalking.oap.log.analyzer.dsl;
import com.google.protobuf.Message;
import groovy.lang.Closure;
import groovy.lang.GroovyObjectSupport;
import groovy.lang.MissingPropertyException;
import java.util.Map;
import java.util.regex.Matcher;
import lombok.Getter;
......@@ -44,6 +48,7 @@ public class Binding extends groovy.lang.Binding {
setProperty(KEY_LOG, log);
setProperty(KEY_SAVE, true);
setProperty(KEY_ABORT, false);
parsed().log = log;
return this;
}
......@@ -55,6 +60,15 @@ public class Binding extends groovy.lang.Binding {
return (LogData.Builder) getProperty(KEY_LOG);
}
public Binding extraLog(final Message extraLog) {
parsed().extraLog = extraLog;
return this;
}
public Message extraLog() {
return parsed().getExtraLog();
}
public Binding parsed(final Matcher parsed) {
parsed().matcher = parsed;
return this;
......@@ -92,19 +106,32 @@ public class Binding extends groovy.lang.Binding {
return (boolean) getProperty(KEY_ABORT);
}
public static class Parsed {
public static class Parsed extends GroovyObjectSupport {
@Getter
private Matcher matcher;
@Getter
private Map<String, Object> map;
@Getter
private Message.Builder log;
@Getter
private Message extraLog;
public Object getAt(final String key) {
if (matcher != null) {
return matcher.group(key);
Object result;
if (matcher != null && (result = matcher.group(key)) != null) {
return result;
}
if (map != null && (result = map.get(key)) != null) {
return result;
}
if (map != null) {
return map.get(key);
if (extraLog != null && (result = getField(extraLog, key)) != null) {
return result;
}
if (log != null && (result = getField(log, key)) != null) {
return result;
}
return null;
}
......@@ -113,5 +140,15 @@ public class Binding extends groovy.lang.Binding {
public Object propertyMissing(final String name) {
return getAt(name);
}
static Object getField(Object obj, String name) {
try {
Closure<?> c = new Closure<Object>(obj, obj) {
};
return c.getProperty(name);
} catch (MissingPropertyException ignored) {
}
return null;
}
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.log.analyzer.dsl.spec.filter;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.protobuf.Message;
import com.google.protobuf.TextFormat;
import groovy.lang.Closure;
import groovy.lang.DelegatesTo;
......@@ -153,6 +154,7 @@ public class FilterSpec extends AbstractSpec {
final Binding b = BINDING.get();
final LogData.Builder logData = b.log();
final Message extraLog = b.extraLog();
if (!b.shouldSave()) {
if (LOGGER.isDebugEnabled()) {
......@@ -163,7 +165,7 @@ public class FilterSpec extends AbstractSpec {
factories.stream()
.map(LogAnalysisListenerFactory::create)
.forEach(it -> it.parse(logData).build());
.forEach(it -> it.parse(logData, extraLog).build());
}
@SuppressWarnings("unused")
......
......@@ -17,6 +17,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log;
import com.google.protobuf.Message;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -25,10 +26,10 @@ import org.apache.skywalking.oap.server.library.module.Service;
*/
public interface ILogAnalyzerService extends Service {
void doAnalysis(LogData.Builder log);
void doAnalysis(LogData.Builder log, Message extraLog);
default void doAnalysis(LogData logData) {
doAnalysis(logData.toBuilder());
default void doAnalysis(LogData logData, Message extraLog) {
doAnalysis(logData.toBuilder(), extraLog);
}
}
......@@ -17,6 +17,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
......@@ -39,7 +40,7 @@ public class LogAnalyzer {
private final List<LogAnalysisListener> listeners = new ArrayList<>();
public void doAnalysis(LogData.Builder builder) {
public void doAnalysis(LogData.Builder builder, Message extraLog) {
if (StringUtil.isEmpty(builder.getService())) {
// If the service name is empty, the log will be ignored.
log.debug("The log is ignored because the Service name is empty");
......@@ -51,12 +52,12 @@ public class LogAnalyzer {
builder.setTimestamp(System.currentTimeMillis());
}
notifyListener(builder);
notifyListener(builder, extraLog);
notifyListenerToBuild();
}
private void notifyListener(LogData.Builder builder) {
listeners.forEach(listener -> listener.parse(builder));
private void notifyListener(LogData.Builder builder, final Message extraLog) {
listeners.forEach(listener -> listener.parse(builder, extraLog));
}
private void notifyListenerToBuild() {
......
......@@ -17,6 +17,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
......@@ -32,9 +33,9 @@ public class LogAnalyzerServiceImpl implements ILogAnalyzerService, ILogAnalysis
private final List<LogAnalysisListenerFactory> factories = new ArrayList<>();
@Override
public void doAnalysis(final LogData.Builder log) {
public void doAnalysis(final LogData.Builder log, Message extraLog) {
LogAnalyzer analyzer = new LogAnalyzer(moduleManager, moduleConfig, this);
analyzer.doAnalysis(log);
analyzer.doAnalysis(log, extraLog);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.log.analyzer.provider.log.analyzer;
public class LogAnalyzerFactory {
}
......@@ -17,6 +17,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import org.apache.skywalking.apm.network.logging.v3.LogData;
/**
......@@ -33,5 +34,5 @@ public interface LogAnalysisListener {
* Parse the raw data from the probe.
* @return {@code this} for chaining.
*/
LogAnalysisListener parse(LogData.Builder logData);
LogAnalysisListener parse(LogData.Builder logData, final Message extraLog);
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
......@@ -48,8 +49,10 @@ public class LogFilterListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData) {
dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())));
public LogAnalysisListener parse(final LogData.Builder logData,
final Message extraLog) {
dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())
.extraLog(extraLog)));
return this;
}
......
......@@ -17,12 +17,14 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
......@@ -41,6 +43,8 @@ import org.apache.skywalking.oap.server.core.source.Log;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* RecordAnalysisListener forwards the log data to the persistence layer with the query required conditions.
*/
......@@ -57,7 +61,9 @@ public class RecordAnalysisListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData) {
@SneakyThrows
public LogAnalysisListener parse(final LogData.Builder logData,
final Message extraLog) {
LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
// timestamp
......@@ -100,6 +106,9 @@ public class RecordAnalysisListener implements LogAnalysisListener {
} else if (body.hasJson()) {
log.setContentType(ContentType.JSON);
log.setContent(body.getJson().getJson());
} else if (extraLog != null) {
log.setContentType(ContentType.JSON);
log.setContent(toJSON(extraLog));
}
if (logData.getTags().getDataCount() > 0) {
log.setTagsRawData(logData.getTags().toByteArray());
......
......@@ -17,6 +17,7 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -61,7 +62,8 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData) {
public LogAnalysisListener parse(final LogData.Builder logData,
final Message extraLog) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
// to service traffic
String serviceName = namingControl.formatServiceName(logData.getService());
......
......@@ -17,24 +17,22 @@ rules:
- name: envoy-als
dsl: |
filter {
json {
}
// only collect abnormal logs (http status code >= 300, or commonProperties?.responseFlags is not empty)
if (parsed?.response?.responseCode as Integer < 400 && !parsed?.commonProperties?.responseFlags) {
if (parsed?.response?.responseCode?.value as Integer < 400 && !parsed?.commonProperties?.responseFlags?.toString()?.trim()) {
abort {}
}
extractor {
if (parsed?.response?.responseCode) {
tag 'status.code': parsed?.response?.responseCode as int
tag 'status.code': parsed?.response?.responseCode?.value
}
tag 'response.flag': (parsed?.commonProperties?.responseFlags as Map)?.keySet()
tag 'response.flag': parsed?.commonProperties?.responseFlags
}
sink {
sampler {
if (parsed?.commonProperties?.responseFlags) {
if (parsed?.commonProperties?.responseFlags?.toString()) {
// use service:errorCode as sampler id so that each service:errorCode has its own sampler,
// e.g. checkoutservice:[upstreamConnectionFailure], checkoutservice:[upstreamRetryLimitExceeded]
rateLimit("${log.service}:${(parsed?.commonProperties?.responseFlags as Map)?.keySet()}") {
rateLimit("${log.service}:${parsed?.commonProperties?.responseFlags?.toString()}") {
qps 100
}
} else {
......
......@@ -44,6 +44,7 @@ public abstract class AbstractLogRecord extends Record {
public static final String SPAN_ID = "span_id";
public static final String CONTENT_TYPE = "content_type";
public static final String CONTENT = "content";
public static final String CONTENT_TYPE_CLASS = "content_type_class";
public static final String TAGS_RAW_DATA = "tags_raw_data";
public static final String TIMESTAMP = "timestamp";
public static final String TAGS = "tags";
......
......@@ -77,7 +77,7 @@ public class LogHandler implements KafkaHandler {
public void handle(final ConsumerRecord<String, Bytes> record) {
try (HistogramMetrics.Timer ignore = histogram.createTimer()) {
LogData logData = parseConsumerRecord(record);
logAnalyzerService.doAnalysis(logData);
logAnalyzerService.doAnalysis(logData, null);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
......
......@@ -21,9 +21,7 @@ package org.apache.skywalking.oap.server.receiver.envoy.persistence;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
......@@ -35,8 +33,6 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapt
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* {@code LogsPersistence} analyzes the error logs and persists them to the log system.
*/
......@@ -69,7 +65,7 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
final LogData logData = convertToLogData(entry, result);
logAnalyzerService.doAnalysis(logData);
logAnalyzerService.doAnalysis(logData, entry);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
......@@ -81,27 +77,17 @@ public class LogsPersistence implements ALSHTTPAnalysis {
return prev;
}
public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
final Result result) throws Exception {
public LogData convertToLogData(final HTTPAccessLogEntry logEntry, final Result result) {
final ServiceMetaInfo service = result.getService();
final ServiceMeshMetric.Builder metrics = new LogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
final ServiceMeshMetric.Builder metrics =
new LogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
.setService(service.getServiceName())
.setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
.newBuilder()
.setJson(
JSONLog
.newBuilder()
.setJson(toJSON(logEntry))
)
)
.build();
}
}
......@@ -21,9 +21,7 @@ package org.apache.skywalking.oap.server.receiver.envoy.persistence;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
......@@ -35,8 +33,6 @@ import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPLogEntry2MetricsAdapter;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* {@code LogsPersistence} analyzes the error logs and persists them to the log system.
*/
......@@ -69,7 +65,7 @@ public class TCPLogsPersistence implements TCPAccessLogAnalyzer {
}
final LogData logData = convertToLogData(entry, result);
logAnalyzerService.doAnalysis(logData);
logAnalyzerService.doAnalysis(logData, entry);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
......@@ -81,27 +77,17 @@ public class TCPLogsPersistence implements TCPAccessLogAnalyzer {
return prev;
}
public LogData convertToLogData(final TCPAccessLogEntry logEntry,
final Result result) throws Exception {
public LogData convertToLogData(final TCPAccessLogEntry logEntry, final Result result) {
final ServiceMetaInfo service = result.getService();
final ServiceMeshMetric.Builder metrics = new TCPLogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
final ServiceMeshMetric.Builder metrics =
new TCPLogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
.setService(service.getServiceName())
.setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
.newBuilder()
.setJson(
JSONLog
.newBuilder()
.setJson(toJSON(logEntry))
)
)
.build();
}
}
......@@ -89,7 +89,7 @@ public class LogReportServiceHandler extends LogReportServiceGrpc.LogReportServi
try {
LogData.Builder builder = logData.toBuilder();
setServiceName(builder);
logAnalyzerService.doAnalysis(builder);
logAnalyzerService.doAnalysis(builder, null);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
......
......@@ -81,7 +81,7 @@ public class LogReportServiceRestHandler extends JettyHandler {
ProtoBufJsonUtils.fromJSON(it.toString(), builder);
logs.add(builder);
}
logs.forEach(logAnalyzerService::doAnalysis);
logs.forEach(it -> logAnalyzerService.doAnalysis(it, null));
} catch (final Exception e) {
log.error(e.getMessage(), e);
errorCounter.inc();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册