提交 2a49c593 编写于 作者: wu-sheng's avatar wu-sheng

* 移除无用的常量配置

* 优化AlarmChain、RedisInspector代码。避免新增线程造成JVM性能瓶颈
* 新增AlarmRedisConnector获取jedis连接,作为公用代码,供不能功能的alarm chain使用
* 修复 #44
* 增加health report报告类型限制,防止对health report的错误使用,造成内存溢出可能。
上级 02118b4c
......@@ -12,25 +12,30 @@ import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.util.BuriedPointMachineUtil;
public class SDKHealthCollector extends Thread {
private static Logger logger = LogManager.getLogger(SDKHealthCollector.class);
private static Logger logger = LogManager
.getLogger(SDKHealthCollector.class);
private static Map<String, HeathReading> heathReadings = new ConcurrentHashMap<String, HeathReading>();
private SDKHealthCollector(){
private SDKHealthCollector() {
super("HealthCollector");
}
public static void init(){
if(AuthDesc.isAuth()){
public static void init() {
if (AuthDesc.isAuth()) {
new SDKHealthCollector().start();
}
}
public static HeathReading getCurrentHeathReading(String extraId) {
String id = getId(extraId);
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if (heathReadings.keySet().size() > 5000) {
throw new RuntimeException(
"use ServerHealthCollector illegal. There is an overflow trend of SDK Health Collector Report Data.");
}
heathReadings.put(id, new HeathReading(id));
}
}
......@@ -52,17 +57,18 @@ public class SDKHealthCollector extends Thread {
try {
Map<String, HeathReading> heathReadingsSnapshot = heathReadings;
heathReadings = new ConcurrentHashMap<String, HeathReading>();
String[] keyList = heathReadingsSnapshot.keySet().toArray(new String[0]);
String[] keyList = heathReadingsSnapshot.keySet().toArray(
new String[0]);
Arrays.sort(keyList);
StringBuilder log = new StringBuilder();
log.append("\n---------SDK Health Collector Report---------\n");
for(String key : keyList){
for (String key : keyList) {
log.append(heathReadingsSnapshot.get(key)).append("\n");
}
log.append("------------------------------------------------\n");
logger.info(log);
try {
Thread.sleep(Config.HealthCollector.REPORT_INTERVAL);
} catch (InterruptedException e) {
......
......@@ -83,11 +83,7 @@ public class Config {
}
public static class StorageChain {
public static long RETRY_STORAGE_WAIT_TIME = 50L;
public static String STORAGE_TYPE = "hbase";
public static int RETRY_STORAGE_TIMES = 3;
}
public static class Alarm {
......@@ -105,6 +101,8 @@ public class Config {
public static int REDIS_MAX_TOTAL = 20;
public static boolean ALARM_OFF_FLAG = false;
public static long ALARM_REDIS_INSPECTOR_INTERVAL = 5 * 1000L;
}
public static class HealthCollector {
......
......@@ -28,6 +28,9 @@ public class ServerHealthCollector extends Thread {
if (!heathReadings.containsKey(id)) {
synchronized (heathReadings) {
if (!heathReadings.containsKey(id)) {
if(heathReadings.keySet().size() > 5000){
throw new RuntimeException("use ServerHealthCollector illegal. There is an overflow trend of Server Health Collector Report Data.");
}
heathReadings.put(id, new ServerHeathReading(id));
}
}
......
package com.ai.cloud.skywalking.reciever.storage;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
/**
* 告警的redis连接器,用于管理redis连接
*
* @author wusheng
*
*/
public class AlarmRedisConnector {
private static JedisPool jedisPool;
static {
new RedisInspector().start();
}
public static Jedis getJedis() {
if (Config.Alarm.ALARM_OFF_FLAG) {
return null;
} else if (jedisPool == null || jedisPool.isClosed()) {
reportJedisFailure();
return null;
} else {
return jedisPool.getResource();
}
}
public static void reportJedisFailure() {
RedisInspector.needConnectInit = true;
}
private static class RedisInspector extends Thread {
private static Logger logger = LogManager
.getLogger(RedisInspector.class);
private static boolean needConnectInit = true;
private String[] config;
public RedisInspector() {
super("RedisInspectorThread");
String redisServerConfig = Config.Alarm.REDIS_SERVER;
if (redisServerConfig == null || redisServerConfig.length() <= 0) {
logger.error("Redis server is not setting. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
} else {
config = redisServerConfig.split(":");
if (config.length != 2) {
logger.error("Redis server address is illegal setting, need to be 'ip:port'. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
}
}
}
@Override
public void run() {
if (Config.Alarm.ALARM_OFF_FLAG)
return;
while (true) {
try {
if (needConnectInit) {
if (jedisPool != null && !jedisPool.isClosed()) {
jedisPool.close();
}
GenericObjectPoolConfig genericObjectPoolConfig = buildGenericObjectPoolConfig();
jedisPool = new JedisPool(genericObjectPoolConfig,
config[0], Integer.valueOf(config[1]));
// Test connect redis.
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.get("ok");
needConnectInit = false;
} catch (Exception e) {
logger.error("can't connect to redis["
+ Config.Alarm.REDIS_SERVER + "]", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
if (needConnectInit) {
ServerHealthCollector.getCurrentHeathReading(null)
.updateData(ServerHeathReading.ERROR,
"alarm redis connect failue.");
} else {
ServerHealthCollector.getCurrentHeathReading(null)
.updateData(ServerHeathReading.INFO,
"alarm redis connectted.");
}
} catch (Throwable t) {
logger.error("redis init connect failue", t);
}
try {
Thread.sleep(Config.Alarm.ALARM_REDIS_INSPECTOR_INTERVAL);
} catch (InterruptedException e) {
logger.error("Failure sleep.", e);
}
}
}
private GenericObjectPoolConfig buildGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestOnBorrow(true);
genericObjectPoolConfig.setMaxIdle(Config.Alarm.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Alarm.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Alarm.REDIS_MAX_TOTAL);
return genericObjectPoolConfig;
}
}
}
package com.ai.cloud.skywalking.reciever.storage.chain;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.storage.Chain;
import com.ai.cloud.skywalking.reciever.storage.IStorageChain;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXCEPTION_STACK_LENGTH;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXPIRE_SECONDS;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.List;
import redis.clients.jedis.Jedis;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXCEPTION_STACK_LENGTH;
import static com.ai.cloud.skywalking.reciever.conf.Config.Alarm.ALARM_EXPIRE_SECONDS;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.storage.AlarmRedisConnector;
import com.ai.cloud.skywalking.reciever.storage.Chain;
import com.ai.cloud.skywalking.reciever.storage.IStorageChain;
public class AlarmChain implements IStorageChain {
private static Logger logger = LogManager.getLogger(AlarmChain.class);
private static JedisPool jedisPool;
private static String[] config;
private static Object lock = new Object();
private static RedisInspector connector = new RedisInspector();
;
static {
GenericObjectPoolConfig genericObjectPoolConfig = buildGenericObjectPoolConfig();
String redisServerConfig = Config.Alarm.REDIS_SERVER;
if (redisServerConfig == null || redisServerConfig.length() <= 0) {
logger.error("Redis server is not setting. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
} else {
config = redisServerConfig.split(":");
if (config.length != 2) {
logger.error("Redis server address is illegal setting, need to be 'ip:port'. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
} else {
jedisPool = new JedisPool(genericObjectPoolConfig, config[0],
Integer.valueOf(config[1]));
// Test connect redis.
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (Exception e) {
handleFailedToConnectRedisServerException(e);
logger.error("can't connect to redis["
+ Config.Alarm.REDIS_SERVER + "]", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
}
@Override
public void doChain(List<Span> spans, Chain chain) {
......@@ -83,11 +47,15 @@ public class AlarmChain implements IStorageChain {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis = AlarmRedisConnector.getJedis();
if(jedis == null){
logger.error("Failed to set data. can't get jedis.");
return;
}
jedis.hsetnx(key, traceId, exceptionMsgOutline);
jedis.expire(key, ALARM_EXPIRE_SECONDS);
} catch (Exception e) {
handleFailedToConnectRedisServerException(e);
AlarmRedisConnector.reportJedisFailure();
logger.error("Failed to set data.", e);
} finally {
if (jedis != null) {
......@@ -95,57 +63,4 @@ public class AlarmChain implements IStorageChain {
}
}
}
private static class RedisInspector extends Thread {
@Override
public void run() {
logger.debug("Connecting to redis....");
Jedis jedis;
while (true) {
try {
jedisPool = new JedisPool(buildGenericObjectPoolConfig(),
config[0], Integer.valueOf(config[1]));
jedis = jedisPool.getResource();
jedis.get("ok");
break;
} catch (Exception e) {
if (e instanceof JedisConnectionException) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e1) {
logger.error("Sleep failed", e);
}
continue;
}
}
}
logger.debug("Connected to redis success. Open alarm function.");
Config.Alarm.ALARM_OFF_FLAG = false;
// 清理当前线程
connector = null;
}
}
private static void handleFailedToConnectRedisServerException(Exception e) {
if (e instanceof JedisConnectionException) {
// 发生连接不上Redis
if (connector == null || !connector.isAlive()) {
synchronized (lock) {
if (!connector.isAlive()) {
// 启动巡检线程
connector.start();
}
}
}
}
}
private static GenericObjectPoolConfig buildGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestOnBorrow(true);
genericObjectPoolConfig.setMaxIdle(Config.Alarm.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Alarm.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Alarm.REDIS_MAX_TOTAL);
return genericObjectPoolConfig;
}
}
package com.ai.cloud.skywalking.reciever.storage.chain;
import java.util.List;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.reciever.storage.Chain;
import com.ai.cloud.skywalking.reciever.storage.IStorageChain;
public class ExecuteTimeAlarmChain implements IStorageChain {
@Override
public void doChain(List<Span> spans, Chain chain) {
// TODO Auto-generated method stub
}
}
......@@ -60,4 +60,6 @@ alarm.edis_min_idle=1
#Redis最大个数
alarm.edis_max_total=20
#是否关闭告警
alarm.larm_off_flag=false
\ No newline at end of file
alarm.larm_off_flag=false
#告警redis检测器检测周期
alarm.alarm_redis_inspector_interval=5000
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册