提交 065a384f 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Trace buffer test success. (#1733)

上级 46c55ebd
......@@ -68,7 +68,7 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
}
private void tryLock(File directory) {
logger.info("Try to lock buffer directory, directory is: " + absolutePath);
logger.info("Try to lock buffer directory, directory is: " + directory.getAbsolutePath());
FileLock lock = null;
try {
......@@ -78,10 +78,10 @@ public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
}
if (lock == null) {
throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + directory.getAbsolutePath());
}
logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
logger.info("Lock buffer directory successfully, directory is: " + directory.getAbsolutePath());
}
public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
......
......@@ -35,13 +35,11 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private static final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class);
private final BufferStream<UpstreamSegment> stream;
private final DataCarrier<SegmentStandardization> dataCarrier;
public SegmentStandardizationWorker(SegmentParse segmentParse, String path,
int offsetFileMaxSize, int dataFileMaxSize, boolean cleanWhenRestart) throws IOException {
super(Integer.MAX_VALUE);
DataCarrier<SegmentStandardization> dataCarrier = new DataCarrier<>(1, 1024);
dataCarrier.consume(new Consumer(this), 1);
BufferStream.Builder<UpstreamSegment> builder = new BufferStream.Builder<>(path);
builder.cleanWhenRestart(cleanWhenRestart);
......@@ -50,21 +48,24 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParse);
stream = builder.build();
BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();
dataCarrier = new DataCarrier<>(1, 1024);
dataCarrier.consume(new Consumer(stream), 1);
}
@Override
public void in(SegmentStandardization standardization) {
stream.write(standardization.getUpstreamSegment());
dataCarrier.produce(standardization);
}
private class Consumer implements IConsumer<SegmentStandardization> {
private final SegmentStandardizationWorker aggregator;
private final BufferStream<UpstreamSegment> stream;
private Consumer(SegmentStandardizationWorker aggregator) {
this.aggregator = aggregator;
private Consumer(BufferStream<UpstreamSegment> stream) {
this.stream = stream;
}
@Override
......@@ -73,16 +74,8 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
@Override
public void consume(List<SegmentStandardization> data) {
Iterator<SegmentStandardization> inputIterator = data.iterator();
int i = 0;
while (inputIterator.hasNext()) {
SegmentStandardization indicator = inputIterator.next();
i++;
if (i == data.size()) {
indicator.getEndOfBatchContext().setEndOfBatch(true);
}
aggregator.in(indicator);
for (SegmentStandardization aData : data) {
stream.write(aData.getUpstreamSegment());
}
}
......
......@@ -52,6 +52,14 @@ public class AgentDataMock {
UniqueId.Builder providerSegmentId = UniqueIdBuilder.INSTANCE.create();
providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, true);
TimeUnit.SECONDS.sleep(10);
globalTraceId = UniqueIdBuilder.INSTANCE.create();
consumerSegmentId = UniqueIdBuilder.INSTANCE.create();
providerSegmentId = UniqueIdBuilder.INSTANCE.create();
consumerMock.mock(streamObserver, globalTraceId, consumerSegmentId, startTimestamp, false);
providerMock.mock(streamObserver, globalTraceId, providerSegmentId, consumerSegmentId, startTimestamp, false);
streamObserver.onCompleted();
while (!IS_COMPLETED) {
TimeUnit.MILLISECONDS.sleep(500);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册