diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 1ce3653081ef1cf2695ed10d83246084f36ae209..fee12e16e2a512784b1019402e604fade1b38b23 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -76,6 +76,8 @@ original `Service,ServiceInstance,ServiceRelation,ServiceInstanceRelation`. * [**Breaking Change**] TCP-related source names are changed, fields of TCP-related sources are changed, please refer to the latest `oal/tcp.oal` file. * Do not log error logs when failed to create ElasticSearch index because the index is created already. +* Support `sampledTrace` in LAL. +* Support multiple rules with different names under the same layer of LAL script. #### UI diff --git a/docs/en/concepts-and-designs/lal.md b/docs/en/concepts-and-designs/lal.md index 9511748b313db52b34881dfcce34d36458fdc7f3..95017917f3ad61e0f23f50b3fd8e52da433789af 100644 --- a/docs/en/concepts-and-designs/lal.md +++ b/docs/en/concepts-and-designs/lal.md @@ -338,6 +338,77 @@ filter { } } ``` +- `sampledTrace` + +`sampledTrace` aims to convert LogData to SampledTrace Records. It extracts data from `parsed` result and save them as SampledTraceRecord. SampledTrace will not abort or edit logs, you can use other LAL for further processing. +We require a log tag `"LOG_KIND" = "NET_PROFILING_SAMPLED_TRACE"` to make OAP distinguish slow trace logs from other log reports. +An example of JSON sent to OAP is as following: +``` json +[ + { + "tags":{ + "data":[ + { + "key":"LOG_KIND", + "value":"NET_PROFILING_SAMPLED_TRACE" + } + ] + }, + "layer":"MESH", + "body":{ + "json":{ + "json":"{\"uri\":\"/provider\",\"reason\":\"slow\",\"latency\":2048,\"client_process\":{\"process_id\":\"c1519f4555ec11eda8df0242ac1d0002\",\"local\":false,\"address\":\"\"},\"server_process\":{\"process_id\":\"\",\"local\":false,\"address\":\"172.31.0.3:443\"},\"detect_point\":\"client\",\"component\":\"http\",\"ssl\":true}" + } + }, + "service":"test-service", + "serviceInstance":"test-service-instance", + "timestamp": 1666916962406, + } +] +``` +Examples are as follows: + +```groovy +filter { + json { + } + if (tag("LOG_KIND") == "NET_PROFILING_SAMPLED_TRACE") { + sampledTrace { + latency parsed.latency as Long + uri parsed.uri as String + reason parsed.reason as String + + if (parsed.client_process.process_id as String != "") { + processId parsed.client_process.process_id as String + } else if (parsed.client_process.local as Boolean) { + processId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + processId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.client_process.address as String) as String + } + + if (parsed.server_process.process_id as String != "") { + destProcessId parsed.server_process.process_id as String + } else if (parsed.server_process.local as Boolean) { + destProcessId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + destProcessId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.server_process.address as String) as String + } + + detectPoint parsed.detect_point as String + + if (parsed.component as String == "http" && parsed.ssl as Boolean) { + componentId 129 + } else if (parsed.component as String == "http") { + componentId 49 + } else if (parsed.ssl as Boolean) { + componentId 130 + } else { + componentId 110 + } + } + } +} +``` ### Sink diff --git a/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..e3c628fc889b5f8448ce10466f0366131a70d73e --- /dev/null +++ b/oap-server/analyzer/agent-analyzer/src/main/java/org/apache/skywalking/oap/server/analyzer/provider/trace/parser/listener/SampledTraceBuilder.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.Layer; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.trace.SampledSlowTraceRecord; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.config.NamingControl; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.source.DetectPoint; +import org.apache.skywalking.oap.server.core.source.ISource; +import org.apache.skywalking.oap.server.core.source.ProcessRelation; + +@RequiredArgsConstructor +public class SampledTraceBuilder { + private final NamingControl namingControl; + + @Setter + @Getter + private String traceId; + @Setter + @Getter + private String uri; + @Setter + @Getter + private long latency; + @Setter + @Getter + private Reason reason; + + @Setter + @Getter + private String layer; + @Setter + @Getter + private String serviceName; + @Setter + @Getter + private String serviceInstanceName; + @Setter + @Getter + private String processId; + @Setter + @Getter + private String destProcessId; + @Setter + @Getter + private int componentId; + @Setter + @Getter + private DetectPoint detectPoint; + + @Setter + @Getter + private long timestamp; + + public void validate() { + Preconditions.checkArgument(!Strings.isNullOrEmpty(traceId), "traceId can't be empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(uri), "uri can't be empty"); + Preconditions.checkArgument(latency > 0, "latency must bigger zero"); + Preconditions.checkArgument(reason != null, "reason can't be empty"); + Preconditions.checkArgument(layer != null, "layer can't be empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceName), "service name can't be empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstanceName), "service instance name can't be empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(processId), "processId can't be empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(destProcessId), "destProcessId can't be empty"); + Preconditions.checkArgument(componentId > 0, "componentId must bigger zero"); + Preconditions.checkArgument(detectPoint != null, "detestPoint can't be empty"); + Preconditions.checkArgument(timestamp > 0, "timestamp must bigger zero"); + } + + public Record toRecord() { + final SampledSlowTraceRecord record = new SampledSlowTraceRecord(); + record.setScope(DefaultScopeDefine.PROCESS_RELATION); + record.setEntityId(IDManager.ProcessID.buildRelationId(new IDManager.ProcessID.ProcessRelationDefine( + processId, destProcessId + ))); + record.setTraceId(traceId); + record.setUri(uri); + record.setLatency(latency); + record.setTimeBucket(TimeBucket.getTimeBucket(timestamp, DownSampling.Second)); + return record; + } + + public ISource toEntity() { + final ProcessRelation processRelation = new ProcessRelation(); + final String serviceId = IDManager.ServiceID.buildId(namingControl.formatServiceName(serviceName), + Layer.nameOf(layer).isNormal()); + final String instanceId = IDManager.ServiceInstanceID.buildId(serviceId, namingControl.formatInstanceName(serviceInstanceName)); + processRelation.setInstanceId(instanceId); + processRelation.setSourceProcessId(processId); + processRelation.setDestProcessId(destProcessId); + processRelation.setDetectPoint(detectPoint); + processRelation.setComponentId(componentId); + return processRelation; + } + + /** + * The reason of sampled trace. + */ + public enum Reason { + SLOW + } +} \ No newline at end of file diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java index ff80fec594b2b741b4de59a69b0819bba337b88a..763dd64d28efb94f8da45d623812cb94d11f8a16 100644 --- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java +++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/Binding.java @@ -32,6 +32,7 @@ import org.apache.skywalking.apm.network.logging.v3.LogData; import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily; import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder; +import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SampledTraceBuilder; import org.apache.skywalking.oap.server.core.source.Log; /** @@ -53,6 +54,8 @@ public class Binding extends groovy.lang.Binding { public static final String KEY_DATABASE_SLOW_STATEMENT = "database_slow_statement"; + public static final String KEY_SAMPLED_TRACE = "sampled_trace"; + public Binding() { setProperty(KEY_PARSED, new Parsed()); } @@ -107,6 +110,15 @@ public class Binding extends groovy.lang.Binding { return this; } + public SampledTraceBuilder sampledTraceBuilder() { + return (SampledTraceBuilder) getProperty(KEY_SAMPLED_TRACE); + } + + public Binding sampledTrace(SampledTraceBuilder sampledTraceBuilder) { + setProperty(KEY_SAMPLED_TRACE, sampledTraceBuilder); + return this; + } + public Binding save() { setProperty(KEY_SAVE, true); return this; diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java index 7b54de6140bbfcd98686fa3af22eea90c3ebae26..ec7a85841cf609b2a401108f1c926cb1095d5808 100644 --- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java +++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/DSL.java @@ -30,6 +30,7 @@ import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.log.analyzer.dsl.spec.LALDelegatingScript; import org.apache.skywalking.oap.log.analyzer.dsl.spec.filter.FilterSpec; import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig; +import org.apache.skywalking.oap.meter.analyzer.dsl.registry.ProcessRegistry; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.codehaus.groovy.ast.stmt.DoWhileStatement; @@ -38,6 +39,7 @@ import org.codehaus.groovy.ast.stmt.Statement; import org.codehaus.groovy.ast.stmt.WhileStatement; import org.codehaus.groovy.control.CompilerConfiguration; import org.codehaus.groovy.control.customizers.ASTTransformationCustomizer; +import org.codehaus.groovy.control.customizers.ImportCustomizer; import org.codehaus.groovy.control.customizers.SecureASTCustomizer; import static java.util.Collections.singletonList; @@ -76,10 +78,16 @@ public class DSL { .add(Map.class) .add(List.class) .add(Array.class) + .add(String.class) + .add(ProcessRegistry.class) .build()); cc.addCompilationCustomizers(secureASTCustomizer); cc.setScriptBaseClass(LALDelegatingScript.class.getName()); + ImportCustomizer icz = new ImportCustomizer(); + icz.addImport("ProcessRegistry", ProcessRegistry.class.getName()); + cc.addCompilationCustomizers(icz); + final GroovyShell sh = new GroovyShell(cc); final DelegatingScript script = (DelegatingScript) sh.parse(dsl); final FilterSpec filterSpec = new FilterSpec(moduleManager, config); diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java index 7eb6ce48e78d9080f7fa5035cf8c3627a3ec4674..bcad590f0d825dfb3b29dc43c3b69410e37aad94 100644 --- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java +++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/ExtractorSpec.java @@ -41,17 +41,22 @@ import org.apache.skywalking.apm.network.logging.v3.LogData; import org.apache.skywalking.apm.network.logging.v3.TraceContext; import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec; import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.slowsql.SlowSqlSpec; +import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.sampledtrace.SampledTraceSpec; import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig; import org.apache.skywalking.oap.meter.analyzer.MetricConvert; import org.apache.skywalking.oap.meter.analyzer.dsl.Sample; import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily; import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder; import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder; +import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SampledTraceBuilder; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; import org.apache.skywalking.oap.server.core.config.NamingControl; import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement; +import org.apache.skywalking.oap.server.core.source.ISource; import org.apache.skywalking.oap.server.core.source.ServiceMeta; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; @@ -69,6 +74,7 @@ public class ExtractorSpec extends AbstractSpec { private final List metricConverts; private final SlowSqlSpec slowSql; + private final SampledTraceSpec sampledTrace; private final NamingControl namingControl; @@ -89,6 +95,7 @@ public class ExtractorSpec extends AbstractSpec { .collect(Collectors.toList()); slowSql = new SlowSqlSpec(moduleManager(), moduleConfig()); + sampledTrace = new SampledTraceSpec(moduleManager(), moduleConfig()); namingControl = moduleManager.find(CoreModule.NAME) .provider() @@ -292,6 +299,30 @@ public class ExtractorSpec extends AbstractSpec { sourceReceiver.receive(serviceMeta); } + @SuppressWarnings("unused") + public void sampledTrace(@DelegatesTo(SampledTraceSpec.class) final Closure cl) { + if (BINDING.get().shouldAbort()) { + return; + } + LogData.Builder log = BINDING.get().log(); + SampledTraceBuilder builder = new SampledTraceBuilder(namingControl); + builder.setLayer(log.getLayer()); + builder.setTimestamp(log.getTimestamp()); + builder.setServiceName(log.getService()); + builder.setServiceInstanceName(log.getServiceInstance()); + builder.setTraceId(log.getTraceContext().getTraceId()); + BINDING.get().sampledTrace(builder); + + cl.setDelegate(sampledTrace); + cl.call(); + + builder.validate(); + final Record record = builder.toRecord(); + final ISource entity = builder.toEntity(); + RecordStreamProcessor.getInstance().in(record); + sourceReceiver.receive(entity); + } + public static class SampleBuilder { @Delegate private final Sample.SampleBuilder sampleBuilder = Sample.builder(); diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/sampledtrace/SampledTraceSpec.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/sampledtrace/SampledTraceSpec.java new file mode 100644 index 0000000000000000000000000000000000000000..2034548bf64cbb4b8260bef9e8917422b8f451f4 --- /dev/null +++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/dsl/spec/extractor/sampledtrace/SampledTraceSpec.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.sampledtrace; + +import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec; +import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig; +import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SampledTraceBuilder; +import org.apache.skywalking.oap.server.core.source.DetectPoint; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +import static java.util.Objects.nonNull; + +public class SampledTraceSpec extends AbstractSpec { + public SampledTraceSpec(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig) { + super(moduleManager, moduleConfig); + } + + public void latency(final Long latency) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(latency)) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setLatency(latency); + } + } + + public void uri(final String uri) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(uri)) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setUri(uri); + } + } + + public void reason(final String reason) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(reason)) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setReason(SampledTraceBuilder.Reason.valueOf(reason.toUpperCase())); + } + } + + public void processId(final String id) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(id)) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setProcessId(id); + } + } + + public void destProcessId(final String id) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(id)) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setDestProcessId(id); + } + } + + public void detectPoint(String detectPoint) { + if (BINDING.get().shouldAbort()) { + return; + } + if (nonNull(detectPoint)) { + final DetectPoint point = DetectPoint.valueOf(detectPoint.toUpperCase()); + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setDetectPoint(point); + } + } + + public void componentId(final int id) { + if (BINDING.get().shouldAbort()) { + return; + } + if (id > 0) { + final SampledTraceBuilder sampledTraceBuilder = BINDING.get().sampledTraceBuilder(); + sampledTraceBuilder.setComponentId(id); + } + } + +} \ No newline at end of file diff --git a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java index 28bdd09744300d3328095d81349e61487fe9e119..8b9f0b1fb80c03eb40c5bd4fa23dd84b2d41159b 100644 --- a/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java +++ b/oap-server/analyzer/log-analyzer/src/main/java/org/apache/skywalking/oap/log/analyzer/provider/log/listener/LogFilterListener.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.log.analyzer.provider.log.listener; import com.google.protobuf.Message; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -40,27 +41,29 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException; @Slf4j @RequiredArgsConstructor public class LogFilterListener implements LogAnalysisListener { - @lombok.NonNull - private final DSL dsl; + private final Collection dsls; @Override public void build() { - try { - dsl.evaluate(); - } catch (final Exception e) { - log.warn("Failed to evaluate dsl: {}", dsl, e); - } + dsls.forEach(dsl -> { + try { + dsl.evaluate(); + } catch (final Exception e) { + log.warn("Failed to evaluate dsl: {}", dsl, e); + } + }); } @Override public LogAnalysisListener parse(final LogData.Builder logData, final Message extraLog) { - dsl.bind(new Binding().log(logData.build()).extraLog(extraLog)); + dsls.forEach(dsl -> dsl.bind(new Binding().log(logData.build()) + .extraLog(extraLog))); return this; } public static class Factory implements LogAnalysisListenerFactory { - private final Map dsls; + private final Map> dsls; public Factory(final ModuleManager moduleManager, final LogAnalyzerModuleConfig config) throws Exception { dsls = new HashMap<>(); @@ -71,8 +74,9 @@ public class LogFilterListener implements LogAnalysisListener { .collect(Collectors.toList()); for (final LALConfig c : configList) { Layer layer = Layer.nameOf(c.getLayer()); - if (dsls.put(layer, DSL.of(moduleManager, config, c.getDsl())) != null) { - throw new ModuleStartException("Layer " + layer.name() + " has already set a rule."); + Map dsls = this.dsls.computeIfAbsent(layer, k -> new HashMap<>()); + if (dsls.put(c.getName(), DSL.of(moduleManager, config, c.getDsl())) != null) { + throw new ModuleStartException("Layer " + layer.name() + " has already set " + c.getName() + " rule."); } } } @@ -82,11 +86,11 @@ public class LogFilterListener implements LogAnalysisListener { if (layer == null) { return null; } - final DSL dsl = dsls.get(layer); + final Map dsl = dsls.get(layer); if (dsl == null) { return null; } - return new LogFilterListener(dsl); + return new LogFilterListener(dsl.values()); } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..abe2d321ac10727b01303f98f49213feb86436d9 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/trace/SampledSlowTraceRecord.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.trace; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.Stream; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.topn.TopN; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; +import org.apache.skywalking.oap.server.core.source.ScopeDeclaration; +import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; +import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; +import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; +import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SAMPLED_SLOW_TRACE; + +@Setter +@Getter +@ScopeDeclaration(id = SAMPLED_SLOW_TRACE, name = "SampledTraceSlowRecord") +@Stream(name = SampledSlowTraceRecord.INDEX_NAME, scopeId = SAMPLED_SLOW_TRACE, builder = SampledSlowTraceRecord.Builder.class, processor = RecordStreamProcessor.class) +public class SampledSlowTraceRecord extends Record { + + public static final String INDEX_NAME = "sampled_slow_trace_record"; + public static final String SCOPE = "scope"; + public static final String ENTITY_ID = "entity_id"; + public static final String TRACE_ID = TopN.TRACE_ID; + public static final String URI = TopN.STATEMENT; + public static final String LATENCY = "latency"; + + @Column(columnName = SCOPE) + private int scope; + @Column(columnName = ENTITY_ID) + private String entityId; + @Column(columnName = TRACE_ID) + @BanyanDB.ShardingKey(index = 0) + private String traceId; + @Column(columnName = URI, storageOnly = true) + private String uri; + @Column(columnName = LATENCY, dataType = Column.ValueDataType.SAMPLED_RECORD) + private long latency; + + @Override + public String id() { + return getTimeBucket() + Const.ID_CONNECTOR + entityId + Const.ID_CONNECTOR + traceId; + } + + public static class Builder implements StorageBuilder { + + @Override + public SampledSlowTraceRecord storage2Entity(Convert2Entity converter) { + final SampledSlowTraceRecord record = new SampledSlowTraceRecord(); + record.setScope(((Number) converter.get(SCOPE)).intValue()); + record.setEntityId((String) converter.get(ENTITY_ID)); + record.setTraceId((String) converter.get(TRACE_ID)); + record.setUri((String) converter.get(URI)); + record.setLatency(((Number) converter.get(LATENCY)).longValue()); + record.setTimeBucket(((Number) converter.get(TIME_BUCKET)).longValue()); + return record; + } + + @Override + public void entity2Storage(SampledSlowTraceRecord entity, Convert2Storage converter) { + converter.accept(SCOPE, entity.getScope()); + converter.accept(ENTITY_ID, entity.getEntityId()); + converter.accept(TRACE_ID, entity.getTraceId()); + converter.accept(URI, entity.getUri()); + converter.accept(LATENCY, entity.getLatency()); + converter.accept(TIME_BUCKET, entity.getTimeBucket()); + } + } +} \ No newline at end of file diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index e8fbf11e1dfa7d2404a8db75efeed1ad9c4b6740..7f7d00f635135549bfbb1457b9a7592626e16650 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -114,6 +114,7 @@ public class DefaultScopeDefine { public static final int TCP_SERVICE_RELATION = 59; public static final int TCP_SERVICE_INSTANCE_RELATION = 60; public static final int TCP_SERVICE_INSTANCE_UPDATE = 61; + public static final int SAMPLED_SLOW_TRACE = 62; /** * Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt. diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index fba48e31e26bd1828edfa4ff3d8658169f5782c1..e2fa7942dad0bf5899d328cf5c69289b15c6e2f6 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -255,7 +255,7 @@ agent-analyzer: log-analyzer: selector: ${SW_LOG_ANALYZER:default} default: - lalFiles: ${SW_LOG_LAL_FILES:envoy-als,mysql-slowsql,pgsql-slowsql,default} + lalFiles: ${SW_LOG_LAL_FILES:envoy-als,mesh-dp,mysql-slowsql,pgsql-slowsql,k8s-service,default} malFiles: ${SW_LOG_MAL_FILES:""} event-analyzer: diff --git a/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml b/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml index cd326a1d8a0f35c53048bc820dfd9371449def40..e6530c3fa713e6fe7e755fcf6a19f584a4547c6a 100644 --- a/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml +++ b/oap-server/server-starter/src/main/resources/lal/envoy-als.yaml @@ -46,3 +46,47 @@ rules: } } } + - name: network-profiling-slow-trace + layer: MESH + dsl: | + filter { + json{ + } + extractor{ + if (tag("LOG_KIND") == "NET_PROFILING_SAMPLED_TRACE") { + sampledTrace { + latency parsed.latency as Long + uri parsed.uri as String + reason parsed.reason as String + + if (parsed.client_process.process_id as String != "") { + processId parsed.client_process.process_id as String + } else if (parsed.client_process.local as Boolean) { + processId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + processId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.client_process.address as String) as String + } + + if (parsed.server_process.process_id as String != "") { + destProcessId parsed.server_process.process_id as String + } else if (parsed.server_process.local as Boolean) { + destProcessId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + destProcessId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.server_process.address as String) as String + } + + detectPoint parsed.detect_point as String + + if (parsed.component as String == "http" && parsed.ssl as Boolean) { + componentId 129 + } else if (parsed.component as String == "http") { + componentId 49 + } else if (parsed.ssl as Boolean) { + componentId 130 + } else { + componentId 110 + } + } + } + } + } diff --git a/oap-server/server-starter/src/main/resources/lal/k8s-service.yaml b/oap-server/server-starter/src/main/resources/lal/k8s-service.yaml new file mode 100644 index 0000000000000000000000000000000000000000..2992b39ed7a76fb0de86aebeeff267bbcebcb8a9 --- /dev/null +++ b/oap-server/server-starter/src/main/resources/lal/k8s-service.yaml @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The default LAL script to save all logs, behaving like the versions before 8.5.0. +rules: + - name: network-profiling-slow-trace + layer: K8S_SERVICE + dsl: | + filter { + json{ + } + extractor{ + if (tag("LOG_KIND") == "NET_PROFILING_SAMPLED_TRACE") { + sampledTrace { + latency parsed.latency as Long + uri parsed.uri as String + reason parsed.reason as String + + if (parsed.client_process.process_id as String != "") { + processId parsed.client_process.process_id as String + } else if (parsed.client_process.local as Boolean) { + processId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + processId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.client_process.address as String) as String + } + + if (parsed.server_process.process_id as String != "") { + destProcessId parsed.server_process.process_id as String + } else if (parsed.server_process.local as Boolean) { + destProcessId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + destProcessId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.server_process.address as String) as String + } + + detectPoint parsed.detect_point as String + + if (parsed.component as String == "http" && parsed.ssl as Boolean) { + componentId 129 + } else if (parsed.component as String == "http") { + componentId 49 + } else if (parsed.ssl as Boolean) { + componentId 130 + } else { + componentId 110 + } + } + } + } + } \ No newline at end of file diff --git a/oap-server/server-starter/src/main/resources/lal/mesh-dp.yaml b/oap-server/server-starter/src/main/resources/lal/mesh-dp.yaml new file mode 100644 index 0000000000000000000000000000000000000000..e8271ef8a7088c532d16182b6f940a40b3b6c788 --- /dev/null +++ b/oap-server/server-starter/src/main/resources/lal/mesh-dp.yaml @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rules: + - name: network-profiling-slow-trace + layer: MESH_DP + dsl: | + filter { + json{ + } + extractor{ + if (tag("LOG_KIND") == "NET_PROFILING_SAMPLED_TRACE") { + sampledTrace { + latency parsed.latency as Long + uri parsed.uri as String + reason parsed.reason as String + + if (parsed.client_process.process_id as String != "") { + processId parsed.client_process.process_id as String + } else if (parsed.client_process.local as Boolean) { + processId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + processId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.client_process.address as String) as String + } + + if (parsed.server_process.process_id as String != "") { + destProcessId parsed.server_process.process_id as String + } else if (parsed.server_process.local as Boolean) { + destProcessId ProcessRegistry.generateVirtualLocalProcess(parsed.service as String, parsed.serviceInstance as String) as String + } else { + destProcessId ProcessRegistry.generateVirtualRemoteProcess(parsed.service as String, parsed.serviceInstance as String, parsed.server_process.address as String) as String + } + + detectPoint parsed.detect_point as String + + if (parsed.component as String == "http" && parsed.ssl as Boolean) { + componentId 129 + } else if (parsed.component as String == "http") { + componentId 49 + } else if (parsed.ssl as Boolean) { + componentId 130 + } else { + componentId 110 + } + } + } + } + } \ No newline at end of file