提交 5e3afe5e 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Buffer library performance test and functional test successfully. (#1612)

* Buffer stream.

* Buffer file reader.

* Buffer library performance test and functional test successfully.

* Fixed the code merge mistake.
上级 b5378e75
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.library.buffer; package org.apache.skywalking.oap.server.library.buffer;
import java.text.*; import java.util.Arrays;
import java.util.*;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -34,19 +33,17 @@ class BufferFileUtils { ...@@ -34,19 +33,17 @@ class BufferFileUtils {
static final String OFFSET_FILE_PREFIX = "offset"; static final String OFFSET_FILE_PREFIX = "offset";
private static final String SEPARATOR = "-"; private static final String SEPARATOR = "-";
private static final String SUFFIX = ".sw"; private static final String SUFFIX = ".sw";
private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss";
static void sort(String[] fileList) { static void sort(String[] fileList) {
Arrays.sort(fileList, (f1, f2) -> { Arrays.sort(fileList, (f1, f2) -> {
int fileId1 = Integer.parseInt(f1.split("_")[1]); long t1 = Long.parseLong(f1.substring(0, f1.length() - 3).split(SEPARATOR)[1]);
int fileId2 = Integer.parseInt(f2.split("_")[1]); long t2 = Long.parseLong(f2.substring(0, f2.length() - 3).split(SEPARATOR)[1]);
return fileId1 - fileId2; return (int)(t1 - t2);
}); });
} }
static String buildFileName(String prefix) { static String buildFileName(String prefix) {
DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR); return prefix + SEPARATOR + System.currentTimeMillis() + SUFFIX;
return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX;
} }
} }
\ No newline at end of file
...@@ -38,7 +38,7 @@ class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -38,7 +38,7 @@ class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> {
@Getter private final DataStreamWriter<MESSAGE_TYPE> writer; @Getter private final DataStreamWriter<MESSAGE_TYPE> writer;
private boolean initialized = false; private boolean initialized = false;
DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, Parser<MESSAGE_TYPE> parser, DataStream(File directory, int dataFileMaxSize, int offsetFileMaxSize, Parser<MESSAGE_TYPE> parser,
DataStreamReader.CallBack<MESSAGE_TYPE> callBack) { DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.directory = directory; this.directory = directory;
this.offsetStream = new OffsetStream(directory, offsetFileMaxSize); this.offsetStream = new OffsetStream(directory, offsetFileMaxSize);
......
...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.library.buffer; ...@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*; import com.google.protobuf.*;
import java.io.*; import java.io.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter; import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.skywalking.apm.util.*; import org.apache.skywalking.apm.util.*;
import org.slf4j.*; import org.slf4j.*;
...@@ -36,6 +37,7 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -36,6 +37,7 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private final Offset.ReadOffset readOffset; private final Offset.ReadOffset readOffset;
private final Parser<MESSAGE_TYPE> parser; private final Parser<MESSAGE_TYPE> parser;
private final CallBack<MESSAGE_TYPE> callBack; private final CallBack<MESSAGE_TYPE> callBack;
private File readingFile;
private InputStream inputStream; private InputStream inputStream;
DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser, DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser,
...@@ -49,35 +51,40 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -49,35 +51,40 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
void initialize() { void initialize() {
preRead(); preRead();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
new RunnableWithExceptionProtection(this::read, new RunnableWithExceptionProtection(this::read,
t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS); t -> logger.error("Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS);
} }
private void preRead() { private void preRead() {
String fileName = readOffset.getFileName(); String fileName = readOffset.getFileName();
if (StringUtil.isEmpty(fileName)) { if (StringUtil.isEmpty(fileName)) {
openInputStream(readEarliestCreateDataFile()); openInputStream(readEarliestDataFile());
} else { } else {
File dataFile = new File(directory, fileName); File readingFile = new File(directory, fileName);
if (dataFile.exists()) { if (readingFile.exists()) {
openInputStream(dataFile); openInputStream(readingFile);
read(); try {
inputStream.skip(readOffset.getOffset());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
} else { } else {
openInputStream(readEarliestCreateDataFile()); openInputStream(readEarliestDataFile());
} }
} }
} }
private void openInputStream(File readFile) { private void openInputStream(File readingFile) {
try { try {
inputStream = new FileInputStream(readFile); this.readingFile = readingFile;
inputStream = new FileInputStream(readingFile);
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
private File readEarliestCreateDataFile() { private File readEarliestDataFile() {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX)); String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
if (fileNames != null && fileNames.length > 0) { if (fileNames != null && fileNames.length > 0) {
...@@ -92,6 +99,13 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -92,6 +99,13 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private void read() { private void read() {
try { try {
if (readOffset.getOffset() == readingFile.length() && !readOffset.isCurrentWriteFile()) {
FileUtils.forceDelete(readingFile);
openInputStream(readEarliestDataFile());
}
while (readOffset.getOffset() < readingFile.length()) {
MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream); MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
if (messageType != null) { if (messageType != null) {
callBack.call(messageType); callBack.call(messageType);
...@@ -99,6 +113,7 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -99,6 +113,7 @@ class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized; final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
readOffset.setOffset(readOffset.getOffset() + offset); readOffset.setOffset(readOffset.getOffset() + offset);
} }
}
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
......
...@@ -49,46 +49,46 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> { ...@@ -49,46 +49,46 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
if (!initialized) { if (!initialized) {
String writeFileName = writeOffset.getFileName(); String writeFileName = writeOffset.getFileName();
File dataFile; File writingFile;
if (StringUtil.isEmpty(writeFileName)) { if (StringUtil.isEmpty(writeFileName)) {
dataFile = createNewFile(); writingFile = createNewFile();
} else { } else {
dataFile = new File(directory, writeFileName); writingFile = new File(directory, writeFileName);
if (!dataFile.exists()) { if (!writingFile.exists()) {
dataFile = createNewFile(); writingFile = createNewFile();
} }
} }
outputStream = FileUtils.openOutputStream(dataFile, true); outputStream = FileUtils.openOutputStream(writingFile, true);
initialized = true; initialized = true;
} }
} }
private File createNewFile() throws IOException { private File createNewFile() throws IOException {
String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX); String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX);
File dataFile = new File(directory, fileName); File writingFile = new File(directory, fileName);
boolean created = dataFile.createNewFile(); boolean created = writingFile.createNewFile();
if (!created) { if (!created) {
logger.info("The file named {} already exists.", dataFile.getAbsolutePath()); logger.info("The file named {} already exists.", writingFile.getAbsolutePath());
} else { } else {
logger.info("Create a new buffer data file: {}", dataFile.getAbsolutePath()); logger.info("Create a new buffer data file: {}", writingFile.getAbsolutePath());
} }
writeOffset.setOffset(0); writeOffset.setOffset(0);
writeOffset.setFileName(dataFile.getName()); writeOffset.setFileName(writingFile.getName());
return dataFile; return writingFile;
} }
void write(AbstractMessageLite messageLite) { synchronized void write(AbstractMessageLite messageLite) {
try { try {
messageLite.writeDelimitedTo(outputStream); messageLite.writeDelimitedTo(outputStream);
long position = outputStream.getChannel().position(); long position = outputStream.getChannel().position();
writeOffset.setOffset(position); writeOffset.setOffset(position);
if (position > (FileUtils.ONE_MB * dataFileMaxSize)) { if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) {
File dataFile = createNewFile(); File writingFile = createNewFile();
outputStream = FileUtils.openOutputStream(dataFile, true); outputStream = FileUtils.openOutputStream(writingFile, true);
} }
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
......
...@@ -31,8 +31,8 @@ class Offset { ...@@ -31,8 +31,8 @@ class Offset {
@Getter private final WriteOffset writeOffset; @Getter private final WriteOffset writeOffset;
Offset() { Offset() {
readOffset = new ReadOffset();
writeOffset = new WriteOffset(); writeOffset = new WriteOffset();
readOffset = new ReadOffset(writeOffset);
} }
String serialize() { String serialize() {
...@@ -55,6 +55,15 @@ class Offset { ...@@ -55,6 +55,15 @@ class Offset {
static class ReadOffset { static class ReadOffset {
@Getter @Setter private String fileName; @Getter @Setter private String fileName;
@Getter @Setter private long offset = 0; @Getter @Setter private long offset = 0;
private final WriteOffset writeOffset;
private ReadOffset(WriteOffset writeOffset) {
this.writeOffset = writeOffset;
}
boolean isCurrentWriteFile() {
return fileName.equals(writeOffset.fileName);
}
} }
static class WriteOffset { static class WriteOffset {
......
...@@ -66,8 +66,8 @@ class OffsetStream { ...@@ -66,8 +66,8 @@ class OffsetStream {
if (!initialized) { if (!initialized) {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX)); String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
if (fileNames != null && fileNames.length > 0) { if (fileNames != null && fileNames.length > 0) {
for (int i = 0; i < fileNames.length; i++) { BufferFileUtils.sort(fileNames);
} offsetFile = new File(directory, fileNames[0]);
} else { } else {
offsetFile = newFile(); offsetFile = newFile();
} }
......
...@@ -33,9 +33,9 @@ public class BufferStreamTestCase { ...@@ -33,9 +33,9 @@ public class BufferStreamTestCase {
public static void main(String[] args) throws IOException, InterruptedException { public static void main(String[] args) throws IOException, InterruptedException {
String directory = "/Users/pengys5/code/sky-walking/buffer-test"; String directory = "/Users/pengys5/code/sky-walking/buffer-test";
BufferStream.Builder<TraceSegmentObject> builder = new BufferStream.Builder<>(directory); BufferStream.Builder<TraceSegmentObject> builder = new BufferStream.Builder<>(directory);
builder.cleanWhenRestart(true); // builder.cleanWhenRestart(true);
builder.dataFileMaxSize(1); builder.dataFileMaxSize(50);
builder.offsetFileMaxSize(1); builder.offsetFileMaxSize(10);
builder.parser(TraceSegmentObject.parser()); builder.parser(TraceSegmentObject.parser());
builder.callBack(new SegmentParse()); builder.callBack(new SegmentParse());
...@@ -44,18 +44,23 @@ public class BufferStreamTestCase { ...@@ -44,18 +44,23 @@ public class BufferStreamTestCase {
TimeUnit.SECONDS.sleep(5); TimeUnit.SECONDS.sleep(5);
String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + StringBuilder str = new StringBuilder("2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28");
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + for (int i = 0; i < 1000; i++) {
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" + str.append("main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28");
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28"; }
for (int i = 0; i < 100; i++) { for (int i = 0; i < 20000; i++) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder(); TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
SpanObject.Builder span = SpanObject.newBuilder(); SpanObject.Builder span = SpanObject.newBuilder();
span.setOperationName(String.valueOf(i) + " " + str); span.setSpanId(i);
span.setOperationName(str.toString());
segment.addSpans(span); segment.addSpans(span);
stream.write(segment.build()); stream.write(segment.build());
if (i % 1000 == 0) {
TimeUnit.MILLISECONDS.sleep(50);
}
} }
} }
...@@ -63,7 +68,7 @@ public class BufferStreamTestCase { ...@@ -63,7 +68,7 @@ public class BufferStreamTestCase {
private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> { private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {
@Override public void call(TraceSegmentObject message) { @Override public void call(TraceSegmentObject message) {
logger.info("segment parse: {}", message.getSpans(0).getOperationName()); logger.info("segment parse: {}", message.getSpans(0).getSpanId());
} }
} }
} }
Subproject commit ad3ee45dbadfae35d77238bdd7a1df593158f109 Subproject commit f9c602936ab4f386576bf16f203efac61962e424
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册