未验证 提交 12a18e69 编写于 作者: Y yswdqz 提交者: GitHub

Add layer property to LAL script (#9593)

上级 b3c7658a
......@@ -9,6 +9,7 @@
* Use prepareStatement in H2SQLExecutor#getByIDs.(No function change).
* Bump up snakeyaml to 1.31 for fixing CVE-2022-25857
* Fix `DurationUtils.convertToTimeBucket` missed verify date format.
* [**Breaking Change**] Change the LAL script format(Add layer property).
#### UI
......
......@@ -8,6 +8,9 @@ The LAL config files are in YAML format, and are located under directory `lal`.
set `log-analyzer/default/lalFiles` in the `application.yml` file or set environment variable `SW_LOG_LAL_FILES` to
activate specific LAL config files.
## Layer
Layer should be declared in the LAL script to represent the analysis scope of the logs.
## Filter
A filter is a group of [parser](#parser), [extractor](#extractor) and [sink](#sink). Users can use one or more filters
......
......@@ -37,9 +37,9 @@ import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.TextParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.YamlParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.SinkSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordSinkListener;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficSinkListener;
import org.apache.skywalking.oap.server.core.source.Log;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
......@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
public class FilterSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterSpec.class);
private final List<LogAnalysisListenerFactory> factories;
private final List<LogSinkListenerFactory> sinkListenerFactories;
private final TextParserSpec textParser;
......@@ -70,9 +70,9 @@ public class FilterSpec extends AbstractSpec {
parsedType = new TypeReference<Map<String, Object>>() {
};
factories = Arrays.asList(
new RecordAnalysisListener.Factory(moduleManager(), moduleConfig()),
new TrafficAnalysisListener.Factory(moduleManager(), moduleConfig())
sinkListenerFactories = Arrays.asList(
new RecordSinkListener.Factory(moduleManager(), moduleConfig()),
new TrafficSinkListener.Factory(moduleManager(), moduleConfig())
);
textParser = new TextParserSpec(moduleManager(), moduleConfig());
......@@ -168,17 +168,17 @@ public class FilterSpec extends AbstractSpec {
final Optional<AtomicReference<Log>> container = BINDING.get().logContainer();
if (container.isPresent()) {
factories.stream()
.map(LogAnalysisListenerFactory::create)
.filter(it -> it instanceof RecordAnalysisListener)
sinkListenerFactories.stream()
.map(LogSinkListenerFactory::create)
.filter(it -> it instanceof RecordSinkListener)
.map(it -> it.parse(logData, extraLog))
.map(it -> (RecordAnalysisListener) it)
.map(RecordAnalysisListener::getLog)
.map(it -> (RecordSinkListener) it)
.map(RecordSinkListener::getLog)
.findFirst()
.ifPresent(log -> container.get().set(log));
} else {
factories.stream()
.map(LogAnalysisListenerFactory::create)
sinkListenerFactories.stream()
.map(LogSinkListenerFactory::create)
.forEach(it -> it.parse(logData, extraLog).build());
}
}
......
......@@ -25,4 +25,6 @@ public class LALConfig {
private String name;
private String dsl;
private String layer;
}
......@@ -51,7 +51,7 @@ public class LogAnalyzerModuleConfig extends ModuleConfig {
private List<Rule> meterConfigs;
public List<String> lalFiles() {
return Splitter.on(",").omitEmptyStrings().splitToList(Strings.nullToEmpty(getLalFiles()));
return Splitter.on(",").omitEmptyStrings().trimResults().splitToList(Strings.nullToEmpty(getLalFiles()));
}
public List<Rule> malConfigs() throws ModuleStartException {
......
......@@ -19,10 +19,15 @@ package org.apache.skywalking.oap.log.analyzer.provider.log;
import java.util.List;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
public interface ILogAnalysisListenerManager {
void addListenerFactory(LogAnalysisListenerFactory factory);
List<LogAnalysisListenerFactory> getLogAnalysisListenerFactories();
void addSinkListenerFactory(LogSinkListenerFactory factory);
List<LogSinkListenerFactory> getSinkListenerFactory();
}
......@@ -20,9 +20,13 @@ package org.apache.skywalking.oap.log.analyzer.provider.log;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListener;
......@@ -46,26 +50,41 @@ public class LogAnalyzer {
log.debug("The log is ignored because the Service name is empty");
return;
}
createListeners();
Layer layer;
if ("".equals(builder.getLayer())) {
layer = Layer.GENERAL;
} else {
try {
layer = Layer.nameOf(builder.getLayer());
} catch (UnexpectedException e) {
log.warn("The Layer {} is not found, abandon the log.", builder.getLayer());
return;
}
}
createAnalysisListeners(layer);
if (builder.getTimestamp() == 0) {
// If no timestamp, OAP server would use the received timestamp as log's timestamp
builder.setTimestamp(System.currentTimeMillis());
}
notifyListener(builder, extraLog);
notifyListenerToBuild();
notifyAnalysisListener(builder, extraLog);
notifyAnalysisListenerToBuild();
}
private void notifyListener(LogData.Builder builder, final Message extraLog) {
private void notifyAnalysisListener(LogData.Builder builder, final Message extraLog) {
listeners.forEach(listener -> listener.parse(builder, extraLog));
}
private void notifyListenerToBuild() {
private void notifyAnalysisListenerToBuild() {
listeners.forEach(LogAnalysisListener::build);
}
private void createListeners() {
private void createAnalysisListeners(Layer layer) {
factoryManager.getLogAnalysisListenerFactories()
.forEach(factory -> listeners.add(factory.create()));
.stream()
.map(factory -> factory.create(layer))
.filter(Objects::nonNull)
.forEach(listeners::add);
}
}
......@@ -24,13 +24,15 @@ import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogSinkListenerFactory;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@RequiredArgsConstructor
public class LogAnalyzerServiceImpl implements ILogAnalyzerService, ILogAnalysisListenerManager {
private final ModuleManager moduleManager;
private final LogAnalyzerModuleConfig moduleConfig;
private final List<LogAnalysisListenerFactory> factories = new ArrayList<>();
private final List<LogAnalysisListenerFactory> analysisListenerFactories = new ArrayList<>();
private final List<LogSinkListenerFactory> sinkListenerFactories = new ArrayList<>();
@Override
public void doAnalysis(final LogData.Builder log, Message extraLog) {
......@@ -40,11 +42,21 @@ public class LogAnalyzerServiceImpl implements ILogAnalyzerService, ILogAnalysis
@Override
public void addListenerFactory(final LogAnalysisListenerFactory factory) {
factories.add(factory);
analysisListenerFactories.add(factory);
}
@Override
public List<LogAnalysisListenerFactory> getLogAnalysisListenerFactories() {
return factories;
return analysisListenerFactories;
}
@Override
public void addSinkListenerFactory(LogSinkListenerFactory factory) {
sinkListenerFactories.add(factory);
}
@Override
public List<LogSinkListenerFactory> getSinkListenerFactory() {
return sinkListenerFactories;
}
}
......@@ -25,8 +25,7 @@ import org.apache.skywalking.apm.network.logging.v3.LogData;
*/
public interface LogAnalysisListener {
/**
* The last step of the analysis process. Typically, the implementations forward the analysis results to the source
* receiver.
* The last step of the analysis process. Typically, the implementations execute corresponding DSL.
*/
void build();
......
......@@ -17,11 +17,13 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import org.apache.skywalking.oap.server.core.analysis.Layer;
/**
* LogAnalysisListenerFactory implementation creates the listener instance when required.
* Every LogAnalysisListener could have its own creation factory.
*/
public interface LogAnalysisListenerFactory {
LogAnalysisListener create();
LogAnalysisListener create(Layer layer);
}
......@@ -19,61 +19,74 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
import org.apache.skywalking.oap.log.analyzer.dsl.DSL;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfig;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfigs;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@Slf4j
@RequiredArgsConstructor
public class LogFilterListener implements LogAnalysisListener {
private final List<DSL> dsls;
@lombok.NonNull
private final DSL dsl;
@Override
public void build() {
dsls.forEach(dsl -> {
try {
dsl.evaluate();
} catch (final Exception e) {
log.warn("Failed to evaluate dsl: {}", dsl, e);
}
});
try {
dsl.evaluate();
} catch (final Exception e) {
log.warn("Failed to evaluate dsl: {}", dsl, e);
}
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData,
final Message extraLog) {
dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())
.extraLog(extraLog)));
dsl.bind(new Binding().log(logData.build()).extraLog(extraLog));
return this;
}
public static class Factory implements LogAnalysisListenerFactory {
private final List<DSL> dsls;
private final Map<Layer, DSL> dsls;
public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception {
dsls = new ArrayList<>();
dsls = new HashMap<>();
final List<LALConfig> configList = LALConfigs.load(config.getLalPath(), config.lalFiles())
.stream()
.flatMap(it -> it.getRules().stream())
.collect(Collectors.toList());
for (final LALConfig c : configList) {
dsls.add(DSL.of(moduleManager, config, c.getDsl()));
Layer layer = Layer.nameOf(c.getLayer());
if (dsls.put(layer, DSL.of(moduleManager, config, c.getDsl())) != null) {
throw new ModuleStartException("Layer " + layer.name() + " has already set a rule.");
}
}
}
@Override
public LogAnalysisListener create() {
return new LogFilterListener(dsls);
public LogAnalysisListener create(Layer layer) {
if (layer == null) {
return null;
}
final DSL dsl = dsls.get(layer);
if (dsl == null) {
return null;
}
return new LogFilterListener(dsl);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import org.apache.skywalking.apm.network.logging.v3.LogData;
public interface LogSinkListener {
/**
* The last step of the sink process. Typically, the implementations forward the results to the source
* receiver.
*/
void build();
/**
* Parse the raw data from the probe.
* @return {@code this} for chaining.
*/
LogSinkListener parse(LogData.Builder logData, final Message extraLog);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
/**
* LogSinkListenerFactory implementation creates the listener instance when required.
* Every LogSinkListener could have its own creation factory.
*/
public interface LogSinkListenerFactory {
LogSinkListener create();
}
......@@ -29,6 +29,7 @@ import lombok.SneakyThrows;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
......@@ -48,10 +49,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* RecordAnalysisListener forwards the log data to the persistence layer with the query required conditions.
* RecordSinkListener forwards the log data to the persistence layer with the query required conditions.
*/
@RequiredArgsConstructor
public class RecordAnalysisListener implements LogAnalysisListener {
public class RecordSinkListener implements LogSinkListener {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
......@@ -66,7 +67,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
@Override
@SneakyThrows
public LogAnalysisListener parse(final LogData.Builder logData,
public LogSinkListener parse(final LogData.Builder logData,
final Message extraLog) {
LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
......@@ -142,7 +143,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
});
}
public static class Factory implements LogAnalysisListenerFactory {
public static class Factory implements LogSinkListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
......@@ -161,8 +162,8 @@ public class RecordAnalysisListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener create() {
return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys);
public RecordSinkListener create() {
return new RecordSinkListener(sourceReceiver, namingControl, searchableTagKeys);
}
}
}
......@@ -40,7 +40,7 @@ import static java.util.Objects.nonNull;
* Generate service, service instance and endpoint traffic by log data.
*/
@RequiredArgsConstructor
public class TrafficAnalysisListener implements LogAnalysisListener {
public class TrafficSinkListener implements LogSinkListener {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
......@@ -62,7 +62,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData,
public LogSinkListener parse(final LogData.Builder logData,
final Message extraLog) {
Layer layer;
if (StringUtil.isNotEmpty(logData.getLayer())) {
......@@ -97,7 +97,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
return this;
}
public static class Factory implements LogAnalysisListenerFactory {
public static class Factory implements LogSinkListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
......@@ -111,8 +111,8 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
}
@Override
public LogAnalysisListener create() {
return new TrafficAnalysisListener(sourceReceiver, namingControl);
public LogSinkListener create() {
return new TrafficSinkListener(sourceReceiver, namingControl);
}
}
}
......@@ -131,7 +131,7 @@ public class DSLSecurityTest {
public void testSecurity() throws ModuleStartException {
final DSL dsl = DSL.of(manager, new LogAnalyzerModuleConfig(), script);
Whitebox.setInternalState(
Whitebox.getInternalState(dsl, "filterSpec"), "factories", Collections.emptyList()
Whitebox.getInternalState(dsl, "filterSpec"), "sinkListenerFactories", Collections.emptyList()
);
dsl.bind(new Binding().log(LogData.newBuilder()));
......
......@@ -191,7 +191,7 @@ public class DSLTest {
public void testDslStaticCompile() throws ModuleStartException {
final DSL dsl = DSL.of(manager, new LogAnalyzerModuleConfig(), script);
Whitebox.setInternalState(
Whitebox.getInternalState(dsl, "filterSpec"), "factories", Collections.emptyList()
Whitebox.getInternalState(dsl, "filterSpec"), "sinkListenerFactories", Collections.emptyList()
);
dsl.bind(new Binding().log(LogData.newBuilder().build()));
......
......@@ -16,6 +16,7 @@
# The default LAL script to save all logs, behaving like the versions before 8.5.0.
rules:
- name: default
layer: GENERAL
dsl: |
filter {
sink {
......
......@@ -15,6 +15,7 @@
rules:
- name: envoy-als
layer: MESH
dsl: |
filter {
// only collect abnormal logs (http status code >= 300, or commonProperties?.responseFlags is not empty)
......
......@@ -15,6 +15,7 @@
rules:
- name: example
layer: GENERAL
dsl: |
filter {
text {
......
......@@ -15,6 +15,7 @@
rules:
- name: example
layer: GENERAL
dsl: |
filter {
text {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册