提交 e22111d5 编写于 作者: Z zhangxin10

1.添加监控端口到启动脚本中

2. 添加数据包长度限制
3. 修复多线程启动时,文件分配异常。
上级 a073af00
......@@ -46,4 +46,6 @@ done
echo "CLASSPATH=$CLASSPATH"
$JAVA -classpath $CLASSPATH com.ai.cloud.skywalking.reciever.CollectionServer >> ${SW_SERVER_BIN_DIR}/../log/sw-server.log & 2>&1&
JAVA_OPTS="$JAVA_OPTS -Djava.rmi.server.hostname=10.1.241.16 -Dcom.sun.management.jmxremote.port=28999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false"
$JAVA ${JAVA_OPTS} -classpath $CLASSPATH com.ai.cloud.skywalking.reciever.CollectionServer >> ${SW_SERVER_BIN_DIR}/../log/sw-server.log & 2>&1&
......@@ -66,7 +66,7 @@
<executions>
<execution>
<id>copy-resources</id>
<phase>install</phase>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
......@@ -82,7 +82,7 @@
</execution>
<execution>
<id>copy-start-script</id>
<phase>install</phase>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
......@@ -103,7 +103,7 @@
<executions>
<execution>
<id>copy-dependencies</id>
<phase>pre-integration-test</phase>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
......
......@@ -29,13 +29,17 @@ public class Config {
public static int DATA_FILE_MAX_LENGTH = 30 * 1024 * 1024;
// 每次缓存数据写入失败,最大尝试时间
public static long WRITE_DATA_FAILURE_RETRY_INTERVAL = 10 * 60 * 1000L;
public static long WRITE_DATA_FAILURE_RETRY_INTERVAL =10 * 1000L;
//每次Flush的缓存数据的个数
public static int FLUSH_NUMBER_OF_CACHE = 30;
}
public static class DataPackage {
public static int MAX_DATA_PACKAGE = 1024 * 1024;
}
public static class Persistence {
// 定位文件时,每次读取偏移量跳过大小
public static int STEP_SIZE_FOR_LOCATING_FILE_OFFSET = 2048;
......@@ -47,7 +51,7 @@ public class Config {
public static int MAX_APPEND_EOF_FLAGS_THREAD_NUMBER = 2;
// 每次存储的最大数量
public static final int MAX_STORAGE_SIZE_PER_TIME = 1024 * 1024;
public static int MAX_STORAGE_SIZE_PER_TIME = 1024 * 1024;
// 当读取到文件结束时等待时间
public static long READ_ENDING_FILE_MAX_WAITE_TIME = 500L;
......
package com.ai.cloud.skywalking.reciever.handler;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import static com.ai.cloud.skywalking.reciever.conf.Config.Persistence.*;
public class CollectionServerDataHandler extends SimpleChannelInboundHandler<byte[]> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
Thread.currentThread().setName("ServerReceiver");
// 当接受到这条消息的是空,则忽略
if (msg != null && msg.length >= 0 && msg.length < MAX_STORAGE_SIZE_PER_TIME) {
if (msg != null && msg.length >= 0 && msg.length < Config.DataPackage.MAX_DATA_PACKAGE) {
DataBufferThreadContainer.getDataBufferThread().saveTemporarily(msg);
}
}
......
......@@ -20,30 +20,53 @@ public class MemoryRegister {
return memoryRegister;
}
public synchronized void doRegisterStatus(FileRegisterEntry fileRegisterEntry) {
public void updateOffSet(String fileName, int offset) {
if (logger.isDebugEnabled()) {
logger.debug("Register entry[{}] into the memory register", fileRegisterEntry.getFileName());
logger.debug("Register entry[{}] into the memory register", fileName);
}
if (entries.containsKey(fileName)){
entries.get(fileName).setOffset(offset);
}
entries.put(fileRegisterEntry.getFileName(), fileRegisterEntry);
}
public void unRegister(String fileName) {
if (logger.isDebugEnabled()) {
logger.debug("Unregister[{}] from the memory register", fileName);
}
if (entries.containsKey(fileName)) {
entries.get(fileName).setStatus(FileRegisterEntry.FileRegisterEntryStatus.UNREGISTER);
}
}
public void removeEntry(String fileName){
entries.remove(fileName);
}
public synchronized boolean isRegister(String fileName) {
public synchronized FileRegisterEntry doRegister(String fileName) {
logger.debug("Begin to register File[{}]", fileName);
FileRegisterEntry entry = null;
// 已经存在entries.
if (entries.containsKey(fileName)) {
logger.debug("FileRegisterEntry[{}] Status:[{}]", entries.get(fileName).getStatus());
// 已经被别的线程处理中
if (entries.get(fileName).getStatus() == FileRegisterEntry.FileRegisterEntryStatus.REGISTER) {
if (logger.isDebugEnabled()) {
logger.debug("Entry[{}] has been register", fileName);
}
return true;
} else {
// 没有被别的线程处理
entry = entries.get(fileName);
entry.setStatus(FileRegisterEntry.FileRegisterEntryStatus.REGISTER);
}
} else {
// 以前没有被注册过的
entry = new FileRegisterEntry(fileName, 0, FileRegisterEntry.FileRegisterEntryStatus.REGISTER);
entries.put(fileName, entry);
}
return false;
return entry;
}
......
......@@ -23,12 +23,12 @@ public class PersistenceThread extends Thread {
@Override
public void run() {
File file1;
File file1 = null;
BufferedReader bufferedReader = null;
int offset;
while (true) {
try {
file1 = getDataFiles();
file1 = getDataFiles();
if (file1 == null) {
try {
Thread.sleep(SWITCH_FILE_WAIT_TIME);
......@@ -37,7 +37,7 @@ public class PersistenceThread extends Thread {
}
continue;
}
bufferedReader = new BufferedReader(new FileReader(file1));
offset = moveOffSet(file1, bufferedReader);
if (logger.isDebugEnabled()) {
......@@ -52,6 +52,7 @@ public class PersistenceThread extends Thread {
//文件结束
if (tmpData == null) {
if (stringBuilder != null && stringBuilder.length() > 0) {
MemoryRegister.instance().updateOffSet(file1.getName(), offset);
StorageChainController.doStorage(stringBuilder
.toString());
stringBuilder.delete(0, stringBuilder.length());
......@@ -96,7 +97,8 @@ public class PersistenceThread extends Thread {
}
logger.info("Delete file[{}] {}", file1.getName(),
(deleteSuccess ? "success" : "failed"));
MemoryRegister.instance().unRegister(file1.getName());
MemoryRegister.instance().removeEntry(file1.getName());
break;
}
......@@ -104,13 +106,7 @@ public class PersistenceThread extends Thread {
StorageChainController.doStorage(stringBuilder
.toString());
stringBuilder.delete(0, stringBuilder.length());
MemoryRegister
.instance()
.doRegisterStatus(
new FileRegisterEntry(
file1.getName(),
offset,
FileRegisterEntry.FileRegisterEntryStatus.REGISTER));
MemoryRegister.instance().updateOffSet(file1.getName(), offset);
}
stringBuilder.append(tmpData);
......@@ -122,6 +118,9 @@ public class PersistenceThread extends Thread {
} catch (IOException e) {
logger.error("The data file could not be found", e);
} finally {
if (file1 != null) {
MemoryRegister.instance().unRegister(file1.getName());
}
try {
if (bufferedReader != null)
bufferedReader.close();
......@@ -142,14 +141,6 @@ public class PersistenceThread extends Thread {
throws IOException {
int offset = MemoryRegister.instance().getOffSet(file1.getName());
if (-1 == offset || offset == 0) {
// 以前该文件没有被任何人处理过,需要重新注册
MemoryRegister
.instance()
.doRegisterStatus(
new FileRegisterEntry(
file1.getName(),
0,
FileRegisterEntry.FileRegisterEntryStatus.REGISTER));
offset = 0;
} else {
char[] cha = new char[STEP_SIZE_FOR_LOCATING_FILE_OFFSET];
......@@ -174,7 +165,7 @@ public class PersistenceThread extends Thread {
if (file.getName().startsWith(".")) {
continue;
}
if (MemoryRegister.instance().isRegister(file.getName())) {
if (MemoryRegister.instance().doRegister(file.getName()) == null) {
if (logger.isDebugEnabled())
logger.debug(
"The file [{}] is being used by another thread ",
......
#采集服务器的端口
server.port=34000
server.max_deal_data_thread_number=1
server.max_deal_data_thread_number=10
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
......@@ -14,6 +14,11 @@ buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer
buffer.data_file_max_length=104857600
#每次Flush的缓存数据的个数
buffer.flush_number_of_cache=30
#每次缓存数据写入失败,最大尝试时间
buffer.write_data_failure_retry_interval = 10000
#数据包的最大限制
datapackage.max_data_package=1048576
#定位文件时,每次读取偏移量跳过大小
persistence.step_size_for_location_file_offset=20480
......@@ -23,6 +28,8 @@ persistence.switch_file_wait_time=5000
persistence.max_append_eof_flags_thread_number=2
#当读取文件结束时最大等待时间
persistence.read_ending_file_max_waite_time=50
#每次存储的最大数量
persistence.max_storage_size_per_time = 1048576
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=d:/test-data/data/offset
......@@ -45,7 +52,7 @@ hbaseconfig.client_port=29181
#告警失效时间
alarm.alarm_expire_seconds=3600000
#Redis配置
alarm.redis_server=127.0.0.1:16379
alarm.redis_server=10.1.241.18:16379
#Redis最大空闲数量
alarm.edis_max_idle=10
#Redis最小空闲数量
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册