提交 90253591 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Fixed a mistake that inject v1 segment producer into v2 standardization worker. (#3090)

上级 a90ed129
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
/**
......@@ -30,10 +31,10 @@ public class DBLatencyThresholdsAndWatcher extends ConfigChangeWatcher {
private AtomicReference<Map<String, Integer>> thresholds;
private AtomicReference<String> settingsString;
DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) {
public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "slowDBAccessThreshold");
thresholds = new AtomicReference(new HashMap<>());
settingsString = new AtomicReference<>("");
thresholds = new AtomicReference<>(new HashMap<>());
settingsString = new AtomicReference<>(Const.EMPTY_STRING);
activeSetting(config);
}
......
......@@ -66,25 +66,21 @@ public class TraceModuleProvider extends ModuleProvider {
moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
}
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager(), moduleConfig);
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager(), moduleConfig);
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}
listenerManager = new SegmentParserListenerManager();
public SegmentParserListenerManager listenerManager() {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
}
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
return listenerManager;
}
@Override public void start() throws ModuleStartException {
......@@ -98,15 +94,11 @@ public class TraceModuleProvider extends ModuleProvider {
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
false);
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer, moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), false);
segmentProducer.setStandardizationWorker(standardizationWorker);
SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
true);
segmentProducerV2.setStandardizationWorker(standardizationWorker2);
SegmentStandardizationWorker standardizationWorkerV2 = new SegmentStandardizationWorker(getManager(), segmentProducerV2, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), true);
segmentProducerV2.setStandardizationWorker(standardizationWorkerV2);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
......@@ -24,9 +24,8 @@ import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
......@@ -42,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private CounterMetrics traceBufferFileIn;
public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
SegmentParse.Producer segmentParseCreator, String path, int offsetFileMaxSize,
DataStreamReader.CallBack<UpstreamSegment> segmentParse, String path, int offsetFileMaxSize,
int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
super(moduleDefineHolder);
......@@ -51,7 +50,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
builder.dataFileMaxSize(dataFileMaxSize);
builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParseCreator);
builder.callBack(segmentParse);
BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册