diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java index a2b2dc3f9b39911827dd44c813cdcebd853b3009..458af2c08a6366ecd69cbad9dacbf7a323ee7cf6 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/DBLatencyThresholdsAndWatcher.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.receiver.trace.provider; import java.util.*; import java.util.concurrent.atomic.AtomicReference; import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; +import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; /** @@ -30,10 +31,10 @@ public class DBLatencyThresholdsAndWatcher extends ConfigChangeWatcher { private AtomicReference> thresholds; private AtomicReference settingsString; - DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) { + public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) { super(TraceModule.NAME, provider, "slowDBAccessThreshold"); - thresholds = new AtomicReference(new HashMap<>()); - settingsString = new AtomicReference<>(""); + thresholds = new AtomicReference<>(new HashMap<>()); + settingsString = new AtomicReference<>(Const.EMPTY_STRING); activeSetting(config); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java index c54840836eca2a59a8e6df873eb28b1eb36d644a..a1502e3d9f99e4b9f883ad55def4ac5f6ac361d5 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java @@ -66,25 +66,21 @@ public class TraceModuleProvider extends ModuleProvider { moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds); - SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); - if (moduleConfig.isTraceAnalysis()) { - listenerManager.add(new MultiScopesSpanListener.Factory()); - listenerManager.add(new ServiceMappingSpanListener.Factory()); - } - listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate())); + segmentProducer = new SegmentParse.Producer(getManager(), listenerManager(), moduleConfig); + segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager(), moduleConfig); - segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig); + this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2)); + } - listenerManager = new SegmentParserListenerManager(); + public SegmentParserListenerManager listenerManager() { + SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); if (moduleConfig.isTraceAnalysis()) { listenerManager.add(new MultiScopesSpanListener.Factory()); listenerManager.add(new ServiceMappingSpanListener.Factory()); } listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate())); - segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig); - - this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2)); + return listenerManager; } @Override public void start() throws ModuleStartException { @@ -98,15 +94,11 @@ public class TraceModuleProvider extends ModuleProvider { grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager())); jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer)); - SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer, - moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), - false); + SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer, moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), false); segmentProducer.setStandardizationWorker(standardizationWorker); - SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(getManager(), segmentProducer, - moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), - true); - segmentProducerV2.setStandardizationWorker(standardizationWorker2); + SegmentStandardizationWorker standardizationWorkerV2 = new SegmentStandardizationWorker(getManager(), segmentProducerV2, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), true); + segmentProducerV2.setStandardizationWorker(standardizationWorkerV2); } catch (IOException e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java index 5e363b009980b4f341ca4d3f94ca6e45b9759dd0..92fc4a0b9e08bd8ef523ca61e1bdaed7932116ad 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java @@ -24,9 +24,8 @@ import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; -import org.apache.skywalking.oap.server.library.buffer.BufferStream; +import org.apache.skywalking.oap.server.library.buffer.*; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; -import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.*; import org.slf4j.*; @@ -42,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractWorker segmentParse, String path, int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException { super(moduleDefineHolder); @@ -51,7 +50,7 @@ public class SegmentStandardizationWorker extends AbstractWorker stream = builder.build(); stream.initialize();