提交 dd2e1e02 编写于 作者: Z zhangxin10

上传未上传的Bean

上级 9df8f638
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao;
import com.ai.cloud.skywalking.alarm.model.ApplicationInfo;
import com.ai.cloud.skywalking.alarm.model.UserInfo;
import com.ai.cloud.skywalking.alarm.zk.ZKUtil;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
public class AlarmMessageProcessThread extends Thread {
......@@ -25,7 +22,6 @@ public class AlarmMessageProcessThread extends Thread {
threadId = UUID.randomUUID().toString();
registerServer(threadId);
rank = ballot(threadId);
ZKUtil.watch(AlarmServerRegisterWatcher.getInstance());
toBeProcessUsers = getToBeProcessUsers();
}
......@@ -33,11 +29,21 @@ public class AlarmMessageProcessThread extends Thread {
@Override
public void run() {
lockAllUsers();
Set<String> traceIds;
Set<String> toBeSenderTraceId;
while (true) {
for (UserInfo userInfo : toBeProcessUsers) {
for (ApplicationInfo applicationInfo : userInfo.getApplicationInfos()) {
System.out.println(threadId + applicationInfo.getAppId());
}
traceIds = new HashSet<String>();
// for (ApplicationInfo applicationInfo : userInfo.getApplicationInfos()) {
// toBeSenderTraceId = RedisUtil.getAlarmMessage(applicationInfo);
// toBeSenderTraceId.removeAll(traceIds);
// if (toBeSenderTraceId == null || toBeSenderTraceId.size() <= 0){
// continue;
// }else{
//
// }
// }
}
if (isChanged) {
......@@ -83,8 +89,8 @@ public class AlarmMessageProcessThread extends Thread {
private List<UserInfo> getToBeProcessUsers() {
List<UserInfo> userInfos = AlarmMessageDao.selectAllUserInfo();
List<String> allServerIds = ZKUtil.selectAllServerIds();
int step = (int) Math.ceil(userInfos.size() * 1.0 / allServerIds.size());
List<String> allThreadIds = ZKUtil.selectAllThreadIds();
int step = (int) Math.ceil(userInfos.size() * 1.0 / allThreadIds.size());
int start = rank * step;
int end = (rank + 1) * step;
if (end > userInfos.size()) {
......@@ -93,7 +99,7 @@ public class AlarmMessageProcessThread extends Thread {
List<UserInfo> toBeProcessUsers = userInfos.subList(start, end);
for (UserInfo userInfo : toBeProcessUsers) {
userInfo.setApplicationInfos(AlarmMessageDao.selectAllApplicationsByUserId(userInfo.getUserId()));
// userInfo.setApplicationInfos(AlarmMessageDao.selectAlarmRulesByUserId(userInfo.getUserId()));
}
return toBeProcessUsers;
}
......@@ -118,7 +124,7 @@ public class AlarmMessageProcessThread extends Thread {
}
private int ballot(String threadId) {
List<String> serverIds = ZKUtil.selectAllServerIds();
List<String> serverIds = ZKUtil.selectAllThreadIds();
int rank = 0;
for (String tmpServerId : serverIds) {
if (tmpServerId.hashCode() < threadId.hashCode()) {
......
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.zk.ZKUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -28,9 +29,10 @@ public class AlarmProcessServer {
new UserInfoInspector().start();
logger.info("Start user inspector thread success....");
logger.info("Alarm process server successfully started.");
ZKUtil.watch(AlarmServerRegisterWatcher.getInstance());
while (true) {
try {
Thread.sleep(Config.Server.DAEMON_THREAD_WAITE_INTERVAL);
Thread.sleep(Config.Server.DAEMON_THREAD_WAIT_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
......
......@@ -3,31 +3,42 @@ package com.ai.cloud.skywalking.alarm.conf;
public class Config {
public static class Server {
public static int PROCESS_THREAD_SIZE = 2;
public static long DAEMON_THREAD_WAITE_INTERVAL = 50000L;
public static long DAEMON_THREAD_WAIT_INTERVAL = 50000L;
}
public static class ZKPath {
public static String CONNECT_STR = "127.0.0.1:2181";
public static int CONNECT_TIMEOUT = 1000;
public static int RETRY_TIMEOUT = 1000;
public static int RETRY_TIMES = 3;
public static String NODE_PREFIX = "/skywalking";
public static String SERVER_REGISTER_LOCK_PATH = "/alarm-server/register";
public static String REGISTER_SERVER_PATH = "/alarm-server/servers";
public static String USER_REGISTER_LOCK_PATH = "/alarm-server/users";
}
public static class DB {
public static String PASSWORD = "devrdbusr13";
public static String USER_NAME = "devrdbusr13";
public static String DRIVER_CLASS = "com.mysql.jdbc.Driver";
public static String URL = "jdbc:mysql://10.1.228.200:31306/test";
}
public static class Alarm {
......
package com.ai.cloud.skywalking.alarm.dao;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.model.AlarmRule;
import com.ai.cloud.skywalking.alarm.model.ApplicationInfo;
import com.ai.cloud.skywalking.alarm.model.UserInfo;
import org.apache.logging.log4j.LogManager;
......@@ -63,37 +64,47 @@ public class AlarmMessageDao {
return 0;
}
public static List<ApplicationInfo> selectAllApplicationsByUserId(String userId) {
List<ApplicationInfo> result = new ArrayList<ApplicationInfo>();
public static List<AlarmRule> selectAlarmRulesByUserId(String userId) {
List<AlarmRule> rules = new ArrayList<AlarmRule>();
List<ApplicationInfo> selfDefineApplications = new ArrayList<ApplicationInfo>();
try {
PreparedStatement ps = con.prepareStatement("SELECT alarm_rule.app_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," +
PreparedStatement ps = con.prepareStatement("SELECT alarm_rule.app_id,alarm_rule.rule_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," +
" alarm_rule.config_args FROM alarm_rule WHERE uid = ? AND sts = ?");
ps.setString(1, userId);
ps.setString(2, "A");
ResultSet rs = ps.executeQuery();
ApplicationInfo globalConfig = null;
AlarmRule globalRules = null;
AlarmRule tmpAlarmRule = null;
ApplicationInfo tmpApplication;
while (rs.next()) {
if ("1".equals(rs.getString("is_global"))) {
globalConfig = new ApplicationInfo();
globalConfig.setConfigArgs(rs.getString("config_args"));
globalRules = new AlarmRule(rs.getString("uid"), rs.getString("rule_id"));
globalRules.setConfigArgs(rs.getString("config_args"));
globalRules.setTodoType(rs.getString("todo_type"));
globalRules.setGlobal(true);
continue;
} else {
tmpAlarmRule = new AlarmRule(rs.getString("uid"), rs.getString("rule_id"));
globalRules.setConfigArgs(rs.getString("config_args"));
globalRules.setTodoType(rs.getString("todo_type"));
// 自定义规则的Application
tmpApplication = new ApplicationInfo();
tmpApplication.setAppId(rs.getString("app_id"));
tmpApplication.setUId(rs.getString("uid"));
selfDefineApplications.add(tmpApplication);
tmpAlarmRule.getApplicationInfos().add(tmpApplication);
rules.add(tmpAlarmRule);
}
tmpApplication = new ApplicationInfo();
tmpApplication.setAppId(rs.getString("app_id"));
tmpApplication.setUId(rs.getString("uid"));
tmpApplication.setConfigArgs(rs.getString("config_args"));
tmpApplication.setToDoType(rs.getString("todo_type"));
result.add(tmpApplication);
}
if (globalConfig == null) {
if (globalRules == null) {
//
throw new IllegalArgumentException("Can not found the global config");
}
List<ApplicationInfo> allApplication = new ArrayList<ApplicationInfo>();
ps = con.prepareStatement("SELECT application_info.app_id, application_info.uid FROM application_info WHERE uid = ? AND sts = ?");
ps = con.prepareStatement("SELECT application_info.app_id, application_info.uid, app_code FROM application_info WHERE uid = ? AND sts = ?");
ps.setString(1, userId);
ps.setString(2, "A");
rs = ps.executeQuery();
......@@ -102,20 +113,16 @@ public class AlarmMessageDao {
applicationInfo = new ApplicationInfo();
applicationInfo.setAppId(rs.getString("app_id"));
applicationInfo.setUId(rs.getString("uid"));
applicationInfo.setAppCode(rs.getString("app_code"));
allApplication.add(applicationInfo);
}
allApplication.removeAll(result);
for (ApplicationInfo app : allApplication) {
app.setConfigArgs(globalConfig.getConfigArgs());
app.setToDoType(globalConfig.getToDoType());
result.add(app);
}
allApplication.removeAll(selfDefineApplications);
globalRules.getApplicationInfos().addAll(allApplication);
rules.add(globalRules);
} catch (SQLException e) {
logger.error("Failed to query applications.", e);
}
return result;
return rules;
}
}
package com.ai.cloud.skywalking.alarm.model;
import com.ai.cloud.skywalking.alarm.util.RedisUtil;
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.List;
public class AlarmRule {
private ConfigArgsDescriber configArgsDescriber;
private String configArgs;
private String todoType;
private String ruleId;
private boolean global = false;
private long previousFireTimeM;
private String uid;
public AlarmRule(String uid, String ruleId) {
this.ruleId = ruleId;
this.uid = uid;
previousFireTimeM = getPreviousFireTime(uid, ruleId);
}
public void setConfigArgs(String configArgs) {
this.configArgs = configArgs;
configArgsDescriber = new Gson().fromJson(configArgs, ConfigArgsDescriber.class);
}
private List<ApplicationInfo> applicationInfos = new ArrayList<ApplicationInfo>();
public List<ApplicationInfo> getApplicationInfos() {
return applicationInfos;
}
public void setApplicationInfos(List<ApplicationInfo> applicationInfos) {
this.applicationInfos = applicationInfos;
}
public ConfigArgsDescriber getConfigArgsDescriber() {
return configArgsDescriber;
}
public void setConfigArgsDescriber(ConfigArgsDescriber configArgsDescriber) {
this.configArgsDescriber = configArgsDescriber;
}
public void setTodoType(String todoType) {
this.todoType = todoType;
}
public String getTodoType() {
return todoType;
}
public String getRuleId() {
return ruleId;
}
public void setRuleId(String ruleId) {
this.ruleId = ruleId;
}
public void setGlobal(boolean global) {
this.global = global;
}
public boolean isGlobal() {
return global;
}
public long getPreviousFireTimeM() {
return previousFireTimeM;
}
public void setPreviousFireTimeM(long previousFireTimeM) {
this.previousFireTimeM = previousFireTimeM;
}
private static long getPreviousFireTime(String userId, String ruleId) {
Jedis client = RedisUtil.getRedisClient();
try {
String previousTime = client.get(userId + "-" + ruleId);
if (previousTime == null || previousTime.length() <= 0) {
return System.currentTimeMillis() / (10000 * 6);
}
return Long.valueOf(previousTime);
} finally {
if (client != null) {
client.close();
}
}
}
}
package com.ai.cloud.skywalking.alarm.model;
import com.google.gson.Gson;
public class ApplicationInfo {
private String appId;
private String configArgs;
private String UId;
private String toDoType;
private ConfigArgsDescriber configArgsDescriber;
private String appCode;
public ApplicationInfo() {
}
......@@ -20,15 +19,6 @@ public class ApplicationInfo {
this.appId = appId;
}
public void setConfigArgs(String configArgs) {
this.configArgs = configArgs;
configArgsDescriber = new Gson().fromJson(configArgs, ConfigArgsDescriber.class);
}
public ConfigArgsDescriber getConfigArgsDescriber() {
return configArgsDescriber;
}
public String getConfigArgs() {
return configArgs;
}
......@@ -67,4 +57,12 @@ public class ApplicationInfo {
result = 31 * result + (getUId() != null ? getUId().hashCode() : 0);
return result;
}
public void setAppCode(String appCode) {
this.appCode = appCode;
}
public String getAppCode() {
return appCode;
}
}
......@@ -3,6 +3,10 @@ package com.ai.cloud.skywalking.alarm.model;
public class ConfigArgsDescriber {
private int period;
private MailInfo mailInfo;
private UrlInfo urlInfo;
public int getPeriod() {
return period;
}
......@@ -10,4 +14,20 @@ public class ConfigArgsDescriber {
public void setPeriod(int period) {
this.period = period;
}
public MailInfo getMailInfo() {
return mailInfo;
}
public void setMailInfo(MailInfo mailInfo) {
this.mailInfo = mailInfo;
}
public UrlInfo getUrlInfo() {
return urlInfo;
}
public void setUrlInfo(UrlInfo urlInfo) {
this.urlInfo = urlInfo;
}
}
package com.ai.cloud.skywalking.alarm.model;
public class MailInfo {
private String[] mailTo;
private String[] mailCc;
private String mailTemp;
public String[] getMailTo() {
return mailTo;
}
public void setMailTo(String[] mailTo) {
this.mailTo = mailTo;
}
public String[] getMailCc() {
return mailCc;
}
public void setMailCc(String[] mailCc) {
this.mailCc = mailCc;
}
public String getMailTemp() {
return mailTemp;
}
public void setMailTemp(String mailTemp) {
this.mailTemp = mailTemp;
}
}
package com.ai.cloud.skywalking.alarm.model;
public class UrlInfo {
private String urlCall;
private String method;
public String getUrlCall() {
return urlCall;
}
public void setUrlCall(String urlCall) {
this.urlCall = urlCall;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
}
......@@ -3,9 +3,10 @@ package com.ai.cloud.skywalking.alarm.model;
import java.util.List;
public class UserInfo {
private String userId;
private List<ApplicationInfo> applicationInfos;
private List<AlarmRule> rules;
public UserInfo(String userId) {
this.userId = userId;
......@@ -15,11 +16,11 @@ public class UserInfo {
return userId;
}
public List<ApplicationInfo> getApplicationInfos() {
return applicationInfos;
public List<AlarmRule> getRules() {
return rules;
}
public void setApplicationInfos(List<ApplicationInfo> applicationInfos) {
this.applicationInfos = applicationInfos;
public void setRules(List<AlarmRule> rules) {
this.rules = rules;
}
}
package com.ai.cloud.skywalking.alarm.model.parameter;
import java.util.Collection;
import java.util.Set;
public class Application {
private String applicationId;
private Set<String> traceIds;
public Application(String applicationId) {
this.applicationId = applicationId;
}
public String getApplicationId() {
return applicationId;
}
public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}
public Set<String> getTraceIds() {
return traceIds;
}
public void setTraceIds(Set<String> traceIds) {
this.traceIds = traceIds;
}
}
......@@ -50,13 +50,6 @@ public class AlarmMessageProcessor {
// 没有数据需要发送
if (sentData.size() <= 0) {
// 清理数据
for (ApplicationInfo applicationInfo : rule.getApplicationInfos()) {
for (int i = 0; i < currentFireTimeM - rule.getPreviousFireTimeM(); i++) {
expiredAlarmMessage(generateAlarmKey(userInfo.getUserId(),
applicationInfo.getAppCode(), i));
}
}
// 修改-保存上次处理时间
rule.setPreviousFireTimeM(currentFireTimeM);
savePreviousFireTime(userInfo.getUserId(), rule.getRuleId(), currentFireTimeM);
......
package com.ai.cloud.skywalking.alarm.redis;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.model.ApplicationInfo;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.ArrayList;
import java.util.List;
public class RedisUtil {
private static Logger logger = LogManager.getLogger(RedisUtil.class);
private static JedisPool jedisPool;
private static String[] config;
private static RedisInspector connector = new RedisInspector();
private static Object lock = new Object();
static {
GenericObjectPoolConfig genericObjectPoolConfig = buildGenericObjectPoolConfig();
String redisServerConfig = Config.Alarm.REDIS_SERVER;
if (redisServerConfig == null || redisServerConfig.length() <= 0) {
logger.error("Redis server is not setting. Switch off alarm module. ");
} else {
config = redisServerConfig.split(":");
if (config.length != 2) {
logger.error("Redis server address is illegal setting, need to be 'ip:port'. Switch off alarm module. ");
Config.Alarm.ALARM_OFF_FLAG = true;
} else {
jedisPool = new JedisPool(genericObjectPoolConfig, config[0],
Integer.valueOf(config[1]));
// Test connect redis.
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (Exception e) {
handleFailedToConnectRedisServerException(e);
logger.error("can't connect to redis["
+ Config.Alarm.REDIS_SERVER + "]", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
}
public static List<String> getAlarmMessage(ApplicationInfo applicationInfo) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
return new ArrayList<String>(jedis.hgetAll(generateAlarmKey(applicationInfo)).values());
} catch (Exception e) {
handleFailedToConnectRedisServerException(e);
logger.error("Failed to set data.", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
return new ArrayList<String>();
}
private static String generateAlarmKey(ApplicationInfo applicationInfo) {
return applicationInfo.getUId() + "-" + applicationInfo.getAppId() + "-"
+ ((System.currentTimeMillis() / (10000 * 6))
- applicationInfo.getConfigArgsDescriber().getPeriod());
}
private static GenericObjectPoolConfig buildGenericObjectPoolConfig() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setTestOnBorrow(true);
genericObjectPoolConfig.setMaxIdle(Config.Alarm.REDIS_MAX_IDLE);
genericObjectPoolConfig.setMinIdle(Config.Alarm.REDIS_MIN_IDLE);
genericObjectPoolConfig.setMaxTotal(Config.Alarm.REDIS_MAX_TOTAL);
return genericObjectPoolConfig;
}
private static void handleFailedToConnectRedisServerException(Exception e) {
if (e instanceof JedisConnectionException) {
// 发生连接不上Redis
if (connector == null || !connector.isAlive()) {
synchronized (lock) {
if (!connector.isAlive()) {
// 启动巡检线程
connector.start();
}
}
}
}
}
private static class RedisInspector extends Thread {
@Override
public void run() {
logger.debug("Connecting to redis....");
Jedis jedis;
while (true) {
try {
jedisPool = new JedisPool(buildGenericObjectPoolConfig(),
config[0], Integer.valueOf(config[1]));
jedis = jedisPool.getResource();
jedis.get("ok");
break;
} catch (Exception e) {
if (e instanceof JedisConnectionException) {
try {
Thread.sleep(5000L);
} catch (InterruptedException e1) {
logger.error("Sleep failed", e);
}
continue;
}
}
}
logger.debug("Connected to redis success. Open alarm function.");
Config.Alarm.ALARM_OFF_FLAG = false;
// 清理当前线程
connector = null;
}
}
}
......@@ -53,7 +53,7 @@ public class ZKUtil {
}
}
public static List<String> selectAllServerIds() {
public static List<String> selectAllThreadIds() {
try {
return client.getChildren().forPath(getRegisterServerPathPrefix());
} catch (Exception e) {
......
mail.host=mail.asiainfo.com
mail.transport.protocol=smtp
mail.smtp.auth=true
mail.username=zhangxin10
mail.password=!qaz2wsx
mail.account.prefix=@asiainfo.com
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册