From 065a384f1034795a58ad39c5a9fd2e416f89adb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Mon, 8 Oct 2018 23:17:15 +0800 Subject: [PATCH] Trace buffer test success. (#1733) --- .../server/library/buffer/BufferStream.java | 6 ++-- .../SegmentStandardizationWorker.java | 29 +++++++------------ .../receiver/trace/mock/AgentDataMock.java | 8 +++++ 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java index 7929436cf..f286a8315 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java @@ -68,7 +68,7 @@ public class BufferStream { } private void tryLock(File directory) { - logger.info("Try to lock buffer directory, directory is: " + absolutePath); + logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath()); FileLock lock = null; try { @@ -78,10 +78,10 @@ public class BufferStream { } if (lock == null) { - throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath); + throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath()); } - logger.info("Lock buffer directory successfully, directory is: " + absolutePath); + logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath()); } public static class Builder { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java index d696a3615..a30c7a46d 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardizationWorker.java @@ -35,13 +35,11 @@ public class SegmentStandardizationWorker extends AbstractWorker stream; + private final DataCarrier dataCarrier; public SegmentStandardizationWorker(SegmentParse segmentParse, String path, int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException { super(Integer.MAX_VALUE); - DataCarrier dataCarrier = new DataCarrier<>(1, 1024); - dataCarrier.consume(new Consumer(this), 1); BufferStream.Builder builder = new BufferStream.Builder<>(path); builder.cleanWhenRestart(cleanWhenRestart); @@ -50,21 +48,24 @@ public class SegmentStandardizationWorker extends AbstractWorker stream = builder.build(); stream.initialize(); + + dataCarrier = new DataCarrier<>(1, 1024); + dataCarrier.consume(new Consumer(stream), 1); } @Override public void in(SegmentStandardization standardization) { - stream.write(standardization.getUpstreamSegment()); + dataCarrier.produce(standardization); } private class Consumer implements IConsumer { - private final SegmentStandardizationWorker aggregator; + private final BufferStream stream; - private Consumer(SegmentStandardizationWorker aggregator) { - this.aggregator = aggregator; + private Consumer(BufferStream stream) { + this.stream = stream; } @Override @@ -73,16 +74,8 @@ public class SegmentStandardizationWorker extends AbstractWorker data) { - Iterator inputIterator = data.iterator(); - - int i = 0; - while (inputIterator.hasNext()) { - SegmentStandardization indicator = inputIterator.next(); - i++; - if (i == data.size()) { - indicator.getEndOfBatchContext().setEndOfBatch(true); - } - aggregator.in(indicator); + for (SegmentStandardization aData : data) { + stream.write(aData.getUpstreamSegment()); } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java index d19be0ee8..a96e56b50 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/trace/mock/AgentDataMock.java @@ -52,6 +52,14 @@ public class AgentDataMock { UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create(); providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true); + TimeUnit.SECONDS.sleep(10); + + globalTraceId = UniqueIdBuilder.INSTANCE.create(); + consumerSegmentId = UniqueIdBuilder.INSTANCE.create(); + providerSegmentId = UniqueIdBuilder.INSTANCE.create(); + consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false); + providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false); + streamObserver.onCompleted(); while (!IS_COMPLETED) { TimeUnit.MILLISECONDS.sleep(500); -- GitLab