diff --git a/skywalking-server/pom.xml b/skywalking-server/pom.xml index ed4b5549c1754876efd12290c7dfc1880356625e..c9878d607cd5fbcd884017724713d9bafc8ff781 100644 --- a/skywalking-server/pom.xml +++ b/skywalking-server/pom.xml @@ -65,7 +65,7 @@ ${project.build.directory}/installer/bin - src/main/resources + bin false diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/CollectionServer.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/CollectionServer.java index e3dba40518baa7f4caffe2baaa3d2ba1887e6e0c..04fdf2f8d2e19e663067548f002cee8c083aa840 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/CollectionServer.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/CollectionServer.java @@ -14,14 +14,24 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Properties; public class CollectionServer { static Logger logger = LogManager.getLogger(CollectionServer.class); private Selector selector; + static Map byteBuffers = new LinkedHashMap(); + public CollectionServer() { + byteBuffers.put(512, ByteBuffer.allocate(512)); + byteBuffers.put(128, ByteBuffer.allocate(128)); + byteBuffers.put(32, ByteBuffer.allocate(32)); + byteBuffers.put(8, ByteBuffer.allocate(8)); + byteBuffers.put(2, ByteBuffer.allocate(2)); + byteBuffers.put(1, ByteBuffer.allocate(1)); } public void doCollect() throws IOException { @@ -40,26 +50,42 @@ public class CollectionServer { sc.read(contextLengthBuffer); int length = ByteArrayUtil.byteArrayToInt(contextLengthBuffer.array(), 0); if (length > 0) { - ByteBuffer contentBuffer = ByteBuffer.allocate(length); - try{ - sc.read(contentBuffer); - dataBuffer = DataBufferThreadContainer.getDataBufferThread(); - dataBuffer.doCarry(new String(contentBuffer.array())); - }finally{ - contentBuffer.flip(); - } + readDataFromSocketChannel(length, byteBuffers, sc); } } catch (IOException e) { logger.error("The remote client disconnect service", e); sc.close(); - }finally{ - contextLengthBuffer.flip(); + } finally { + contextLengthBuffer.flip(); } } } } } + public static void readDataFromSocketChannel(int length, Map byteBuffers, ByteChannel byteChannel) { + int tmp = length; + StringBuilder stringBuilder = new StringBuilder(); + for (Map.Entry entry : byteBuffers.entrySet()) { + int j = tmp / entry.getKey(); + if (j == 0) { + continue; + } + for (int k = 0; k < j; k++) { + try { + byteChannel.read(entry.getValue()); + stringBuilder.append(new String(entry.getValue().array())); + } catch (IOException e) { + logger.error("Read data From socket channel", e); + } finally { + entry.getValue().clear(); + } + } + tmp = tmp % entry.getKey(); + } + DataBufferThreadContainer.getDataBufferThread().doCarry(stringBuilder.toString()); + } + private void beginToRead(ServerSocketChannel serverSocketChannel, SelectionKey key) throws IOException { if (key.isAcceptable()) { SocketChannel sc = serverSocketChannel.accept(); diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/AppendEOFFlagThread.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/AppendEOFFlagThread.java index 23612bff22e867550ffe77b518b5bbd8917d1b0a..c384a094e14f34a021718a64e0bd2a6454d1140a 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/AppendEOFFlagThread.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/AppendEOFFlagThread.java @@ -19,16 +19,21 @@ class AppendEOFFlagThread extends Thread { @Override public void run() { - BufferedWriter bufferedWriter; + BufferedWriter bufferedWriter = null; for (File file : dataBufferFiles) { try { logger.info("Add EOF flags to unprocessed data file[{}]", file.getName()); bufferedWriter = new BufferedWriter(new FileWriter(new File(file.getParent(), file.getName()), true)); bufferedWriter.write("EOF\n"); - bufferedWriter.flush(); - bufferedWriter.close(); } catch (IOException e) { logger.info("Add EOF flags to the unprocessed data file failed.", e); + } finally { + try { + bufferedWriter.flush(); + bufferedWriter.close(); + } catch (IOException e) { + + } } } } diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/DataBufferThreadContainer.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/DataBufferThreadContainer.java index 576b848c9bdec301dc84517d26aa7be6431980c6..209255b72063f255461bee93a6b7b5a577281108 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/DataBufferThreadContainer.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/buffer/DataBufferThreadContainer.java @@ -34,16 +34,19 @@ public class DataBufferThreadContainer { File parentDir = new File(Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY); NameFileComparator sizeComparator = new NameFileComparator(); File[] dataFileList = sizeComparator.sort(parentDir.listFiles()); - int step = (int) Math.ceil(dataFileList.length * 1.0 / MAX_APPEND_EOF_FLAGS_THREAD_NUMBER); - int start = 0, end = 0; - while (true) { - if (end + step >= dataFileList.length) { - new AppendEOFFlagThread(Arrays.copyOf(dataFileList, start)).start(); - break; + logger.info("Pending file number :" + dataFileList.length); + if (dataFileList.length > 0) { + int step = (int) Math.ceil(dataFileList.length * 1.0 / MAX_APPEND_EOF_FLAGS_THREAD_NUMBER); + int start = 0, end = 0; + while (true) { + if (end + step >= dataFileList.length) { + new AppendEOFFlagThread(Arrays.copyOf(dataFileList, start)).start(); + break; + } + end += step; + new AppendEOFFlagThread(Arrays.copyOfRange(dataFileList, start, end)).start(); + start = end; } - end += step; - new AppendEOFFlagThread(Arrays.copyOfRange(dataFileList, start, end)).start(); - start = end; } logger.info("Data buffer thread size {} begin to init ", MAX_THREAD_NUMBER); for (int i = 0; i < MAX_THREAD_NUMBER; i++) { diff --git a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/util/ByteArrayUtil.java b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/util/ByteArrayUtil.java index f727544bb8295bb0dc18ab02b34b4262c7f39632..f22551f5a78681ed5a98f5c9f870df0fe977368f 100644 --- a/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/util/ByteArrayUtil.java +++ b/skywalking-server/src/main/java/com/ai/cloud/skywalking/reciever/util/ByteArrayUtil.java @@ -13,4 +13,6 @@ public class ByteArrayUtil { } return value; } + + } diff --git a/skywalking-server/src/main/resources/config.properties b/skywalking-server/src/main/resources/config.properties index 25cc38423db3e8ec184a95b5fd43271b9ca7b6ab..bf6fba490b0b9b99df52ada4a58478e1af5fcba8 100644 --- a/skywalking-server/src/main/resources/config.properties +++ b/skywalking-server/src/main/resources/config.properties @@ -10,14 +10,14 @@ buffer.max_wait_time=5000 #数据冲突时等待时间(单位:毫秒) buffer.data_conflict_wait_time=10 #数据缓存文件目录 -buffer.data_buffer_file_parent_directory=/Users/wusheng/Documents/code/github/test-data/buffer +buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer #缓存数据文件最大长度(单位:byte) buffer.data_file_max_length=104857600 #每次Flush的缓存数据的个数 buffer.flush_number_of_cache=30 #最大持久化的线程数量 -persistence.max_thread_number=1 +persistence.max_thread_number=2 #定位文件时,每次读取偏移量跳过大小 persistence.offset_file_skip_length=2048 #每次读取文件偏移量大小 @@ -28,7 +28,7 @@ persistence.switch_file_wait_time=5000 persistence.max_append_eof_flags_thread_number=2 #偏移量注册文件的目录 -registerpersistence.register_file_parent_directory=/Users/wusheng/Documents/code/github/test-data/offset +registerpersistence.register_file_parent_directory=d:/test-data/data/offset #偏移量注册文件名 registerpersistence.register_file_name=offset.txt #偏移量注册备份文件名