diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java index 7929436cfabfabe80a0f935412bc1d113c4539db..f286a8315f073d62d63484caa451f334910fc8d0 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java @@ -68,7 +68,7 @@ public class BufferStream { } private void tryLock(File directory) { - logger.info("Try to lock buffer directory, directory is: " + absolutePath); + logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath()); FileLock lock = null; try { @@ -78,10 +78,10 @@ public class BufferStream { } if (lock == null) { - throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath); + throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath()); } - logger.info("Lock buffer directory successfully, directory is: " + absolutePath); + logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath()); } public static class Builder { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java index d696a3615cc68b1f36108feb4761ae15401604a6..a30c7a46de3307efaec1cc1d0132a38e091bf1eb 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java @@ -35,13 +35,11 @@ public class SegmentStandardizationWorker extends AbstractWorker stream; + private final DataCarrier dataCarrier; public SegmentStandardizationWorker(SegmentParse segmentParse, String path, int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException { super(Integer.MAX_VALUE); - DataCarrier dataCarrier = new DataCarrier<>(1, 1024); - dataCarrier.consume(new Consumer(this), 1); BufferStream.Builder builder = new BufferStream.Builder<>(path); builder.cleanWhenRestart(cleanWhenRestart); @@ -50,21 +48,24 @@ public class SegmentStandardizationWorker extends AbstractWorker stream = builder.build(); stream.initialize(); + + dataCarrier = new DataCarrier<>(1, 1024); + dataCarrier.consume(new Consumer(stream), 1); } @Override public void in(SegmentStandardization standardization) { - stream.write(standardization.getUpstreamSegment()); + dataCarrier.produce(standardization); } private class Consumer implements IConsumer { - private final SegmentStandardizationWorker aggregator; + private final BufferStream stream; - private Consumer(SegmentStandardizationWorker aggregator) { - this.aggregator = aggregator; + private Consumer(BufferStream stream) { + this.stream = stream; } @Override @@ -73,16 +74,8 @@ public class SegmentStandardizationWorker extends AbstractWorker data) { - Iterator inputIterator = data.iterator(); - - int i = 0; - while (inputIterator.hasNext()) { - SegmentStandardization indicator = inputIterator.next(); - i++; - if (i == data.size()) { - indicator.getEndOfBatchContext().setEndOfBatch(true); - } - aggregator.in(indicator); + for (SegmentStandardization aData : data) { + stream.write(aData.getUpstreamSegment()); } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java index d19be0ee8a54816db371df9002491e82fc6b5a03..a96e56b507b33d9bb372bf4aa0cbcca4f28d5883 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java @@ -52,6 +52,14 @@ public class AgentDataMock { UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create(); providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true); + TimeUnit.SECONDS.sleep(10); + + globalTraceId = UniqueIdBuilder.INSTANCE.create(); + consumerSegmentId = UniqueIdBuilder.INSTANCE.create(); + providerSegmentId = UniqueIdBuilder.INSTANCE.create(); + consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false); + providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false); + streamObserver.onCompleted(); while (!IS_COMPLETED) { TimeUnit.MILLISECONDS.sleep(500);