提交 b39f00a5 编写于 作者: A ascrutae

将所有的线程改为守护线程

上级 4679e26d
......@@ -37,6 +37,7 @@ public class AlarmMessageProcessThread extends Thread {
public AlarmMessageProcessThread() {
// 初始化生成ThreadId
threadId = UUID.randomUUID().toString();
this.setDaemon(true);
}
@Override
......
......@@ -31,6 +31,7 @@ public class UserInfoCoordinator extends Thread {
private boolean isCoordinator = false;
public UserInfoCoordinator() {
this.setDaemon(true);
}
@Override
......
......@@ -19,6 +19,10 @@ public class UsersChangedDetectionThread extends Thread {
private String userIdsEncryptedStr;
private Logger logger = LogManager.getLogger(UsersChangedDetectionThread.class);
public UsersChangedDetectionThread() {
this.setDaemon(true);
}
public void run() {
while (true) {
try {
......
......@@ -61,6 +61,7 @@ public class BufferGroup {
super("ConsumerWorker");
this.start = start;
this.end = end;
this.setDaemon(true);
}
@Override
......
......@@ -18,6 +18,7 @@ public class SDKHealthCollector extends Thread {
private SDKHealthCollector() {
super("HealthCollector");
this.setDaemon(true);
}
public static void init() {
......
......@@ -102,6 +102,7 @@ public class DataSenderFactoryWithBalance {
public static class DataSenderChecker extends Thread {
public DataSenderChecker() {
super("Data-Sender-Checker");
this.setDaemon(true);
}
@Override
......
......@@ -19,6 +19,7 @@ class AppendEOFFlagThread extends Thread {
super("AppendEOFFlagThread");
this.dataBufferFiles = dataBufferFiles;
this.countDownLatch = countDownLatch;
this.setDaemon(true);
}
@Override
......
......@@ -26,6 +26,7 @@ public class DataBufferThread extends Thread {
public DataBufferThread(int threadIdx) {
super("DataBufferThread_" + threadIdx);
this.setDaemon(true);
}
@Override
......
......@@ -24,6 +24,7 @@ public class PersistenceThread extends Thread {
public PersistenceThread(int trdIndex) {
super("PersistentThread" + trdIndex);
this.setDaemon(true);
}
@Override
......
......@@ -30,6 +30,7 @@ public class RegisterPersistenceThread extends Thread {
Config.RegisterPersistence.REGISTER_FILE_PARENT_DIRECTORY, Config.RegisterPersistence.REGISTER_FILE_NAME);
bakOffsetFile = new File(
Config.RegisterPersistence.REGISTER_FILE_PARENT_DIRECTORY, Config.RegisterPersistence.REGISTER_BAK_FILE_NAME);
this.setDaemon(true);
}
@Override
......
......@@ -57,6 +57,7 @@ public class AlarmRedisConnector {
Config.Alarm.ALARM_OFF_FLAG = true;
}
}
this.setDaemon(true);
}
private RedisInspector connect() {
......
......@@ -10,66 +10,67 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ServerHealthCollector extends Thread {
private Logger logger = LogManager.getLogger(ServerHealthCollector.class);
private Logger logger = LogManager.getLogger(ServerHealthCollector.class);
private static Map<String, ServerHeathReading> heathReadings = new ConcurrentHashMap<String, ServerHeathReading>();
private static Map<String, ServerHeathReading> heathReadings = new ConcurrentHashMap<String, ServerHeathReading>();
private ServerHealthCollector(){
super("ServerHealthCollector");
}
public static void init(){
new ServerHealthCollector().start();
}
public static ServerHeathReading getCurrentHeathReading(String extraId) {
String id = getId(extraId);
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if(heathReadings.keySet().size() > 5000){
throw new RuntimeException("use ServerHealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
}
heathReadings.put(id, new ServerHeathReading(id));
}
}
}
return heathReadings.get(id);
}
private ServerHealthCollector() {
super("ServerHealthCollector");
this.setDaemon(true);
}
private static String getId(String extraId) {
return "SkyWalkingServer,M:" + MachineUtil.getHostDesc() + ",P:"
+ MachineUtil.getProcessNo() + ",T:"
+ Thread.currentThread().getName() + "("
+ Thread.currentThread().getId() + ")"
+ (extraId == null ? "" : ",extra:" + extraId);
}
public static void init() {
new ServerHealthCollector().start();
}
@Override
public void run() {
while (true) {
try {
Map<String, ServerHeathReading> heathReadingsSnapshot = heathReadings;
heathReadings = new ConcurrentHashMap<String, ServerHeathReading>();
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------Server Health Collector Report---------\n");
for(String key : keyList){
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
log.append("------------------------------------------------\n");
logger.info(log);
try {
Thread.sleep(Config.HealthCollector.REPORT_INTERVAL);
} catch (InterruptedException e) {
logger.warn("sleep error.", e);
}
} catch (Throwable t) {
logger.error("ServerHealthCollector report error.", t);
}
}
}
public static ServerHeathReading getCurrentHeathReading(String extraId) {
String id = getId(extraId);
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if (heathReadings.keySet().size() > 5000) {
throw new RuntimeException(
"use ServerHealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
}
heathReadings.put(id, new ServerHeathReading(id));
}
}
}
return heathReadings.get(id);
}
private static String getId(String extraId) {
return "SkyWalkingServer,M:" + MachineUtil.getHostDesc() + ",P:" + MachineUtil.getProcessNo() + ",T:" + Thread
.currentThread().getName() + "(" + Thread.currentThread().getId() + ")" + (extraId == null ?
"" :
",extra:" + extraId);
}
@Override
public void run() {
while (true) {
try {
Map<String, ServerHeathReading> heathReadingsSnapshot = heathReadings;
heathReadings = new ConcurrentHashMap<String, ServerHeathReading>();
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------Server Health Collector Report---------\n");
for (String key : keyList) {
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
log.append("------------------------------------------------\n");
logger.info(log);
try {
Thread.sleep(Config.HealthCollector.REPORT_INTERVAL);
} catch (InterruptedException e) {
logger.warn("sleep error.", e);
}
} catch (Throwable t) {
logger.error("ServerHealthCollector report error.", t);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册