提交 ee46760d 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Buffer file implementation (#1607)

* Buffer stream.

* Buffer file reader.
上级 08509753
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.util;
public final class StringUtil {
......
......@@ -56,6 +56,7 @@
<h2.version>1.4.196</h2.version>
<shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<commons-io.version>2.6</commons-io.version>
<elasticsearch.version>6.3.2</elasticsearch.version>
<joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version>
......@@ -250,6 +251,11 @@
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-library</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>library-buffer</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import java.text.*;
import java.util.*;
/**
* @author peng-yongsheng
*/
class BufferFileUtils {
private BufferFileUtils() {
}
static final String CHARSET = "UTF-8";
static final String DATA_FILE_PREFIX = "data";
static final String OFFSET_FILE_PREFIX = "offset";
private static final String SEPARATOR = "-";
private static final String SUFFIX = ".sw";
private static final String DATA_FORMAT_STR = "yyyyMMddHHmmss";
static void sort(String[] fileList) {
Arrays.sort(fileList, (f1, f2) -> {
int fileId1 = Integer.parseInt(f1.split("_")[1]);
int fileId2 = Integer.parseInt(f2.split("_")[1]);
return fileId1 - fileId2;
});
}
static String buildFileName(String prefix) {
DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT_STR);
return prefix + SEPARATOR + dateFormat.format(new Date()) + SUFFIX;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
import java.nio.channels.FileLock;
import org.apache.commons.io.FileUtils;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class BufferStream<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(BufferStream.class);
private final String absolutePath;
private final boolean cleanWhenRestart;
private final int dataFileMaxSize;
private final int offsetFileMaxSize;
private final Parser<MESSAGE_TYPE> parser;
private final DataStreamReader.CallBack<MESSAGE_TYPE> callBack;
private DataStream<MESSAGE_TYPE> dataStream;
private BufferStream(String absolutePath, boolean cleanWhenRestart, int dataFileMaxSize, int offsetFileMaxSize,
Parser<MESSAGE_TYPE> parser, DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.absolutePath = absolutePath;
this.cleanWhenRestart = cleanWhenRestart;
this.dataFileMaxSize = dataFileMaxSize;
this.offsetFileMaxSize = offsetFileMaxSize;
this.parser = parser;
this.callBack = callBack;
}
public synchronized void initialize() throws IOException {
File directory = new File(absolutePath);
FileUtils.forceMkdir(directory);
tryLock(directory);
dataStream = new DataStream<>(directory, dataFileMaxSize, offsetFileMaxSize, parser, callBack);
if (cleanWhenRestart) {
dataStream.clean();
}
dataStream.initialize();
}
public synchronized void write(AbstractMessageLite messageLite) {
dataStream.getWriter().write(messageLite);
}
private void tryLock(File directory) {
logger.info("Try to lock buffer directory, directory is: " + absolutePath);
FileLock lock = null;
try {
lock = new FileOutputStream(new File(directory, "lock")).getChannel().tryLock();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
if (lock == null) {
throw new RuntimeException("The buffer directory is reading or writing by another thread, directory is: " + absolutePath);
}
logger.info("Lock buffer directory successfully, directory is: " + absolutePath);
}
public static class Builder<MESSAGE_TYPE extends GeneratedMessageV3> {
private final String absolutePath;
private boolean cleanWhenRestart;
private int dataFileMaxSize;
private int offsetFileMaxSize;
private Parser<MESSAGE_TYPE> parser;
private DataStreamReader.CallBack<MESSAGE_TYPE> callBack;
public Builder(String absolutePath) {
this.absolutePath = absolutePath;
}
public BufferStream<MESSAGE_TYPE> build() {
return new BufferStream<>(absolutePath, cleanWhenRestart, dataFileMaxSize, offsetFileMaxSize, parser, callBack);
}
public Builder<MESSAGE_TYPE> cleanWhenRestart(boolean cleanWhenRestart) {
this.cleanWhenRestart = cleanWhenRestart;
return this;
}
public Builder<MESSAGE_TYPE> offsetFileMaxSize(int offsetFileMaxSize) {
this.offsetFileMaxSize = offsetFileMaxSize;
return this;
}
public Builder<MESSAGE_TYPE> dataFileMaxSize(int dataFileMaxSize) {
this.dataFileMaxSize = dataFileMaxSize;
return this;
}
public Builder<MESSAGE_TYPE> parser(Parser<MESSAGE_TYPE> parser) {
this.parser = parser;
return this;
}
public Builder<MESSAGE_TYPE> callBack(DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.callBack = callBack;
return this;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
import lombok.Getter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStream.class);
private final File directory;
private final OffsetStream offsetStream;
@Getter private final DataStreamReader<MESSAGE_TYPE> reader;
@Getter private final DataStreamWriter<MESSAGE_TYPE> writer;
private boolean initialized = false;
DataStream(File directory, int offsetFileMaxSize, int dataFileMaxSize, Parser<MESSAGE_TYPE> parser,
DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.directory = directory;
this.offsetStream = new OffsetStream(directory, offsetFileMaxSize);
this.writer = new DataStreamWriter<>(directory, offsetStream.getOffset().getWriteOffset(), dataFileMaxSize);
this.reader = new DataStreamReader<>(directory, offsetStream.getOffset().getReadOffset(), parser, callBack);
}
void clean() throws IOException {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
if (fileNames != null) {
for (String fileName : fileNames) {
File file = new File(directory, fileName);
if (logger.isDebugEnabled()) {
logger.debug("Delete buffer data file: {}", file.getAbsolutePath());
}
FileUtils.forceDelete(file);
}
}
offsetStream.clean();
}
synchronized void initialize() throws IOException {
if (!initialized) {
offsetStream.initialize();
writer.initialize();
reader.initialize();
initialized = true;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
import java.util.concurrent.*;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.skywalking.apm.util.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStreamReader.class);
private final File directory;
private final Offset.ReadOffset readOffset;
private final Parser<MESSAGE_TYPE> parser;
private final CallBack<MESSAGE_TYPE> callBack;
private InputStream inputStream;
DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser,
CallBack<MESSAGE_TYPE> callBack) {
this.directory = directory;
this.readOffset = readOffset;
this.parser = parser;
this.callBack = callBack;
}
void initialize() {
preRead();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::read,
t -> logger.error("Segment buffer pre read failure.", t)), 3, 3, TimeUnit.SECONDS);
}
private void preRead() {
String fileName = readOffset.getFileName();
if (StringUtil.isEmpty(fileName)) {
openInputStream(readEarliestCreateDataFile());
} else {
File dataFile = new File(directory, fileName);
if (dataFile.exists()) {
openInputStream(dataFile);
read();
} else {
openInputStream(readEarliestCreateDataFile());
}
}
}
private void openInputStream(File readFile) {
try {
inputStream = new FileInputStream(readFile);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private File readEarliestCreateDataFile() {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.DATA_FILE_PREFIX));
if (fileNames != null && fileNames.length > 0) {
BufferFileUtils.sort(fileNames);
readOffset.setFileName(fileNames[0]);
readOffset.setOffset(0);
return new File(directory, fileNames[0]);
} else {
return null;
}
}
private void read() {
try {
MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
if (messageType != null) {
callBack.call(messageType);
final int serialized = messageType.getSerializedSize();
final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
readOffset.setOffset(readOffset.getOffset() + offset);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
void call(MESSAGE_TYPE message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
import org.apache.commons.io.FileUtils;
import org.apache.skywalking.apm.util.StringUtil;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStreamWriter.class);
private final File directory;
private final Offset.WriteOffset writeOffset;
private final int dataFileMaxSize;
private boolean initialized = false;
private FileOutputStream outputStream;
DataStreamWriter(File directory, Offset.WriteOffset writeOffset, int dataFileMaxSize) {
this.directory = directory;
this.dataFileMaxSize = dataFileMaxSize;
this.writeOffset = writeOffset;
}
synchronized void initialize() throws IOException {
if (!initialized) {
String writeFileName = writeOffset.getFileName();
File dataFile;
if (StringUtil.isEmpty(writeFileName)) {
dataFile = createNewFile();
} else {
dataFile = new File(directory, writeFileName);
if (!dataFile.exists()) {
dataFile = createNewFile();
}
}
outputStream = FileUtils.openOutputStream(dataFile, true);
initialized = true;
}
}
private File createNewFile() throws IOException {
String fileName = BufferFileUtils.buildFileName(BufferFileUtils.DATA_FILE_PREFIX);
File dataFile = new File(directory, fileName);
boolean created = dataFile.createNewFile();
if (!created) {
logger.info("The file named {} already exists.", dataFile.getAbsolutePath());
} else {
logger.info("Create a new buffer data file: {}", dataFile.getAbsolutePath());
}
writeOffset.setOffset(0);
writeOffset.setFileName(dataFile.getName());
return dataFile;
}
void write(AbstractMessageLite messageLite) {
try {
messageLite.writeDelimitedTo(outputStream);
long position = outputStream.getChannel().position();
writeOffset.setOffset(position);
if (position > (FileUtils.ONE_MB * dataFileMaxSize)) {
File dataFile = createNewFile();
outputStream = FileUtils.openOutputStream(dataFile, true);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import lombok.*;
import org.apache.skywalking.apm.util.StringUtil;
/**
* @author peng-yongsheng
*/
class Offset {
private static final String SPLIT_CHARACTER = ",";
@Getter private final ReadOffset readOffset;
@Getter private final WriteOffset writeOffset;
Offset() {
readOffset = new ReadOffset();
writeOffset = new WriteOffset();
}
String serialize() {
return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getOffset())
+ SPLIT_CHARACTER + writeOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getOffset());
}
void deserialize(String value) {
if (!StringUtil.isEmpty(value)) {
String[] values = value.split(SPLIT_CHARACTER);
if (values.length == 4) {
readOffset.setFileName(values[0]);
readOffset.setOffset(Long.parseLong(values[1]));
writeOffset.setFileName(values[2]);
writeOffset.setOffset(Long.parseLong(values[3]));
}
}
}
static class ReadOffset {
@Getter @Setter private String fileName;
@Getter @Setter private long offset = 0;
}
static class WriteOffset {
@Getter @Setter private String fileName;
@Getter @Setter private long offset = 0;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import java.io.*;
import java.nio.charset.Charset;
import java.util.concurrent.*;
import lombok.Getter;
import org.apache.commons.io.*;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.commons.io.input.ReversedLinesFileReader;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
class OffsetStream {
private static final Logger logger = LoggerFactory.getLogger(OffsetStream.class);
private final File directory;
private final int offsetFileMaxSize;
@Getter private final Offset offset;
private File offsetFile;
private boolean initialized = false;
private String lastOffsetRecord = "";
OffsetStream(File directory, int offsetFileMaxSize) {
this.directory = directory;
this.offsetFileMaxSize = offsetFileMaxSize;
this.offset = new Offset();
}
void clean() throws IOException {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
if (fileNames != null) {
for (String fileName : fileNames) {
File file = new File(directory, fileName);
if (logger.isDebugEnabled()) {
logger.debug("Delete buffer offset file: {}", file.getAbsolutePath());
}
FileUtils.forceDelete(new File(directory, fileName));
}
}
}
synchronized void initialize() throws IOException {
if (!initialized) {
String[] fileNames = directory.list(new PrefixFileFilter(BufferFileUtils.OFFSET_FILE_PREFIX));
if (fileNames != null && fileNames.length > 0) {
for (int i = 0; i < fileNames.length; i++) {
}
} else {
offsetFile = newFile();
}
offset.deserialize(readLastLine());
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::flush,
t -> logger.error("Flush offset file in background failure.", t)
), 2, 1, TimeUnit.SECONDS);
}
}
void flush() {
try {
String offsetRecord = offset.serialize();
logger.debug("flush offset, record: {}", offsetRecord);
if (!lastOffsetRecord.equals(offsetRecord)) {
if (offsetFile.length() >= FileUtils.ONE_MB * offsetFileMaxSize) {
nextFile();
}
try (OutputStream out = new BufferedOutputStream(FileUtils.openOutputStream(offsetFile, true))) {
IOUtils.write(offsetRecord, out, Charset.forName(BufferFileUtils.CHARSET));
IOUtils.write(System.lineSeparator(), out, Charset.forName(BufferFileUtils.CHARSET));
}
lastOffsetRecord = offsetRecord;
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
private void nextFile() throws IOException {
File newOffsetFile = newFile();
if (!offsetFile.delete()) {
logger.warn("Offset file {} delete failure.", newOffsetFile.getAbsolutePath());
}
offsetFile = newOffsetFile;
this.flush();
}
private File newFile() throws IOException {
String fileName = BufferFileUtils.buildFileName(BufferFileUtils.OFFSET_FILE_PREFIX);
File file = new File(directory, fileName);
if (file.createNewFile()) {
logger.info("Create a new offset file {}", fileName);
}
return file;
}
private String readLastLine() throws IOException {
ReversedLinesFileReader reader = new ReversedLinesFileReader(offsetFile, Charset.forName(BufferFileUtils.CHARSET));
return reader.readLine();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.network.language.agent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class BufferStreamTestCase {
private static final Logger logger = LoggerFactory.getLogger(BufferStreamTestCase.class);
public static void main(String[] args) throws IOException, InterruptedException {
String directory = "/Users/pengys5/code/sky-walking/buffer-test";
BufferStream.Builder<TraceSegmentObject> builder = new BufferStream.Builder<>(directory);
builder.cleanWhenRestart(true);
builder.dataFileMaxSize(1);
builder.offsetFileMaxSize(1);
builder.parser(TraceSegmentObject.parser());
builder.callBack(new SegmentParse());
BufferStream<TraceSegmentObject> stream = builder.build();
stream.initialize();
TimeUnit.SECONDS.sleep(5);
String str = "2018-08-27 11:59:45,261 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28" +
"main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28 main DEBUG Registering MBean org.apache.logging.log4j2:type=6d6f6e28";
for (int i = 0; i < 100; i++) {
TraceSegmentObject.Builder segment = TraceSegmentObject.newBuilder();
SpanObject.Builder span = SpanObject.newBuilder();
span.setOperationName(String.valueOf(i) + " " + str);
segment.addSpans(span);
stream.write(segment.build());
}
}
private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {
@Override public void call(TraceSegmentObject message) {
logger.info("segment parse: {}", message.getSpans(0).getOperationName());
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
import java.io.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public enum FileUtils {
INSTANCE;
private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
public String readLastLine(File file) {
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r");
long length = randomAccessFile.length();
if (length == 0) {
return "";
} else {
long position = length - 1;
randomAccessFile.seek(position);
while (position >= 0) {
if (randomAccessFile.read() == '\n') {
return randomAccessFile.readLine();
}
randomAccessFile.seek(position);
if (position == 0) {
return randomAccessFile.readLine();
}
position--;
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (randomAccessFile != null) {
try {
randomAccessFile.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return "";
}
public void writeAppendToLast(File file, RandomAccessFile randomAccessFile, String value) {
if (randomAccessFile == null) {
try {
randomAccessFile = new RandomAccessFile(file, "rwd");
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
try {
long length = randomAccessFile.length();
randomAccessFile.seek(length);
randomAccessFile.writeBytes(System.lineSeparator());
randomAccessFile.writeBytes(value);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -34,5 +34,13 @@
<module>library-server</module>
<module>library-util</module>
<module>library-client</module>
<module>library-buffer</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-util</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -75,6 +75,11 @@
<artifactId>skywalking-jvm-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- storage module -->
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册