提交 c272abb0 编写于 作者: Z zhangxin10

添加追加EOF标志位的线程

上级 414f6f90
package com.ai.cloud.skywalking.reciever.buffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
class AppendEOFFlagThread extends Thread {
private Logger logger = LogManager.getLogger(AppendEOFFlagThread.class);
private File[] dataBufferFiles;
public AppendEOFFlagThread(File[] dataBufferFiles) {
this.dataBufferFiles = dataBufferFiles;
}
@Override
public void run() {
BufferedWriter bufferedWriter;
for (File file : dataBufferFiles) {
try {
logger.info("Add EOF flags to unprocessed data file[{}]", file.getName());
bufferedWriter = new BufferedWriter(new FileWriter(new File(file.getParent(), file.getName()), true));
bufferedWriter.write("EOF\n");
bufferedWriter.flush();
bufferedWriter.close();
} catch (IOException e) {
logger.info("Add EOF flags to the unprocessed data file failed.", e);
}
}
}
}
......@@ -5,17 +5,17 @@ import org.apache.commons.io.comparator.NameFileComparator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.MAX_THREAD_NUMBER;
import static com.ai.cloud.skywalking.reciever.conf.Config.Persistence.MAX_APPEND_EOF_FLAGS_THREAD_NUMBER;
public class DataBufferThreadContainer {
private static Logger logger = LogManager.getLogger(DataBufferThreadContainer.class);
private static List<DataBufferThread> buffers = new ArrayList<DataBufferThread>();
......@@ -34,19 +34,17 @@ public class DataBufferThreadContainer {
File parentDir = new File(Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY);
NameFileComparator sizeComparator = new NameFileComparator();
File[] dataFileList = sizeComparator.sort(parentDir.listFiles());
BufferedWriter bufferedWriter;
for (File file : dataFileList) {
try {
logger.info("Add EOF flags to unprocessed data file[{}]", file.getName());
bufferedWriter = new BufferedWriter(new FileWriter(new File(file.getParent(), file.getName()), true));
bufferedWriter.write("EOF\n");
bufferedWriter.flush();
bufferedWriter.close();
} catch (IOException e) {
logger.info("Add EOF flags to the unprocessed data file failed.", e);
int step = (int) Math.ceil(dataFileList.length * 1.0 / MAX_APPEND_EOF_FLAGS_THREAD_NUMBER);
int start = 0, end = 0;
while (true) {
if (end + step >= dataFileList.length) {
new AppendEOFFlagThread(Arrays.copyOf(dataFileList, start)).start();
break;
}
end += step;
new AppendEOFFlagThread(Arrays.copyOfRange(dataFileList, start, end)).start();
start = end;
}
logger.info("Data buffer thread size {} begin to init ", MAX_THREAD_NUMBER);
for (int i = 0; i < MAX_THREAD_NUMBER; i++) {
DataBufferThread dataBufferThread = new DataBufferThread();
......
......@@ -48,6 +48,9 @@ public class Config {
// 处理文件完成之后,等待时间
public static long SWITCH_FILE_WAIT_TIME = 5000L;
// 追加EOF标志位的线程数量
public static final int MAX_APPEND_EOF_FLAGS_THREAD_NUMBER = 11;
}
public static class RegisterPersistence {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册