From ee46760dc6b8aedd08a16760c4b77d0ee398e3ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com>
Date: Thu, 30 Aug 2018 12:55:05 +0800
Subject: [PATCH] Buffer file implementation (#1607)
* Buffer stream.
* Buffer file reader.
---
.../skywalking/apm/util/StringUtil.java | 1 -
oap-server/pom.xml | 6 +
.../server-library/library-buffer/pom.xml | 44 ++++++
.../library/buffer/BufferFileUtils.java | 52 +++++++
.../server/library/buffer/BufferStream.java | 129 ++++++++++++++++++
.../oap/server/library/buffer/DataStream.java | 72 ++++++++++
.../library/buffer/DataStreamReader.java | 110 +++++++++++++++
.../library/buffer/DataStreamWriter.java | 97 +++++++++++++
.../oap/server/library/buffer/Offset.java | 64 +++++++++
.../server/library/buffer/OffsetStream.java | 126 +++++++++++++++++
.../library/buffer/BufferStreamTestCase.java | 69 ++++++++++
.../src/test/resources/log4j2.xml | 31 +++++
.../oap/server/library/util/FileUtils.java | 84 ++++++++++++
oap-server/server-library/pom.xml | 8 ++
oap-server/server-starter/pom.xml | 5 +
15 files changed, 897 insertions(+), 1 deletion(-)
create mode 100644 oap-server/server-library/library-buffer/pom.xml
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
create mode 100644 oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
create mode 100644 oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
create mode 100644 oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
create mode 100644 oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java
diff --git a/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java b/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
index e1ca378309..4c27745e47 100644
--- a/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
+++ b/apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
@@ -16,7 +16,6 @@
*
*/
-
package org.apache.skywalking.apm.util;
public final class StringUtil {
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index e77986b668..c2995ce93b 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -56,6 +56,7 @@
1.4.196
2.0.3
1.4
+ 2.6
6.3.2
2.9.9
2.0.0
@@ -250,6 +251,11 @@
commons-dbcp
${commons-dbcp.version}
+
+ commons-io
+ commons-io
+ ${commons-io.version}
+
io.kubernetes
client-java
diff --git a/oap-server/server-library/library-buffer/pom.xml b/oap-server/server-library/library-buffer/pom.xml
new file mode 100644
index 0000000000..8a87b1e41d
--- /dev/null
+++ b/oap-server/server-library/library-buffer/pom.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+ server-library
+ org.apache.skywalking
+ 6.0.0-alpha-SNAPSHOT
+
+ 4.0.0
+
+ library-buffer
+ jar
+
+
+
+ commons-io
+ commons-io
+
+
+ org.apache.skywalking
+ apm-network
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
new file mode 100644
index 0000000000..98570050e3
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferFileUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.text.*;
+import java.util.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class BufferFileUtils {
+
+ private BufferFileUtils() {
+ }
+
+ static final String CHARSET = "UTF-8";
+ static final String DATA_FILE_PREFIX = "data";
+ static final String OFFSET_FILE_PREFIX = "offset";
+ private static final String SEPARATOR = "-";
+ private static final String SUFFIX = ".sw";
+ private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss";
+
+ static void sort(String[] fileList) {
+ Arrays.sort(fileList, (f1, f2) -> {
+ int fileId1 = Integer.parseInt(f1.split("_")[1]);
+ int fileId2 = Integer.parseInt(f2.split("_")[1]);
+
+ return fileId1 - fileId2;
+ });
+ }
+
+ static String buildFileName(String prefix) {
+ DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR);
+ return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX;
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
new file mode 100644
index 0000000000..7929436cfa
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/BufferStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import java.nio.channels.FileLock;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BufferStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(BufferStream.class);
+
+ private final String absolutePath;
+ private final boolean cleanWhenRestart;
+ private final int dataFileMaxSize;
+ private final int offsetFileMaxSize;
+ private final Parser parser;
+ private final DataStreamReader.CallBack callBack;
+ private DataStream dataStream;
+
+ private BufferStream(String absolutePath, boolean cleanWhenRestart, int dataFileMaxSize, int offsetFileMaxSize,
+ Parser parser, DataStreamReader.CallBack callBack) {
+ this.absolutePath = absolutePath;
+ this.cleanWhenRestart = cleanWhenRestart;
+ this.dataFileMaxSize = dataFileMaxSize;
+ this.offsetFileMaxSize = offsetFileMaxSize;
+ this.parser = parser;
+ this.callBack = callBack;
+ }
+
+ public synchronized void initialize() throws IOException {
+ File directory = new File(absolutePath);
+ FileUtils.forceMkdir(directory);
+ tryLock(directory);
+
+ dataStream = new DataStream<>(directory, dataFileMaxSize, offsetFileMaxSize, parser, callBack);
+
+ if (cleanWhenRestart) {
+ dataStream.clean();
+ }
+
+ dataStream.initialize();
+ }
+
+ public synchronized void write(AbstractMessageLite messageLite) {
+ dataStream.getWriter().write(messageLite);
+ }
+
+ private void tryLock(File directory) {
+ logger.info("Try to lock buffer directory, directory is: " + absolutePath);
+ FileLock lock = null;
+
+ try {
+ lock = new FileOutputStream(new File(directory, "lock")).getChannel().tryLock();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ if (lock == null) {
+ throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
+ }
+
+ logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
+ }
+
+ public static class Builder {
+
+ private final String absolutePath;
+ private boolean cleanWhenRestart;
+ private int dataFileMaxSize;
+ private int offsetFileMaxSize;
+ private Parser parser;
+ private DataStreamReader.CallBack callBack;
+
+ public Builder(String absolutePath) {
+ this.absolutePath = absolutePath;
+ }
+
+ public BufferStream build() {
+ return new BufferStream<>(absolutePath, cleanWhenRestart, dataFileMaxSize, offsetFileMaxSize, parser, callBack);
+ }
+
+ public Builder cleanWhenRestart(boolean cleanWhenRestart) {
+ this.cleanWhenRestart = cleanWhenRestart;
+ return this;
+ }
+
+ public Builder offsetFileMaxSize(int offsetFileMaxSize) {
+ this.offsetFileMaxSize = offsetFileMaxSize;
+ return this;
+ }
+
+ public Builder dataFileMaxSize(int dataFileMaxSize) {
+ this.dataFileMaxSize = dataFileMaxSize;
+ return this;
+ }
+
+ public Builder parser(Parser parser) {
+ this.parser = parser;
+ return this;
+ }
+
+ public Builder callBack(DataStreamReader.CallBack callBack) {
+ this.callBack = callBack;
+ return this;
+ }
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
new file mode 100644
index 0000000000..1bb380d660
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import lombok.Getter;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class DataStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataStream.class);
+
+ private final File directory;
+ private final OffsetStream offsetStream;
+ @Getter private final DataStreamReader reader;
+ @Getter private final DataStreamWriter writer;
+ private boolean initialized = false;
+
+ DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, Parser parser,
+ DataStreamReader.CallBack callBack) {
+ this.directory = directory;
+ this.offsetStream = new OffsetStream(directory, offsetFileMaxSize);
+ this.writer = new DataStreamWriter<>(directory, offsetStream.getOffset().getWriteOffset(), dataFileMaxSize);
+ this.reader = new DataStreamReader<>(directory, offsetStream.getOffset().getReadOffset(), parser, callBack);
+ }
+
+ void clean() throws IOException {
+ String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
+ if (fileNames != null) {
+ for (String fileName : fileNames) {
+ File file = new File(directory, fileName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Delete buffer data file: {}", file.getAbsolutePath());
+ }
+ FileUtils.forceDelete(file);
+ }
+ }
+
+ offsetStream.clean();
+ }
+
+ synchronized void initialize() throws IOException {
+ if (!initialized) {
+ offsetStream.initialize();
+ writer.initialize();
+ reader.initialize();
+ initialized = true;
+ }
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
new file mode 100644
index 0000000000..16b0cd9f9d
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import java.util.concurrent.*;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.apache.skywalking.apm.util.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class DataStreamReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataStreamReader.class);
+
+ private final File directory;
+ private final Offset.ReadOffset readOffset;
+ private final Parser parser;
+ private final CallBack callBack;
+ private InputStream inputStream;
+
+ DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser parser,
+ CallBack callBack) {
+ this.directory = directory;
+ this.readOffset = readOffset;
+ this.parser = parser;
+ this.callBack = callBack;
+ }
+
+ void initialize() {
+ preRead();
+
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+ new RunnableWithExceptionProtection(this::read,
+ t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
+ }
+
+ private void preRead() {
+ String fileName = readOffset.getFileName();
+ if (StringUtil.isEmpty(fileName)) {
+ openInputStream(readEarliestCreateDataFile());
+ } else {
+ File dataFile = new File(directory, fileName);
+ if (dataFile.exists()) {
+ openInputStream(dataFile);
+ read();
+ } else {
+ openInputStream(readEarliestCreateDataFile());
+ }
+ }
+ }
+
+ private void openInputStream(File readFile) {
+ try {
+ inputStream = new FileInputStream(readFile);
+ } catch (FileNotFoundException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ private File readEarliestCreateDataFile() {
+ String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
+
+ if (fileNames != null && fileNames.length > 0) {
+ BufferFileUtils.sort(fileNames);
+ readOffset.setFileName(fileNames[0]);
+ readOffset.setOffset(0);
+ return new File(directory, fileNames[0]);
+ } else {
+ return null;
+ }
+ }
+
+ private void read() {
+ try {
+ MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
+ if (messageType != null) {
+ callBack.call(messageType);
+ final int serialized = messageType.getSerializedSize();
+ final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
+ readOffset.setOffset(readOffset.getOffset() + offset);
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ interface CallBack {
+ void call(MESSAGE_TYPE message);
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
new file mode 100644
index 0000000000..13a88b1a26
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import com.google.protobuf.*;
+import java.io.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class DataStreamWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataStreamWriter.class);
+
+ private final File directory;
+ private final Offset.WriteOffset writeOffset;
+
+ private final int dataFileMaxSize;
+
+ private boolean initialized = false;
+ private FileOutputStream outputStream;
+
+ DataStreamWriter(File directory, Offset.WriteOffset writeOffset, int dataFileMaxSize) {
+ this.directory = directory;
+ this.dataFileMaxSize = dataFileMaxSize;
+ this.writeOffset = writeOffset;
+ }
+
+ synchronized void initialize() throws IOException {
+ if (!initialized) {
+ String writeFileName = writeOffset.getFileName();
+
+ File dataFile;
+ if (StringUtil.isEmpty(writeFileName)) {
+ dataFile = createNewFile();
+ } else {
+ dataFile = new File(directory, writeFileName);
+ if (!dataFile.exists()) {
+ dataFile = createNewFile();
+ }
+ }
+
+ outputStream = FileUtils.openOutputStream(dataFile, true);
+ initialized = true;
+ }
+ }
+
+ private File createNewFile() throws IOException {
+ String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX);
+ File dataFile = new File(directory, fileName);
+
+ boolean created = dataFile.createNewFile();
+ if (!created) {
+ logger.info("The file named {} already exists.", dataFile.getAbsolutePath());
+ } else {
+ logger.info("Create a new buffer data file: {}", dataFile.getAbsolutePath());
+ }
+
+ writeOffset.setOffset(0);
+ writeOffset.setFileName(dataFile.getName());
+
+ return dataFile;
+ }
+
+ void write(AbstractMessageLite messageLite) {
+ try {
+ messageLite.writeDelimitedTo(outputStream);
+ long position = outputStream.getChannel().position();
+ writeOffset.setOffset(position);
+ if (position > (FileUtils.ONE_MB * dataFileMaxSize)) {
+ File dataFile = createNewFile();
+ outputStream = FileUtils.openOutputStream(dataFile, true);
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
new file mode 100644
index 0000000000..09e5936e1e
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+
+/**
+ * @author peng-yongsheng
+ */
+class Offset {
+
+ private static final String SPLIT_CHARACTER = ",";
+ @Getter private final ReadOffset readOffset;
+ @Getter private final WriteOffset writeOffset;
+
+ Offset() {
+ readOffset = new ReadOffset();
+ writeOffset = new WriteOffset();
+ }
+
+ String serialize() {
+ return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getOffset())
+ + SPLIT_CHARACTER + writeOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getOffset());
+ }
+
+ void deserialize(String value) {
+ if (!StringUtil.isEmpty(value)) {
+ String[] values = value.split(SPLIT_CHARACTER);
+ if (values.length == 4) {
+ readOffset.setFileName(values[0]);
+ readOffset.setOffset(Long.parseLong(values[1]));
+ writeOffset.setFileName(values[2]);
+ writeOffset.setOffset(Long.parseLong(values[3]));
+ }
+ }
+ }
+
+ static class ReadOffset {
+ @Getter @Setter private String fileName;
+ @Getter @Setter private long offset = 0;
+ }
+
+ static class WriteOffset {
+ @Getter @Setter private String fileName;
+ @Getter @Setter private long offset = 0;
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
new file mode 100644
index 0000000000..7f46331411
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.concurrent.*;
+import lombok.Getter;
+import org.apache.commons.io.*;
+import org.apache.commons.io.filefilter.PrefixFileFilter;
+import org.apache.commons.io.input.ReversedLinesFileReader;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+class OffsetStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(OffsetStream.class);
+
+ private final File directory;
+ private final int offsetFileMaxSize;
+
+ @Getter private final Offset offset;
+ private File offsetFile;
+ private boolean initialized = false;
+ private String lastOffsetRecord = "";
+
+ OffsetStream(File directory, int offsetFileMaxSize) {
+ this.directory = directory;
+ this.offsetFileMaxSize = offsetFileMaxSize;
+ this.offset = new Offset();
+ }
+
+ void clean() throws IOException {
+ String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
+ if (fileNames != null) {
+ for (String fileName : fileNames) {
+ File file = new File(directory, fileName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Delete buffer offset file: {}", file.getAbsolutePath());
+ }
+ FileUtils.forceDelete(new File(directory, fileName));
+ }
+ }
+ }
+
+ synchronized void initialize() throws IOException {
+ if (!initialized) {
+ String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
+ if (fileNames != null && fileNames.length > 0) {
+ for (int i = 0; i < fileNames.length; i++) {
+ }
+ } else {
+ offsetFile = newFile();
+ }
+ offset.deserialize(readLastLine());
+ initialized = true;
+
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
+ new RunnableWithExceptionProtection(this::flush,
+ t -> logger.error("Flush offset file in background failure.", t)
+ ), 2, 1, TimeUnit.SECONDS);
+ }
+ }
+
+ void flush() {
+ try {
+ String offsetRecord = offset.serialize();
+ logger.debug("flush offset, record: {}", offsetRecord);
+ if (!lastOffsetRecord.equals(offsetRecord)) {
+ if (offsetFile.length() >= FileUtils.ONE_MB * offsetFileMaxSize) {
+ nextFile();
+ }
+
+ try (OutputStream out = new BufferedOutputStream(FileUtils.openOutputStream(offsetFile, true))) {
+ IOUtils.write(offsetRecord, out, Charset.forName(BufferFileUtils.CHARSET));
+ IOUtils.write(System.lineSeparator(), out, Charset.forName(BufferFileUtils.CHARSET));
+ }
+ lastOffsetRecord = offsetRecord;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private void nextFile() throws IOException {
+ File newOffsetFile = newFile();
+ if (!offsetFile.delete()) {
+ logger.warn("Offset file {} delete failure.", newOffsetFile.getAbsolutePath());
+ }
+ offsetFile = newOffsetFile;
+ this.flush();
+ }
+
+ private File newFile() throws IOException {
+ String fileName = BufferFileUtils.buildFileName(BufferFileUtils.OFFSET_FILE_PREFIX);
+ File file = new File(directory, fileName);
+ if (file.createNewFile()) {
+ logger.info("Create a new offset file {}", fileName);
+ }
+ return file;
+ }
+
+ private String readLastLine() throws IOException {
+ ReversedLinesFileReader reader = new ReversedLinesFileReader(offsetFile, Charset.forName(BufferFileUtils.CHARSET));
+ return reader.readLine();
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
new file mode 100644
index 0000000000..cff910085c
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/test/java/org/apache/skywalking/oap/server/library/buffer/BufferStreamTestCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.buffer;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.network.language.agent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BufferStreamTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(BufferStreamTestCase.class);
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ String directory = "/Users/pengys5/code/sky-walking/buffer-test";
+ BufferStream.Builder builder = new BufferStream.Builder<>(directory);
+ builder.cleanWhenRestart(true);
+ builder.dataFileMaxSize(1);
+ builder.offsetFileMaxSize(1);
+ builder.parser(TraceSegmentObject.parser());
+ builder.callBack(new SegmentParse());
+
+ BufferStream stream = builder.build();
+ stream.initialize();
+
+ TimeUnit.SECONDS.sleep(5);
+
+ String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+ "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+ "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
+ "main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28";
+
+ for (int i = 0; i < 100; i++) {
+ TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
+ SpanObject.Builder span = SpanObject.newBuilder();
+
+ span.setOperationName(String.valueOf(i) + " " + str);
+ segment.addSpans(span);
+ stream.write(segment.build());
+ }
+
+ }
+
+ private static class SegmentParse implements DataStreamReader.CallBack {
+
+ @Override public void call(TraceSegmentObject message) {
+ logger.info("segment parse: {}", message.getSpans(0).getOperationName());
+ }
+ }
+}
diff --git a/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
new file mode 100644
index 0000000000..6eb5b3fb98
--- /dev/null
+++ b/oap-server/server-library/library-buffer/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java
new file mode 100644
index 0000000000..9776730a17
--- /dev/null
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/FileUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.util;
+
+import java.io.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum FileUtils {
+ INSTANCE;
+
+ private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
+
+ public String readLastLine(File file) {
+ RandomAccessFile randomAccessFile = null;
+ try {
+ randomAccessFile = new RandomAccessFile(file, "r");
+ long length = randomAccessFile.length();
+ if (length == 0) {
+ return "";
+ } else {
+ long position = length - 1;
+ randomAccessFile.seek(position);
+ while (position >= 0) {
+ if (randomAccessFile.read() == '\n') {
+ return randomAccessFile.readLine();
+ }
+ randomAccessFile.seek(position);
+ if (position == 0) {
+ return randomAccessFile.readLine();
+ }
+ position--;
+ }
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ if (randomAccessFile != null) {
+ try {
+ randomAccessFile.close();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+ return "";
+ }
+
+ public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) {
+ if (randomAccessFile == null) {
+ try {
+ randomAccessFile = new RandomAccessFile(file, "rwd");
+ } catch (FileNotFoundException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ try {
+ long length = randomAccessFile.length();
+ randomAccessFile.seek(length);
+ randomAccessFile.writeBytes(System.lineSeparator());
+ randomAccessFile.writeBytes(value);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/oap-server/server-library/pom.xml b/oap-server/server-library/pom.xml
index 5df8a5a0f5..ac79fb6d1a 100644
--- a/oap-server/server-library/pom.xml
+++ b/oap-server/server-library/pom.xml
@@ -34,5 +34,13 @@
library-server
library-util
library-client
+ library-buffer
+
+
+
+ org.apache.skywalking
+ apm-util
+
+
\ No newline at end of file
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 6e9a95c060..bb89bbce7d 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -75,6 +75,11 @@
skywalking-jvm-receiver-plugin
${project.version}
+
+ org.apache.skywalking
+ skywalking-trace-receiver-plugin
+ ${project.version}
+
--
GitLab