未验证 提交 96f1c121 编写于 作者: Z Zhenxu Ke 提交者: GitHub

Introduce log analysis language (LAL) (#6388)

上级 1d4ff667
...@@ -27,6 +27,7 @@ Release Notes. ...@@ -27,6 +27,7 @@ Release Notes.
* Storage plugin supports postgresql. * Storage plugin supports postgresql.
* Fix kubernetes.client.opeanapi.ApiException. * Fix kubernetes.client.opeanapi.ApiException.
* Remove filename suffix in the meter active file config. * Remove filename suffix in the meter active file config.
* Introduce log analysis language (LAL).
#### UI #### UI
* Update selector scroller to show in all pages. * Update selector scroller to show in all pages.
......
...@@ -40,7 +40,13 @@ ...@@ -40,7 +40,13 @@
<includes> <includes>
<include>log4j2.xml</include> <include>log4j2.xml</include>
<include>alarm-settings.yml</include> <include>alarm-settings.yml</include>
<include>alarm-settings-sample.yml</include> </includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../dist-material</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>config-examples/*</include>
</includes> </includes>
</fileSet> </fileSet>
<fileSet> <fileSet>
...@@ -59,6 +65,8 @@ ...@@ -59,6 +65,8 @@
<include>zabbix-rules/*.yaml</include> <include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include> <include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include> <include>ui-initialized-templates/*</include>
<include>lal/*</include>
<include>log-mal-rules/*</include>
</includes> </includes>
<outputDirectory>/config</outputDirectory> <outputDirectory>/config</outputDirectory>
</fileSet> </fileSet>
......
...@@ -40,7 +40,13 @@ ...@@ -40,7 +40,13 @@
<includes> <includes>
<include>log4j2.xml</include> <include>log4j2.xml</include>
<include>alarm-settings.yml</include> <include>alarm-settings.yml</include>
<include>alarm-settings-sample.yml</include> </includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../dist-material</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>config-examples/*</include>
</includes> </includes>
</fileSet> </fileSet>
<fileSet> <fileSet>
...@@ -59,6 +65,8 @@ ...@@ -59,6 +65,8 @@
<include>zabbix-rules/*.yaml</include> <include>zabbix-rules/*.yaml</include>
<include>otel-oc-rules/*</include> <include>otel-oc-rules/*</include>
<include>ui-initialized-templates/*</include> <include>ui-initialized-templates/*</include>
<include>lal/*</include>
<include>log-mal-rules/*</include>
</includes> </includes>
<outputDirectory>/config</outputDirectory> <outputDirectory>/config</outputDirectory>
</fileSet> </fileSet>
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Example config file of path: config/lal/config.yaml
rules:
- name: example
dsl: |
filter {
if (log.service == "TestService") {
abort {}
}
text {
if (!regexp($/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$)) {
abort {}
}
}
extractor {
metrics {
timestamp log.timestamp
labels level: parsed.level, service: log.service, instance: log.serviceInstance
name "log_count"
value 1
}
}
sink {
sampler {
if (log.service == "ImportantApp") {
rateLimit("ImportantAppSampler") {
qps 300
}
} else {
rateLimit("OtherSampler") {
qps 30
}
}
}
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This will parse a textual representation of a duration. The formats
# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
# with days considered to be exactly 24 hours.
# <p>
# Examples:
# <pre>
# "PT20.345S" -- parses as "20.345 seconds"
# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds)
# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds)
# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes"
# "P-6H3M" -- parses as "-6 hours and +3 minutes"
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
# Example config file of path: config/log-mal-rules/config.yaml
expSuffix: instance(['service'], ['instance'])
metricPrefix: log
metricsRules:
- name: count_info
exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
# Log Analysis Language
Log Analysis Language (LAL) in SkyWalking is essentially a Domain-Specific Language (DSL) to analyze logs. You can use
LAL to parse, extract, and save the logs, as well as collaborate the logs with traces (by extracting the trace id,
segment id and span id) and metrics (by generating metrics from the logs and send them to the meter system).
The LAL config files are in YAML format, and are located under directory `lal`, you can
set `log-analyzer/default/lalFiles` in the `application.yml` file or set environment variable `SW_LOG_LAL_FILES` to
activate specific LAL config files.
## Filter
A filter is a group of [parser](#parser), [extractor](#extractor) and [sink](#sink). Users can use one or more filters
to organize their processing logics. Every piece of log will be sent to all filters in an LAL rule. The piece of log
sent into the filter is available as property `log` in the LAL, therefore you can access the log service name
via `log.service`, for all available fields of `log`, please refer to [the protocol definition](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto#L41).
All components are executed sequentially in the orders they are declared.
### Global Functions
There are functions globally available that you can use them in all components (i.e. parsers, extractors, and sinks)
when needed.
- `abort`
By default, all components declared are executed no matter what flags (`dropped`, `saved`, etc.) have been set. There
are cases where you may want the filter chain to stop earlier when specified conditions are met. `abort` function aborts
the remaining filter chain from where it's declared, all the remaining components won't be executed at all.
`abort` function serves as a fast-fail mechanism in LAL.
```groovy
filter {
if (log.service == "TestingService") { // Don't waste resources on TestingServices
abort {} // all remaining components won't be executed at all
}
text {
if (!regexp("(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)")) {
// if the logs don't match this regexp, skip it
abort {}
}
}
// ... extractors, sinks
}
```
Note that when you put `regexp` in an `if` statement, you need to surround the expression with `()`
like `regexp(<the expression>)`, instead of `regexp <the expression>`.
### Parser
Parsers are responsible for parsing the raw logs into structured data in SkyWalking for further processing. There are 3
types of parsers at the moment, namely `json`, `yaml`, and `text`.
When a piece of log is parsed, there is a corresponding property available, called `parsed`, injected by LAL.
Property `parsed` is typically a map, containing all the fields parsed from the raw logs, for example, if the parser
is `json` / `yaml`, `parsed` is a map containing all the key-values in the `json` / `yaml`, if the parser is `text`
, `parsed` is a map containing all the captured groups and their values (for `regexp` and `grok`). See examples below.
#### `json`
<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
#### `yaml`
<!-- TODO: is structured in the reported (gRPC) `LogData`, not much to do -->
#### `text`
For unstructured logs, there are some `text` parsers for use.
- `regexp`
`regexp` parser uses a regular expression (`regexp`) to parse the logs. It leverages the captured groups of the regexp,
all the captured groups can be used later in the extractors or sinks.
`regexp` returns a `boolean` indicating whether the log matches the pattern or not.
```groovy
filter {
text {
regexp "(?<timestamp>\\d{8}) (?<thread>\\w+) (?<level>\\w+) (?<traceId>\\w+) (?<msg>.+)"
// this is just a demo pattern
}
extractor {
tag level: parsed.level
// we add a tag called `level` and its value is parsed.level, captured from the regexp above
traceId parsed.traceId
// we also extract the trace id from the parsed result, which will be used to associate the log with the trace
}
// ...
}
```
- `grok`
<!-- TODO: grok Java library has poor performance, need to benchmark it, the idea is basically the same with `regexp` above -->
### Extractor
Extractors aim to extract metadata from the logs. The metadata can be a service name, a service instance name, an
endpoint name, or even a trace ID, all of which can be associated with the existing traces and metrics.
- `service`
`service` extracts the service name from the `parsed` result, and set it into the `LogData`, which will be persisted (if
not dropped) and is used to associate with traces / metrics.
- `instance`
`instance` extracts the service instance name from the `parsed` result, and set it into the `LogData`, which will be
persisted (if not dropped) and is used to associate with traces / metrics.
- `endpoint`
`endpoint` extracts the service instance name from the `parsed` result, and set it into the `LogData`, which will be
persisted (if not dropped) and is used to associate with traces / metrics.
- `traceId`
`traceId` extracts the trace ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
dropped) and is used to associate with traces / metrics.
- `segmentId`
`segmentId` extracts the segment ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if
not dropped) and is used to associate with traces / metrics.
- `spanId`
`spanId` extracts the span ID from the `parsed` result, and set it into the `LogData`, which will be persisted (if not
dropped) and is used to associate with traces / metrics.
- `timestamp`
`timestamp` extracts the timestamp from the `parsed` result, and set it into the `LogData`, which will be persisted (if
not dropped) and is used to associate with traces / metrics.
The unit of `timestamp` is millisecond.
- `tag`
`tag` extracts the tags from the `parsed` result, and set them into the `LogData`. The form of this extractor is
something like `tag key1: value, key2: value2`, you can use the properties of `parsed` as both keys and values.
```groovy
filter {
// ... parser
extractor {
tag level: parsed.level, (parsed.statusCode): parsed.statusMsg
tag anotherKey: "anotherConstantValue"
}
}
```
- `metrics`
`metrics` extracts / generates metrics from the logs, and sends the generated metrics to the meter system, you can
configure [MAL](mal.md) for further analysis of these metrics. The dedicated MAL config files are under
directory `log-mal-rules`, you can set `log-analyzer/default/malFiles` to enable configured files.
```yaml
# application.yml
# ...
log-analyzer:
selector: ${SW_LOG_ANALYZER:default}
default:
lalFiles: ${SW_LOG_LAL_FILES:my-lal-config} # files are under "lal" directory
malFiles: ${SW_LOG_MAL_FILES:my-lal-mal-config,another-lal-mal-config} # files are under "log-mal-rules" directory
```
Examples are as follows:
```groovy
filter {
// ...
extractor {
service parsed.serviceName
metrics {
name "log_count"
timestamp parsed.timestamp
labels level: parsed.level, service: parsed.service, instance: parsed.instance
value 1
}
metrics {
name "http_response_time"
timestamp parsed.timestamp
labels status_code: parsed.statusCode, service: parsed.service, instance: parsed.instance
value parsed.duration
}
}
// ...
}
```
The extractor above generates a metrics named `log_count`, with tag key `level` and value `1`, after this, you can
configure MAL rules to calculate the log count grouping by logging level like this:
```yaml
# ... other configurations of MAL
metrics:
- name: log_count_debug
exp: log_count.tagEqual('level', 'DEBUG').sum(['service', 'instance']).increase('PT1M')
- name: log_count_error
exp: log_count.tagEqual('level', 'ERROR').sum(['service', 'instance']).increase('PT1M')
```
The other metrics generated is `http_response_time`, so that you can configure MAL rules to generate more useful metrics
like percentiles.
```yaml
# ... other configurations of MAL
metrics:
- name: response_time_percentile
exp: http_response_time.sum(['le', 'service', 'instance']).increase('PT5M').histogram().histogram_percentile([50,70,90,99])
```
### Sink
Sinks are the persistent layer of the LAL. By default, all the logs of each filter are persisted into the storage.
However, there are some mechanisms that allow you to selectively save some logs, or even drop all the logs after you've
extracted useful information, such as metrics.
#### Sampler
Sampler allows you to save the logs in a sampling manner. Currently, sampling strategy `rateLimit` is supported, welcome
to contribute more sampling strategies. If multiple samplers are specified, the last one determines the final sampling
result, see examples in [Enforcer](#enforcer).
`rateLimit` samples `n` logs at most in 1 second. `rateLimit("SamplerID")` requires an ID for the sampler, sampler
declarations with the same ID share the same sampler instance, and thus share the same `qps`, resetting logics.
Examples:
```groovy
filter {
// ... parser
sink {
sampler {
if (parsed.service == "ImportantApp") {
rateLimit("ImportantAppSampler") {
qps 30 // samples 30 pieces of logs every second for service "ImportantApp"
}
} else {
rateLimit("OtherSampler") {
qps 3 // samples 3 pieces of logs every second for other services than "ImportantApp"
}
}
}
}
}
```
#### Dropper
Dropper is a special sink, meaning that all the logs are dropped without any exception. This is useful when you want to
drop debugging logs,
```groovy
filter {
// ... parser
sink {
if (parsed.level == "DEBUG") {
dropper {}
} else {
sampler {
// ... configs
}
}
}
}
```
or you have multiple filters, some of which are for extracting metrics, only one of them needs to be persisted.
```groovy
filter { // filter A: this is for persistence
// ... parser
sink {
sampler {
// .. sampler configs
}
}
}
filter { // filter B:
// ... extractors to generate many metrics
extractors {
metrics {
// ... metrics
}
}
sink {
dropper {} // drop all logs because they have been saved in "filter A" above.
}
}
```
#### Enforcer
Enforcer is another special sink that forcibly samples the log, a typical use case of enforcer is when you have
configured a sampler and want to save some logs forcibly, for example, to save error logs even if the sampling mechanism
is configured.
```groovy
filter {
// ... parser
sink {
sampler {
// ... sampler configs
}
if (parserd.level == "ERROR" || parsed.userId == "TestingUserId") { // sample error logs or testing users' logs (userId == "TestingUserId") even if the sampling strategy is configured
enforcer {
}
}
}
}
```
You can use `enforcer` and `dropper` to simulate a probabilistic sampler like this.
```groovy
filter {
// ... parser
sink {
sampler { // simulate a probabilistic sampler with sampler rate 30% (not accurate though)
if (Math.abs(Math.random()) > 0.3) {
enforcer {}
} else {
dropper {}
}
}
}
}
```
...@@ -33,6 +33,15 @@ ...@@ -33,6 +33,15 @@
<artifactId>server-core</artifactId> <artifactId>server-core</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>meter-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl;
import java.util.Map;
import java.util.regex.Matcher;
import lombok.Getter;
import org.apache.skywalking.apm.network.logging.v3.LogData;
/**
* The binding bridge between OAP and the DSL, which provides some convenient methods to ease the use of the raw {@link groovy.lang.Binding#setProperty(java.lang.String, java.lang.Object)} and {@link
* groovy.lang.Binding#getProperty(java.lang.String)}.
*/
public class Binding extends groovy.lang.Binding {
public static final String KEY_LOG = "log";
public static final String KEY_PARSED = "parsed";
public static final String KEY_SAVE = "save";
public static final String KEY_ABORT = "abort";
public Binding() {
setProperty(KEY_PARSED, new Parsed());
}
public Binding log(final LogData.Builder log) {
setProperty(KEY_LOG, log);
setProperty(KEY_SAVE, true);
setProperty(KEY_ABORT, false);
return this;
}
public Binding log(final LogData log) {
return log(log.toBuilder());
}
public LogData.Builder log() {
return (LogData.Builder) getProperty(KEY_LOG);
}
public Binding parsed(final Matcher parsed) {
parsed().matcher = parsed;
return this;
}
public Binding parsed(final Map<String, Object> parsed) {
parsed().map = parsed;
return this;
}
public Parsed parsed() {
return (Parsed) getProperty(KEY_PARSED);
}
public Binding save() {
setProperty(KEY_SAVE, true);
return this;
}
public Binding drop() {
setProperty(KEY_SAVE, false);
return this;
}
public boolean shouldSave() {
return (boolean) getProperty(KEY_SAVE);
}
public Binding abort() {
setProperty(KEY_ABORT, true);
return this;
}
public boolean shouldAbort() {
return (boolean) getProperty(KEY_ABORT);
}
public static class Parsed {
@Getter
private Matcher matcher;
@Getter
private Map<String, Object> map;
public Object getAt(final String key) {
if (matcher != null) {
return matcher.group(key);
}
if (map != null) {
return map.get(key);
}
return null;
}
@SuppressWarnings("unused")
public Object propertyMissing(final String name) {
return getAt(name);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl;
import groovy.lang.GroovyShell;
import groovy.util.DelegatingScript;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.filter.FilterSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.codehaus.groovy.control.CompilerConfiguration;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class DSL {
private final DelegatingScript script;
private final FilterSpec filterSpec;
public static DSL of(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig config,
final String dsl) throws ModuleStartException {
final CompilerConfiguration cc = new CompilerConfiguration();
cc.setScriptBaseClass(DelegatingScript.class.getName());
final GroovyShell sh = new GroovyShell(cc);
final DelegatingScript script = (DelegatingScript) sh.parse(dsl);
final FilterSpec filterSpec = new FilterSpec(moduleManager, config);
script.setDelegate(filterSpec);
return new DSL(script, filterSpec);
}
public void bind(final Binding binding) {
this.filterSpec.bind(binding);
}
public void evaluate() {
script.run();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec;
import groovy.lang.Closure;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@Getter
@RequiredArgsConstructor
@Accessors(fluent = true)
public abstract class AbstractSpec {
private final ModuleManager moduleManager;
private final LogAnalyzerModuleConfig moduleConfig;
protected static final ThreadLocal<Binding> BINDING = ThreadLocal.withInitial(Binding::new);
public void bind(final Binding b) {
BINDING.set(b);
}
@SuppressWarnings("unused")
public void abort(final Closure<Void> cl) {
BINDING.get().abort();
}
@SuppressWarnings("unused")
public Object propertyMissing(final String name) {
return BINDING.get().getVariable(name);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;
import com.google.common.collect.ImmutableMap;
import groovy.lang.Closure;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.experimental.Delegate;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import static java.util.Objects.nonNull;
import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
public class ExtractorSpec extends AbstractSpec {
private final List<MetricConvert> metricConverts;
public ExtractorSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
super(moduleManager, moduleConfig);
final MeterSystem meterSystem = moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
metricConverts = moduleConfig.malConfigs()
.stream()
.map(it -> new MetricConvert(it, meterSystem))
.collect(Collectors.toList());
}
@SuppressWarnings("unused")
public void service(final String service) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(service)) {
BINDING.get().log().setService(service);
}
}
@SuppressWarnings("unused")
public void instance(final String instance) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(instance)) {
BINDING.get().log().setServiceInstance(instance);
}
}
@SuppressWarnings("unused")
public void endpoint(final String endpoint) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(endpoint)) {
BINDING.get().log().setEndpoint(endpoint);
}
}
@SuppressWarnings("unused")
public void tag(final Map<String, Object> kv) {
if (BINDING.get().shouldAbort()) {
return;
}
if (CollectionUtils.isEmpty(kv)) {
return;
}
final LogData.Builder logData = BINDING.get().log();
logData.setTags(
logData.getTags()
.toBuilder()
.addAllData(
kv.entrySet()
.stream()
.filter(it -> isNotBlank(it.getKey()))
.filter(it -> nonNull(it.getValue()) && isNotBlank(Objects.toString(it.getValue())))
.map(it -> KeyStringValuePair.newBuilder().setKey(it.getKey()).setValue(Objects.toString(it.getValue())).build())
.collect(Collectors.toList())
)
);
BINDING.get().log(logData);
}
@SuppressWarnings("unused")
public void traceId(final String traceId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(traceId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setTraceId(traceId);
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void segmentId(final String segmentId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(segmentId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setTraceSegmentId(segmentId);
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void spanId(final String spanId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(spanId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setSpanId(Integer.parseInt(spanId));
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void timestamp(final String timestamp) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(timestamp) && StringUtils.isNumeric(timestamp)) {
BINDING.get().log().setTimestamp(Long.parseLong(timestamp));
}
}
@SuppressWarnings("unused")
public void metrics(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
final SampleBuilder builder = new SampleBuilder();
cl.setDelegate(builder);
cl.call();
final Sample sample = builder.build();
metricConverts.forEach(it -> it.toMeter(
ImmutableMap.<String, SampleFamily>builder()
.put(sample.getName(), SampleFamilyBuilder.newBuilder(sample).build())
.build()
));
}
public static class SampleBuilder {
@Delegate
private final Sample.SampleBuilder sampleBuilder = Sample.builder();
@SuppressWarnings("unused")
public Sample.SampleBuilder labels(final Map<String, String> labels) {
final Map<String, String> filtered = labels.entrySet()
.stream()
.filter(it -> isNotBlank(it.getKey()) && isNotBlank(it.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return sampleBuilder.labels(ImmutableMap.copyOf(filtered));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.filter;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.TextFormat;
import groovy.lang.Closure;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.ExtractorSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.JsonParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.TextParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.parser.YamlParserSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.SinkSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogAnalysisListenerFactory;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FilterSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterSpec.class);
private final List<LogAnalysisListenerFactory> factories;
private final TextParserSpec textParser;
private final JsonParserSpec jsonParser;
private final YamlParserSpec yamlParser;
private final ExtractorSpec extractor;
private final SinkSpec sink;
private final Type parsedType;
public FilterSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
super(moduleManager, moduleConfig);
parsedType = new TypeToken<Map<String, Object>>() {
}.getType();
factories = Arrays.asList(
new RecordAnalysisListener.Factory(moduleManager(), moduleConfig()),
new TrafficAnalysisListener.Factory(moduleManager(), moduleConfig())
);
textParser = new TextParserSpec(moduleManager(), moduleConfig());
jsonParser = new JsonParserSpec(moduleManager(), moduleConfig());
yamlParser = new YamlParserSpec(moduleManager(), moduleConfig());
extractor = new ExtractorSpec(moduleManager(), moduleConfig());
sink = new SinkSpec(moduleManager(), moduleConfig());
}
@SuppressWarnings("unused")
public void text(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(textParser);
cl.call();
}
@SuppressWarnings("unused")
public void json(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(jsonParser);
cl.call();
final LogData.Builder logData = BINDING.get().log();
final Map<String, Object> parsed = jsonParser.create().fromJson(
logData.getBody().getJson().getJson(), parsedType
);
BINDING.get().parsed(parsed);
}
@SuppressWarnings({"unused", "unchecked"})
public void yaml(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(yamlParser);
cl.call();
final LogData.Builder logData = BINDING.get().log();
final Map<String, Object> parsed = (Map<String, Object>) yamlParser.create().load(
logData.getBody().getYaml().getYaml()
);
BINDING.get().parsed(parsed);
}
@SuppressWarnings("unused")
public void extractor(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(extractor);
cl.call();
}
@SuppressWarnings("unused")
public void sink(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(sink);
cl.call();
final Binding b = BINDING.get();
final LogData.Builder logData = b.log();
if (!b.shouldSave()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Log is dropped: {}", TextFormat.shortDebugString(logData));
}
return;
}
factories.stream()
.map(LogAnalysisListenerFactory::create)
.forEach(it -> it.parse(logData).build());
}
@SuppressWarnings("unused")
public void filter(final Closure<Void> cl) {
cl.call();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class JsonParserSpec extends AbstractSpec {
private final GsonBuilder gsonBuilder;
public JsonParserSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) {
super(moduleManager, moduleConfig);
gsonBuilder = new GsonBuilder();
}
public Gson create() {
return gsonBuilder.create();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class TextParserSpec extends AbstractSpec {
public TextParserSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) {
super(moduleManager, moduleConfig);
}
@SuppressWarnings("unused")
public boolean regexp(final String regexp) {
return regexp(Pattern.compile(regexp));
}
public boolean regexp(final Pattern pattern) {
if (BINDING.get().shouldAbort()) {
return false;
}
final LogData.Builder log = BINDING.get().log();
final Matcher matcher = pattern.matcher(log.getBody().getText().getText());
final boolean matched = matcher.find();
if (matched) {
BINDING.get().parsed(matcher);
}
return matched;
}
public boolean grok(final String grok) {
// TODO
return false;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.parser;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
import org.yaml.snakeyaml.representer.Representer;
public class YamlParserSpec extends AbstractSpec {
private final LoaderOptions loaderOptions;
public YamlParserSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) {
super(moduleManager, moduleConfig);
loaderOptions = new LoaderOptions();
}
public Yaml create() {
return new Yaml(new SafeConstructor(), new Representer(), new DumperOptions(), loaderOptions);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
import groovy.lang.Closure;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.RateLimitingSampler;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler.Sampler;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SamplerSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(SamplerSpec.class);
private final Map<String, Sampler> samplers;
private final RateLimitingSampler.ResetHandler rlsResetHandler;
public SamplerSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) {
super(moduleManager, moduleConfig);
samplers = new ConcurrentHashMap<>();
rlsResetHandler = new RateLimitingSampler.ResetHandler();
}
@SuppressWarnings("unused")
public void rateLimit(final String id, final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
final RateLimitingSampler newSampler = new RateLimitingSampler(rlsResetHandler);
cl.setDelegate(newSampler);
cl.call();
final Sampler sampler = samplers.computeIfAbsent(id, $ -> Sampler.NOOP);
if (Objects.equals(sampler, newSampler)) { // Unchanged
sampleWith(sampler);
return;
}
try {
sampler.close();
} catch (final Exception e) {
LOGGER.error("Failed to cancel old sampler: {}", sampler, e);
}
samplers.put(id, newSampler.start());
sampleWith(newSampler);
}
private void sampleWith(final Sampler sampler) {
if (BINDING.get().shouldAbort()) {
return;
}
if (sampler.sample()) {
BINDING.get().save();
} else {
BINDING.get().drop();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink;
import groovy.lang.Closure;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
public class SinkSpec extends AbstractSpec {
private final SamplerSpec sampler;
public SinkSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) {
super(moduleManager, moduleConfig);
sampler = new SamplerSpec(moduleManager(), moduleConfig());
}
@SuppressWarnings("unused")
public void sampler(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
cl.setDelegate(sampler);
cl.call();
}
@SuppressWarnings("unused")
public void enforcer(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
BINDING.get().save();
}
@SuppressWarnings("unused")
public void dropper(final Closure<Void> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
BINDING.get().drop();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
@Accessors(fluent = true)
@EqualsAndHashCode(of = {"qps"})
public class RateLimitingSampler implements Sampler {
@Getter
@Setter
private volatile int qps;
private final AtomicInteger factor = new AtomicInteger();
private final ResetHandler resetHandler;
public RateLimitingSampler(final ResetHandler resetHandler) {
this.resetHandler = resetHandler;
}
@Override
public RateLimitingSampler start() {
resetHandler.start(this);
return this;
}
@Override
public void close() {
resetHandler.close(this);
}
@Override
public boolean sample() {
return factor.getAndIncrement() < qps;
}
@Override
public RateLimitingSampler reset() {
factor.set(0);
return this;
}
@Slf4j
public static class ResetHandler {
private final Set<Sampler> samplers = new HashSet<>();
private volatile ScheduledFuture<?> future;
private volatile boolean started = false;
private synchronized void start(final Sampler sampler) {
samplers.add(sampler);
if (!started) {
future = Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this::reset, 1, 1, TimeUnit.SECONDS);
started = true;
}
}
private synchronized void close(final Sampler sampler) {
samplers.remove(sampler);
if (samplers.isEmpty() && future != null) {
future.cancel(true);
started = false;
}
}
private synchronized void reset() {
samplers.forEach(sampler -> {
try {
sampler.reset();
} catch (final Exception e) {
log.error("Failed to reset sampler {}.", sampler, e);
}
});
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.sink.sampler;
public interface Sampler extends AutoCloseable {
Sampler NOOP = new Sampler() {
@Override
public boolean sample() {
return false;
}
@Override
public void close() {
}
};
boolean sample();
default Sampler start() {
return this;
}
default Sampler reset() {
return this;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.provider;
import lombok.Data;
@Data
public class LALConfig {
private String name;
private String dsl;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.provider;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.yaml.snakeyaml.Yaml;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.Files.getNameWithoutExtension;
import static org.apache.skywalking.apm.util.StringUtil.isNotBlank;
import static org.apache.skywalking.oap.server.library.util.CollectionUtils.isEmpty;
@Data
@Slf4j
public class LALConfigs {
private List<LALConfig> rules;
public static List<LALConfigs> load(final String path, final List<String> files) throws Exception {
if (isEmpty(files)) {
return Collections.emptyList();
}
checkArgument(isNotBlank(path), "path cannot be blank");
try {
final File[] rules = ResourceUtils.getPathFiles(path);
return Arrays.stream(rules)
.filter(File::isFile)
.filter(it -> {
//noinspection UnstableApiUsage
return files.contains(getNameWithoutExtension(it.getName()));
})
.map(f -> {
try (final Reader r = new FileReader(f)) {
return new Yaml().loadAs(r, LALConfigs.class);
} catch (IOException e) {
log.debug("Failed to read file {}", f, e);
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (FileNotFoundException e) {
throw new ModuleStartException("Failed to load LAL config rules", e);
}
}
}
...@@ -17,7 +17,52 @@ ...@@ -17,7 +17,52 @@
package org.apache.skywalking.oap.log.analyzer.provider; package org.apache.skywalking.oap.log.analyzer.provider;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import static java.util.Objects.nonNull;
@EqualsAndHashCode(callSuper = false)
public class LogAnalyzerModuleConfig extends ModuleConfig { public class LogAnalyzerModuleConfig extends ModuleConfig {
@Getter
@Setter
private String lalPath = "lal";
@Getter
@Setter
private String malPath = "log-mal-rules";
@Getter
@Setter
private String lalFiles = "default.yaml";
@Getter
@Setter
private String malFiles;
private List<Rule> meterConfigs;
public List<String> lalFiles() {
return Splitter.on(",").omitEmptyStrings().splitToList(Strings.nullToEmpty(getLalFiles()));
}
public List<Rule> malConfigs() throws ModuleStartException {
if (nonNull(meterConfigs)) {
return meterConfigs;
}
final List<String> files = Splitter.on(",")
.omitEmptyStrings()
.splitToList(Strings.nullToEmpty(getMalFiles()));
meterConfigs = Rules.loadRules(getMalPath(), files);
return meterConfigs;
}
} }
...@@ -20,8 +20,7 @@ package org.apache.skywalking.oap.log.analyzer.provider; ...@@ -20,8 +20,7 @@ package org.apache.skywalking.oap.log.analyzer.provider;
import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule; import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule;
import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService; import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.log.analyzer.provider.log.LogAnalyzerServiceImpl; import org.apache.skywalking.oap.log.analyzer.provider.log.LogAnalyzerServiceImpl;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.RecordAnalysisListener; import org.apache.skywalking.oap.log.analyzer.provider.log.listener.LogFilterListener;
import org.apache.skywalking.oap.log.analyzer.provider.log.listener.TrafficAnalysisListener;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleConfig;
...@@ -59,12 +58,15 @@ public class LogAnalyzerModuleProvider extends ModuleProvider { ...@@ -59,12 +58,15 @@ public class LogAnalyzerModuleProvider extends ModuleProvider {
@Override @Override
public void start() throws ServiceNotProvidedException, ModuleStartException { public void start() throws ServiceNotProvidedException, ModuleStartException {
logAnalyzerService.addListenerFactory(new RecordAnalysisListener.Factory(getManager(), moduleConfig)); try {
logAnalyzerService.addListenerFactory(new TrafficAnalysisListener.Factory(getManager(), moduleConfig)); logAnalyzerService.addListenerFactory(new LogFilterListener.Factory(getManager(), moduleConfig));
} catch (final Exception e) {
throw new ModuleStartException("Failed to create LAL listener.", e);
}
} }
@Override @Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { public void notifyAfterCompleted() throws ServiceNotProvidedException {
} }
......
...@@ -65,6 +65,6 @@ public class LogAnalyzer { ...@@ -65,6 +65,6 @@ public class LogAnalyzer {
private void createListeners() { private void createListeners() {
factoryManager.getLogAnalysisListenerFactories() factoryManager.getLogAnalysisListenerFactories()
.forEach(factory -> listeners.add(factory.create(moduleManager, moduleConfig))); .forEach(factory -> listeners.add(factory.create()));
} }
} }
...@@ -31,6 +31,7 @@ public interface LogAnalysisListener { ...@@ -31,6 +31,7 @@ public interface LogAnalysisListener {
/** /**
* Parse the raw data from the probe. * Parse the raw data from the probe.
* @return {@code this} for chaining.
*/ */
void parse(LogData.Builder logData); LogAnalysisListener parse(LogData.Builder logData);
} }
...@@ -17,14 +17,11 @@ ...@@ -17,14 +17,11 @@
package org.apache.skywalking.oap.log.analyzer.provider.log.listener; package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/** /**
* LogAnalysisListenerFactory implementation creates the listener instance when required. * LogAnalysisListenerFactory implementation creates the listener instance when required.
* Every LogAnalysisListener could have its own creation factory. * Every LogAnalysisListener could have its own creation factory.
*/ */
public interface LogAnalysisListenerFactory { public interface LogAnalysisListenerFactory {
LogAnalysisListener create(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig); LogAnalysisListener create();
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.oap.log.analyzer.dsl.Binding;
import org.apache.skywalking.oap.log.analyzer.dsl.DSL;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfig;
import org.apache.skywalking.oap.log.analyzer.provider.LALConfigs;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@Slf4j
@RequiredArgsConstructor
public class LogFilterListener implements LogAnalysisListener {
private final List<DSL> dsls;
@Override
public void build() {
dsls.forEach(dsl -> {
try {
dsl.evaluate();
} catch (final Exception e) {
log.warn("Failed to evaluate dsl: {}", dsl, e);
}
});
}
@Override
public LogAnalysisListener parse(final LogData.Builder logData) {
dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build())));
return this;
}
public static class Factory implements LogAnalysisListenerFactory {
private final List<DSL> dsls;
public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception {
dsls = new ArrayList<>();
final List<LALConfig> configList = LALConfigs.load(config.getLalPath(), config.lalFiles())
.stream()
.flatMap(it -> it.getRules().stream())
.collect(Collectors.toList());
for (final LALConfig c : configList) {
dsls.add(DSL.of(moduleManager, config, c.getDsl()));
}
}
@Override
public LogAnalysisListener create() {
return new LogFilterListener(dsls);
}
}
}
...@@ -57,7 +57,7 @@ public class RecordAnalysisListener implements LogAnalysisListener { ...@@ -57,7 +57,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
} }
@Override @Override
public void parse(final LogData.Builder logData) { public LogAnalysisListener parse(final LogData.Builder logData) {
LogDataBody body = logData.getBody(); LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", "")); log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
// timestamp // timestamp
...@@ -105,6 +105,7 @@ public class RecordAnalysisListener implements LogAnalysisListener { ...@@ -105,6 +105,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
log.setTagsRawData(logData.getTags().toByteArray()); log.setTagsRawData(logData.getTags().toByteArray());
} }
log.getTags().addAll(appendSearchableTags(logData)); log.getTags().addAll(appendSearchableTags(logData));
return this;
} }
private Collection<Tag> appendSearchableTags(LogData.Builder logData) { private Collection<Tag> appendSearchableTags(LogData.Builder logData) {
...@@ -112,9 +113,7 @@ public class RecordAnalysisListener implements LogAnalysisListener { ...@@ -112,9 +113,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
logData.getTags().getDataList().forEach(tag -> { logData.getTags().getDataList().forEach(tag -> {
if (searchableTagKeys.contains(tag.getKey())) { if (searchableTagKeys.contains(tag.getKey())) {
final Tag logTag = new Tag(tag.getKey(), tag.getValue()); final Tag logTag = new Tag(tag.getKey(), tag.getValue());
if (!logTags.contains(logTag)) { logTags.add(logTag);
logTags.add(logTag);
}
} }
}); });
return logTags; return logTags;
...@@ -139,8 +138,7 @@ public class RecordAnalysisListener implements LogAnalysisListener { ...@@ -139,8 +138,7 @@ public class RecordAnalysisListener implements LogAnalysisListener {
} }
@Override @Override
public LogAnalysisListener create(final ModuleManager moduleManager, public LogAnalysisListener create() {
final LogAnalyzerModuleConfig moduleConfig) {
return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys); return new RecordAnalysisListener(sourceReceiver, namingControl, searchableTagKeys);
} }
} }
......
...@@ -61,7 +61,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener { ...@@ -61,7 +61,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
} }
@Override @Override
public void parse(final LogData.Builder logData) { public LogAnalysisListener parse(final LogData.Builder logData) {
final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute); final long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), DownSampling.Minute);
// to service traffic // to service traffic
String serviceName = namingControl.formatServiceName(logData.getService()); String serviceName = namingControl.formatServiceName(logData.getService());
...@@ -85,6 +85,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener { ...@@ -85,6 +85,7 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
endpointMeta.setEndpoint(namingControl.formatEndpointName(serviceName, logData.getEndpoint())); endpointMeta.setEndpoint(namingControl.formatEndpointName(serviceName, logData.getEndpoint()));
endpointMeta.setTimeBucket(timeBucket); endpointMeta.setTimeBucket(timeBucket);
} }
return this;
} }
public static class Factory implements LogAnalysisListenerFactory { public static class Factory implements LogAnalysisListenerFactory {
...@@ -96,13 +97,12 @@ public class TrafficAnalysisListener implements LogAnalysisListener { ...@@ -96,13 +97,12 @@ public class TrafficAnalysisListener implements LogAnalysisListener {
.provider() .provider()
.getService(SourceReceiver.class); .getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME) this.namingControl = moduleManager.find(CoreModule.NAME)
.provider(). .provider()
getService(NamingControl.class); .getService(NamingControl.class);
} }
@Override @Override
public LogAnalysisListener create(final ModuleManager moduleManager, public LogAnalysisListener create() {
final LogAnalyzerModuleConfig moduleConfig) {
return new TrafficAnalysisListener(sourceReceiver, namingControl); return new TrafficAnalysisListener(sourceReceiver, namingControl);
} }
} }
......
...@@ -283,6 +283,8 @@ ...@@ -283,6 +283,8 @@
<exclude>otel-oc-rules/</exclude> <exclude>otel-oc-rules/</exclude>
<exclude>ui-initialized-templates/</exclude> <exclude>ui-initialized-templates/</exclude>
<exclude>zabbix-rules/</exclude> <exclude>zabbix-rules/</exclude>
<exclude>lal/</exclude>
<exclude>log-mal-rules/</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -230,6 +230,8 @@ agent-analyzer: ...@@ -230,6 +230,8 @@ agent-analyzer:
log-analyzer: log-analyzer:
selector: ${SW_LOG_ANALYZER:default} selector: ${SW_LOG_ANALYZER:default}
default: default:
lalFiles: ${SW_LOG_LAL_FILES:default}
malFiles: ${SW_LOG_MAL_FILES:""}
event-analyzer: event-analyzer:
selector: ${SW_EVENT_ANALYZER:default} selector: ${SW_EVENT_ANALYZER:default}
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The default LAL script to save all logs, behaving like the versions before 8.5.0.
rules:
- name: default
dsl: |
filter {
sink {
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Refer to examples in config-examples/log-mal.yaml
...@@ -25,7 +25,7 @@ services: ...@@ -25,7 +25,7 @@ services:
environment: environment:
- discovery.type=single-node - discovery.type=single-node
healthcheck: healthcheck:
test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200" ] test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200"]
interval: 5s interval: 5s
timeout: 60s timeout: 60s
retries: 120 retries: 120
...@@ -36,6 +36,11 @@ services: ...@@ -36,6 +36,11 @@ services:
service: oap service: oap
environment: environment:
SW_STORAGE: elasticsearch SW_STORAGE: elasticsearch
SW_LOG_LAL_FILES: test
SW_LOG_MAL_FILES: test
volumes:
- ./lal.yaml:/skywalking/config/lal/test.yaml
- ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on: depends_on:
es: es:
condition: service_healthy condition: service_healthy
...@@ -48,4 +53,4 @@ services: ...@@ -48,4 +53,4 @@ services:
oap: oap:
condition: service_healthy condition: service_healthy
networks: networks:
e2e: e2e:
\ No newline at end of file
...@@ -36,6 +36,11 @@ services: ...@@ -36,6 +36,11 @@ services:
service: oap-es7 service: oap-es7
environment: environment:
SW_STORAGE: elasticsearch7 SW_STORAGE: elasticsearch7
SW_LOG_LAL_FILES: test
SW_LOG_MAL_FILES: test
volumes:
- ./lal.yaml:/skywalking/config/lal/test.yaml
- ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on: depends_on:
es: es:
condition: service_healthy condition: service_healthy
...@@ -48,4 +53,4 @@ services: ...@@ -48,4 +53,4 @@ services:
oap: oap:
condition: service_healthy condition: service_healthy
networks: networks:
e2e: e2e:
\ No newline at end of file
...@@ -25,7 +25,7 @@ services: ...@@ -25,7 +25,7 @@ services:
expose: expose:
- 1521 - 1521
healthcheck: healthcheck:
test: ["CMD", "sh", "-c", "nc -z 127.0.0.1 1521"] test: [ "CMD", "sh", "-c", "nc -z 127.0.0.1 1521" ]
interval: 5s interval: 5s
timeout: 60s timeout: 60s
retries: 120 retries: 120
...@@ -37,6 +37,11 @@ services: ...@@ -37,6 +37,11 @@ services:
environment: environment:
SW_STORAGE: h2 SW_STORAGE: h2
SW_STORAGE_H2_URL: jdbc:h2:tcp://h2db:1521/skywalking-oap-db SW_STORAGE_H2_URL: jdbc:h2:tcp://h2db:1521/skywalking-oap-db
SW_LOG_LAL_FILES: test
SW_LOG_MAL_FILES: test
volumes:
- ./lal.yaml:/skywalking/config/lal/test.yaml
- ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on: depends_on:
h2db: h2db:
condition: service_healthy condition: service_healthy
...@@ -50,4 +55,4 @@ services: ...@@ -50,4 +55,4 @@ services:
condition: service_healthy condition: service_healthy
networks: networks:
e2e: e2e:
\ No newline at end of file
...@@ -34,6 +34,11 @@ services: ...@@ -34,6 +34,11 @@ services:
service: oap service: oap
environment: environment:
SW_STORAGE: influxdb SW_STORAGE: influxdb
SW_LOG_LAL_FILES: test
SW_LOG_MAL_FILES: test
volumes:
- ./lal.yaml:/skywalking/config/lal/test.yaml
- ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on: depends_on:
influxdb: influxdb:
condition: service_healthy condition: service_healthy
...@@ -47,4 +52,4 @@ services: ...@@ -47,4 +52,4 @@ services:
condition: service_healthy condition: service_healthy
networks: networks:
e2e: e2e:
\ No newline at end of file
...@@ -38,6 +38,11 @@ services: ...@@ -38,6 +38,11 @@ services:
service: oap service: oap
environment: environment:
SW_STORAGE: mysql SW_STORAGE: mysql
SW_LOG_LAL_FILES: test
SW_LOG_MAL_FILES: test
volumes:
- ./lal.yaml:/skywalking/config/lal/test.yaml
- ./log-mal.yaml:/skywalking/config/log-mal-rules/test.yaml
depends_on: depends_on:
mysql: mysql:
condition: service_healthy condition: service_healthy
...@@ -52,4 +57,4 @@ services: ...@@ -52,4 +57,4 @@ services:
condition: service_healthy condition: service_healthy
networks: networks:
e2e: e2e:
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rules:
- name: example
dsl: |
filter {
text {
regexp $/(?s)(?<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}) \[TID:(?<tid>.+?)] \[(?<thread>.+?)] (?<level>\w{4,}) (?<logger>.{1,36}) (?<msg>.+)/$
}
extractor {
metrics {
timestamp log.timestamp
labels level: parsed.level, service: log.service, instance: log.serviceInstance
name "log_count"
value 1
}
}
sink {
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This will parse a textual representation of a duration. The formats
# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS}
# with days considered to be exactly 24 hours.
# <p>
# Examples:
# <pre>
# "PT20.345S" -- parses as "20.345 seconds"
# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds)
# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds)
# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes"
# "P-6H3M" -- parses as "-6 hours and +3 minutes"
# "-P6H3M" -- parses as "-6 hours and -3 minutes"
# "-P-6H+3M" -- parses as "+6 hours and -3 minutes"
# </pre>
expSuffix: instance(['service'], ['instance'])
metricPrefix: log
metricsRules:
- name: count_info
exp: log_count.tagEqual('level', 'INFO').sum(['service', 'instance'])
...@@ -25,13 +25,15 @@ import org.apache.skywalking.e2e.annotation.DockerCompose; ...@@ -25,13 +25,15 @@ import org.apache.skywalking.e2e.annotation.DockerCompose;
import org.apache.skywalking.e2e.base.SkyWalkingE2E; import org.apache.skywalking.e2e.base.SkyWalkingE2E;
import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter; import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter;
import org.apache.skywalking.e2e.common.HostAndPort; import org.apache.skywalking.e2e.common.HostAndPort;
import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.metrics.ReadMetrics;
import org.apache.skywalking.e2e.metrics.ReadMetricsQuery;
import org.apache.skywalking.e2e.retryable.RetryableTest; import org.apache.skywalking.e2e.retryable.RetryableTest;
import org.apache.skywalking.e2e.service.Service; import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesMatcher; import org.apache.skywalking.e2e.service.ServicesMatcher;
import org.apache.skywalking.e2e.service.ServicesQuery; import org.apache.skywalking.e2e.service.ServicesQuery;
import org.apache.skywalking.e2e.service.endpoint.EndpointQuery; import org.apache.skywalking.e2e.service.instance.Instance;
import org.apache.skywalking.e2e.service.endpoint.Endpoints;
import org.apache.skywalking.e2e.service.endpoint.EndpointsMatcher;
import org.apache.skywalking.e2e.service.instance.Instances; import org.apache.skywalking.e2e.service.instance.Instances;
import org.apache.skywalking.e2e.service.instance.InstancesMatcher; import org.apache.skywalking.e2e.service.instance.InstancesMatcher;
import org.apache.skywalking.e2e.service.instance.InstancesQuery; import org.apache.skywalking.e2e.service.instance.InstancesQuery;
...@@ -136,12 +138,26 @@ public class LogE2E extends SkyWalkingTestAdapter { ...@@ -136,12 +138,26 @@ public class LogE2E extends SkyWalkingTestAdapter {
LOGGER.info("instances: {}", instances); LOGGER.info("instances: {}", instances);
load("expected/log/instances.yml").as(InstancesMatcher.class).verify(instances); load("expected/log/instances.yml").as(InstancesMatcher.class).verify(instances);
}
private void verifyServiceEndpoints(final Service service) throws Exception { verifyInstanceMetrics(service, instances);
final Endpoints endpoints = graphql.endpoints(new EndpointQuery().serviceId(service.getKey())); }
LOGGER.info("endpoints: {}", endpoints);
load("expected/log/endpoints.yml").as(EndpointsMatcher.class).verify(endpoints); private void verifyInstanceMetrics(final Service service, final Instances instances) throws Exception {
for (Instance instance : instances.getInstances()) {
final String metricsName = "log_count_info";
LOGGER.info("verifying service instance response time: {}", instance);
final ReadMetrics instanceMetrics = graphql.readMetrics(
new ReadMetricsQuery().stepByMinute().metricsName(metricsName)
.serviceName(service.getLabel()).instanceName(instance.getLabel())
);
LOGGER.info("{}: {}", metricsName, instanceMetrics);
final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
greaterThanZero.setValue("gt 0");
instanceRespTimeMatcher.setValue(greaterThanZero);
instanceRespTimeMatcher.verify(instanceMetrics.getValues());
}
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册