提交 064595eb 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Fixed the segment parse bug. Each use requires the creation of a new instance. (#1784)

上级 4e99e896
...@@ -67,12 +67,12 @@ public class TraceModuleProvider extends ModuleProvider { ...@@ -67,12 +67,12 @@ public class TraceModuleProvider extends ModuleProvider {
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class); GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class); JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).getService(JettyHandlerRegister.class);
try { try {
SegmentParse segmentParse = new SegmentParse(getManager(), listenerManager); SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager);
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentParse)); grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentParse)); jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentParse, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart()); SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(segmentProducer, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart());
segmentParse.setStandardizationWorker(standardizationWorker); segmentProducer.setStandardizationWorker(standardizationWorker);
} catch (IOException e) { } catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
......
...@@ -32,11 +32,11 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg ...@@ -32,11 +32,11 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class); private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final Boolean debug; private final Boolean debug;
private final SegmentParse segmentParse; private final SegmentParse.Producer segmentProducer;
public TraceSegmentServiceHandler(SegmentParse segmentParse) { public TraceSegmentServiceHandler(SegmentParse.Producer segmentProducer) {
this.debug = System.getProperty("debug") != null; this.debug = System.getProperty("debug") != null;
this.segmentParse = segmentParse; this.segmentProducer = segmentProducer;
} }
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) { @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
...@@ -46,7 +46,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg ...@@ -46,7 +46,7 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
logger.debug("receive segment"); logger.debug("receive segment");
} }
segmentParse.parse(segment, SegmentParse.Source.Agent); segmentProducer.send(segment, SegmentParse.Source.Agent);
if (debug) { if (debug) {
long count = SegmentCounter.INSTANCE.incrementAndGet(); long count = SegmentCounter.INSTANCE.incrementAndGet();
......
...@@ -34,10 +34,10 @@ public class TraceSegmentServletHandler extends JettyJsonHandler { ...@@ -34,10 +34,10 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class); private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
private final SegmentParse segmentParse; private final SegmentParse.Producer segmentProducer;
public TraceSegmentServletHandler(SegmentParse segmentParse) { public TraceSegmentServletHandler(SegmentParse.Producer segmentProducer) {
this.segmentParse = segmentParse; this.segmentProducer = segmentProducer;
} }
@Override public String pathSpec() { @Override public String pathSpec() {
...@@ -71,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyJsonHandler { ...@@ -71,7 +71,7 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
reader.beginArray(); reader.beginArray();
while (reader.hasNext()) { while (reader.hasNext()) {
TraceSegment traceSegment = jsonReader.read(reader); TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent); segmentProducer.send(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
} }
reader.endArray(); reader.endArray();
} }
......
...@@ -33,7 +33,7 @@ import org.slf4j.*; ...@@ -33,7 +33,7 @@ import org.slf4j.*;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> { public class SegmentParse {
private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class); private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
...@@ -43,7 +43,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> ...@@ -43,7 +43,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
private final SegmentCoreInfo segmentCoreInfo; private final SegmentCoreInfo segmentCoreInfo;
@Setter private SegmentStandardizationWorker standardizationWorker; @Setter private SegmentStandardizationWorker standardizationWorker;
public SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) { private SegmentParse(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager; this.moduleManager = moduleManager;
this.listenerManager = listenerManager; this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>(); this.spanListeners = new LinkedList<>();
...@@ -52,10 +52,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> ...@@ -52,10 +52,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
this.segmentCoreInfo.setEndTime(Long.MIN_VALUE); this.segmentCoreInfo.setEndTime(Long.MIN_VALUE);
} }
@Override public boolean call(UpstreamSegment segment) {
return parse(segment, Source.Buffer);
}
public boolean parse(UpstreamSegment segment, Source source) { public boolean parse(UpstreamSegment segment, Source source) {
createSpanListeners(); createSpanListeners();
...@@ -220,4 +216,28 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment> ...@@ -220,4 +216,28 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
public enum Source { public enum Source {
Agent, Buffer Agent, Buffer
} }
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
@Setter private SegmentStandardizationWorker standardizationWorker;
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
}
public void send(UpstreamSegment segment, Source source) {
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
segmentParse.parse(segment, source);
}
@Override public boolean call(UpstreamSegment segment) {
SegmentParse segmentParse = new SegmentParse(moduleManager, listenerManager);
segmentParse.setStandardizationWorker(standardizationWorker);
return segmentParse.parse(segment, Source.Buffer);
}
}
} }
...@@ -84,7 +84,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener ...@@ -84,7 +84,7 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
@Override public void build() { @Override public void build() {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("segment duration listener build"); logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
} }
if (entryEndpointId == 0) { if (entryEndpointId == 0) {
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization; package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
...@@ -37,7 +37,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard ...@@ -37,7 +37,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private final DataCarrier<SegmentStandardization> dataCarrier; private final DataCarrier<SegmentStandardization> dataCarrier;
public SegmentStandardizationWorker(SegmentParse segmentParse, String path, public SegmentStandardizationWorker(SegmentParse.Producer segmentParseCreator, String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException { int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
super(Integer.MAX_VALUE); super(Integer.MAX_VALUE);
...@@ -46,7 +46,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard ...@@ -46,7 +46,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
builder.dataFileMaxSize(dataFileMaxSize); builder.dataFileMaxSize(dataFileMaxSize);
builder.offsetFileMaxSize(offsetFileMaxSize); builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser()); builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParse); builder.callBack(segmentParseCreator);
BufferStream<UpstreamSegment> stream = builder.build(); BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize(); stream.initialize();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册