提交 5caaaf2d 编写于 作者: Z zhangxin10

1. 修改接受数据代码

上级 baa329bf
......@@ -65,7 +65,7 @@
<outputDirectory>${project.build.directory}/installer/bin</outputDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
<directory>bin</directory>
<filtering>false</filtering>
</resource>
</resources>
......
......@@ -14,14 +14,24 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
public class CollectionServer {
static Logger logger = LogManager.getLogger(CollectionServer.class);
private Selector selector;
static Map<Integer, ByteBuffer> byteBuffers = new LinkedHashMap<Integer, ByteBuffer>();
public CollectionServer() {
byteBuffers.put(512, ByteBuffer.allocate(512));
byteBuffers.put(128, ByteBuffer.allocate(128));
byteBuffers.put(32, ByteBuffer.allocate(32));
byteBuffers.put(8, ByteBuffer.allocate(8));
byteBuffers.put(2, ByteBuffer.allocate(2));
byteBuffers.put(1, ByteBuffer.allocate(1));
}
public void doCollect() throws IOException {
......@@ -40,26 +50,42 @@ public class CollectionServer {
sc.read(contextLengthBuffer);
int length = ByteArrayUtil.byteArrayToInt(contextLengthBuffer.array(), 0);
if (length > 0) {
ByteBuffer contentBuffer = ByteBuffer.allocate(length);
try{
sc.read(contentBuffer);
dataBuffer = DataBufferThreadContainer.getDataBufferThread();
dataBuffer.doCarry(new String(contentBuffer.array()));
}finally{
contentBuffer.flip();
}
readDataFromSocketChannel(length, byteBuffers, sc);
}
} catch (IOException e) {
logger.error("The remote client disconnect service", e);
sc.close();
}finally{
contextLengthBuffer.flip();
} finally {
contextLengthBuffer.flip();
}
}
}
}
}
public static void readDataFromSocketChannel(int length, Map<Integer, ByteBuffer> byteBuffers, ByteChannel byteChannel) {
int tmp = length;
StringBuilder stringBuilder = new StringBuilder();
for (Map.Entry<Integer, ByteBuffer> entry : byteBuffers.entrySet()) {
int j = tmp / entry.getKey();
if (j == 0) {
continue;
}
for (int k = 0; k < j; k++) {
try {
byteChannel.read(entry.getValue());
stringBuilder.append(new String(entry.getValue().array()));
} catch (IOException e) {
logger.error("Read data From socket channel", e);
} finally {
entry.getValue().clear();
}
}
tmp = tmp % entry.getKey();
}
DataBufferThreadContainer.getDataBufferThread().doCarry(stringBuilder.toString());
}
private void beginToRead(ServerSocketChannel serverSocketChannel, SelectionKey key) throws IOException {
if (key.isAcceptable()) {
SocketChannel sc = serverSocketChannel.accept();
......
......@@ -19,16 +19,21 @@ class AppendEOFFlagThread extends Thread {
@Override
public void run() {
BufferedWriter bufferedWriter;
BufferedWriter bufferedWriter = null;
for (File file : dataBufferFiles) {
try {
logger.info("Add EOF flags to unprocessed data file[{}]", file.getName());
bufferedWriter = new BufferedWriter(new FileWriter(new File(file.getParent(), file.getName()), true));
bufferedWriter.write("EOF\n");
bufferedWriter.flush();
bufferedWriter.close();
} catch (IOException e) {
logger.info("Add EOF flags to the unprocessed data file failed.", e);
} finally {
try {
bufferedWriter.flush();
bufferedWriter.close();
} catch (IOException e) {
}
}
}
}
......
......@@ -34,16 +34,19 @@ public class DataBufferThreadContainer {
File parentDir = new File(Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY);
NameFileComparator sizeComparator = new NameFileComparator();
File[] dataFileList = sizeComparator.sort(parentDir.listFiles());
int step = (int) Math.ceil(dataFileList.length * 1.0 / MAX_APPEND_EOF_FLAGS_THREAD_NUMBER);
int start = 0, end = 0;
while (true) {
if (end + step >= dataFileList.length) {
new AppendEOFFlagThread(Arrays.copyOf(dataFileList, start)).start();
break;
logger.info("Pending file number :" + dataFileList.length);
if (dataFileList.length > 0) {
int step = (int) Math.ceil(dataFileList.length * 1.0 / MAX_APPEND_EOF_FLAGS_THREAD_NUMBER);
int start = 0, end = 0;
while (true) {
if (end + step >= dataFileList.length) {
new AppendEOFFlagThread(Arrays.copyOf(dataFileList, start)).start();
break;
}
end += step;
new AppendEOFFlagThread(Arrays.copyOfRange(dataFileList, start, end)).start();
start = end;
}
end += step;
new AppendEOFFlagThread(Arrays.copyOfRange(dataFileList, start, end)).start();
start = end;
}
logger.info("Data buffer thread size {} begin to init ", MAX_THREAD_NUMBER);
for (int i = 0; i < MAX_THREAD_NUMBER; i++) {
......
......@@ -10,14 +10,14 @@ buffer.max_wait_time=5000
#数据冲突时等待时间(单位:毫秒)
buffer.data_conflict_wait_time=10
#数据缓存文件目录
buffer.data_buffer_file_parent_directory=/Users/wusheng/Documents/code/github/test-data/buffer
buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer
#缓存数据文件最大长度(单位:byte)
buffer.data_file_max_length=104857600
#每次Flush的缓存数据的个数
buffer.flush_number_of_cache=30
#最大持久化的线程数量
persistence.max_thread_number=1
persistence.max_thread_number=2
#定位文件时,每次读取偏移量跳过大小
persistence.offset_file_skip_length=2048
#每次读取文件偏移量大小
......@@ -28,7 +28,7 @@ persistence.switch_file_wait_time=5000
persistence.max_append_eof_flags_thread_number=2
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=/Users/wusheng/Documents/code/github/test-data/offset
registerpersistence.register_file_parent_directory=d:/test-data/data/offset
#偏移量注册文件名
registerpersistence.register_file_name=offset.txt
#偏移量注册备份文件名
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册