提交 da7d1fcc 编写于 作者: Z zhangxin10

完成告警协调器的功能

上级 0496938f
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao;
import com.ai.cloud.skywalking.alarm.model.AlarmRule;
import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus;
import com.ai.cloud.skywalking.alarm.model.UserInfo;
import com.ai.cloud.skywalking.alarm.zk.ZKUtil;
import com.ai.cloud.skywalking.alarm.procesor.AlarmMessageProcessor;
import com.ai.cloud.skywalking.alarm.util.ProcessUtil;
import com.ai.cloud.skywalking.alarm.util.ZKUtil;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class AlarmMessageProcessThread extends Thread {
private int rank;
private String threadId;
private Logger logger = LogManager.getLogger(AlarmMessageProcessThread.class);
private boolean isChanged = false;
private List<InterProcessMutex> locks;
private List<UserInfo> toBeProcessUsers;
private String threadId;
private ProcessThreadStatus status;
private String[] processUserIds;
private List<InterProcessMutex> usersLocks;
private CoordinatorStatusWatcher watcher = new CoordinatorStatusWatcher();
public AlarmMessageProcessThread() {
// 初始化生成ThreadId
threadId = UUID.randomUUID().toString();
registerServer(threadId);
rank = ballot(threadId);
toBeProcessUsers = getToBeProcessUsers();
}
@Override
public void run() {
lockAllUsers();
Set<String> traceIds;
Set<String> toBeSenderTraceId;
//注册服务(默认为空闲状态)
registerProcessThread(threadId, ProcessThreadStatus.FREE);
while (true) {
for (UserInfo userInfo : toBeProcessUsers) {
traceIds = new HashSet<String>();
// for (ApplicationInfo applicationInfo : userInfo.getApplicationInfos()) {
// toBeSenderTraceId = RedisUtil.getAlarmMessage(applicationInfo);
// toBeSenderTraceId.removeAll(traceIds);
// if (toBeSenderTraceId == null || toBeSenderTraceId.size() <= 0){
// continue;
// }else{
//
// }
// }
//检查是否为忙碌状态
if (status == ProcessThreadStatus.BUSY) {
//处理告警信息
for (String userId : processUserIds) {
List<AlarmRule> rules = AlarmMessageDao.selectAlarmRulesByUserId(userId);
UserInfo userInfo = AlarmMessageDao.selectUser(userId);
for (AlarmRule rule : rules) {
new AlarmMessageProcessor().process(userInfo, rule);
}
}
}
//检查是否分配线程的状态(重新分配状态)
if (status == ProcessThreadStatus.REDISTRIBUTING) {
// 释放用户锁
releaseUserLock();
// 修改自身状态:(空闲状态)
status = ProcessThreadStatus.FREE;
ProcessUtil.changeProcessThreadStatus(threadId, ProcessThreadStatus.FREE);
}
if (isChanged) {
unlockAllUsers();
rank = ballot(threadId);
toBeProcessUsers = getToBeProcessUsers();
lockAllUsers();
isChanged = false;
//检查分配线程的状态(分配完成状态)
if (status == ProcessThreadStatus.REDISTRIBUTE_SUCCESS) {
// 获取待处理的用户
processUserIds = acquireProcessedUsers();
// 给用户加锁
lockUser(processUserIds);
// 修改自身状态 :(忙碌状态)
status = ProcessThreadStatus.BUSY;
ProcessUtil.changeProcessThreadStatus(threadId, ProcessThreadStatus.BUSY);
}
try {
Thread.sleep(1000L);
Thread.sleep(Config.ProcessThread.THREAD_WAIT_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
logger.error("Sleep failed.", e);
}
}
}
private void unlockAllUsers() {
for (InterProcessMutex lock : locks) {
try {
lock.release();
} catch (Exception e) {
logger.error("Failed to release lock[{}]", e);
}
private String[] acquireProcessedUsers() {
String path = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId;
String value = ZKUtil.getPathData(path);
String[] valueArrays = value.split("@");
String toBeProcessUserId;
if (valueArrays.length < 2) {
toBeProcessUserId = "";
} else {
toBeProcessUserId = valueArrays[1];
}
return toBeProcessUserId.split(";");
}
private void lockAllUsers() {
locks = new ArrayList<InterProcessMutex>(toBeProcessUsers.size());
private void lockUser(String[] userIds) {
usersLocks = new ArrayList<InterProcessMutex>();
String userLockPath = Config.ZKPath.USER_REGISTER_LOCK_PATH + "/";
InterProcessMutex tmpLock;
for (UserInfo userInfo : toBeProcessUsers) {
tmpLock = ZKUtil.getProcessUserLock(userInfo.getUserId());
locks.add(tmpLock);
for (String userId : userIds) {
tmpLock = ZKUtil.getLock(userLockPath + userId);
try {
tmpLock.acquire();
} catch (Exception e) {
logger.error("Failed to lock ");
logger.error("Failed to lock user[{}]", userId, e);
//TODO 锁失败,该怎么处理?
}
usersLocks.add(tmpLock);
}
}
private List<UserInfo> getToBeProcessUsers() {
List<UserInfo> userInfos = AlarmMessageDao.selectAllUserInfo();
List<String> allThreadIds = ZKUtil.selectAllThreadIds();
int step = (int) Math.ceil(userInfos.size() * 1.0 / allThreadIds.size());
int start = rank * step;
int end = (rank + 1) * step;
if (end > userInfos.size()) {
return new ArrayList<UserInfo>();
private void releaseUserLock() {
for (InterProcessMutex lock : usersLocks) {
try {
lock.release();
} catch (Exception e) {
//
logger.error("Failed to release lock user.", e);
//TODO 释放锁,该怎么处理。
}
}
List<UserInfo> toBeProcessUsers = userInfos.subList(start, end);
for (UserInfo userInfo : toBeProcessUsers) {
// userInfo.setApplicationInfos(AlarmMessageDao.selectAlarmRulesByUserId(userInfo.getUserId()));
}
return toBeProcessUsers;
usersLocks.clear();
}
private void registerServer(String serverId) {
InterProcessMutex registerLock = ZKUtil.getRegisterLock();
private void registerProcessThread(String threadId, ProcessThreadStatus status) {
try {
registerLock.acquire();
ZKUtil.register(serverId);
String registerPath = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId;
String registerValue = status + "@ ";
ZKUtil.getZkClient().create().creatingParentsIfNeeded()
.forPath(registerPath, registerValue.getBytes());
this.status = status;
ZKUtil.getPathDataWithWatch(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId, watcher);
} catch (Exception e) {
logger.error("Failed to lock.", e);
} finally {
if (registerLock != null) {
try {
registerLock.release();
} catch (Exception e) {
logger.error("Failed to release lock.", e);
}
}
logger.error("Failed to register process thread.", e);
}
}
private int ballot(String threadId) {
List<String> serverIds = ZKUtil.selectAllThreadIds();
int rank = 0;
for (String tmpServerId : serverIds) {
if (tmpServerId.hashCode() < threadId.hashCode()) {
rank++;
private class CoordinatorStatusWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
String value = ZKUtil.getPathData(Config.ZKPath.COORDINATOR_STATUS_PATH);
status = ProcessThreadStatus.convert(value);
}
ZKUtil.getPathDataWithWatch(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId, watcher);
}
return rank;
}
public void setChanged(boolean changed) {
isChanged = changed;
}
}
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.zk.ZKUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -29,7 +28,6 @@ public class AlarmProcessServer {
new UserInfoInspector().start();
logger.info("Start user inspector thread success....");
logger.info("Alarm process server successfully started.");
ZKUtil.watch(AlarmServerRegisterWatcher.getInstance());
while (true) {
try {
Thread.sleep(Config.Server.DAEMON_THREAD_WAIT_INTERVAL);
......
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.zk.ZKUtil;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
 * Curator watcher that marks every alarm process thread as changed when the children of the
 * watched node change, and afterwards hands the shared instance back to {@code ZKUtil.watch}
 * so that further events are delivered.
 */
public class AlarmServerRegisterWatcher implements CuratorWatcher {

    /** Shared instance returned by {@link #getInstance()}. */
    private static AlarmServerRegisterWatcher watcher = new AlarmServerRegisterWatcher();

    /**
     * @return the shared watcher instance
     */
    public static AlarmServerRegisterWatcher getInstance() {
        return watcher;
    }

    /**
     * Flags all process threads on a children-changed event, then registers the watcher again
     * regardless of the event type.
     *
     * @param watchedEvent the event delivered by ZooKeeper
     * @throws Exception declared by the {@code CuratorWatcher} contract
     */
    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        boolean childrenChanged =
                Watcher.Event.EventType.NodeChildrenChanged == watchedEvent.getType();
        if (childrenChanged) {
            for (AlarmMessageProcessThread processThread : AlarmProcessServer.getProcessThreads()) {
                processThread.setChanged(true);
            }
        }
        ZKUtil.watch(watcher);
    }
}
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao;
import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus;
import com.ai.cloud.skywalking.alarm.model.ProcessThreadValue;
import com.ai.cloud.skywalking.alarm.util.ProcessUtil;
import com.ai.cloud.skywalking.alarm.util.ZKUtil;
import com.google.gson.Gson;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Coordinator thread that redistributes users across the registered alarm process threads.
 *
 * <p>A round is triggered by {@link RegisterServerWatcher} when the children of the registration
 * path change. During a round the coordinator switches every registered thread to
 * {@code REDISTRIBUTING}, waits until all of them report {@code FREE}, writes each thread's share
 * of user ids into its ZooKeeper node and finally switches them to {@code REDISTRIBUTE_SUCCESS}.
 */
public class UserInfoCoordinator extends Thread {

    private Logger logger = LogManager.getLogger(UserInfoCoordinator.class);

    // Both flags are written by the watcher callback and read by this thread, hence volatile.
    /** True while a redistribution round is pending or in progress. */
    private volatile boolean redistributing;
    /** True when a registration change arrived while a round was already pending/in progress. */
    private volatile boolean newServerComingFlag = false;

    public UserInfoCoordinator() {
        redistributing = false;
    }

    @Override
    public void run() {
        while (true) {
            // Idle only when neither a first registration change nor a change during a running
            // round has been reported. (The previous "||" required BOTH flags, so the very first
            // registration event never started a round.)
            if (!redistributing && !newServerComingFlag) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    logger.error("Sleep error", e);
                }
                continue;
            }

            // Mark the round as running and consume the "changed again" request.
            redistributing = true;
            newServerComingFlag = false;

            // Fetch all currently registered process threads.
            List<String> registeredThreads = acquireAllRegisteredThread();

            // Switch them to the "redistribution started" status.
            changeStatus(registeredThreads, ProcessThreadStatus.REDISTRIBUTING);

            // Wait until every process thread has become free.
            while (!isAllProcessThreadFree(registeredThreads)) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    logger.error("Sleep failed", e);
                }
            }

            // Query the current users and hand them out to the process threads.
            List<String> users = AlarmMessageDao.selectAllUserIds();
            allocationUser(registeredThreads, users);

            // Switch them to the "redistribution finished" status.
            changeStatus(registeredThreads, ProcessThreadStatus.REDISTRIBUTE_SUCCESS);

            // Round finished. A change reported meanwhile is kept in newServerComingFlag and
            // starts the next round on the following iteration.
            redistributing = false;
        }
    }

    /**
     * Splits {@code userIds} into consecutive chunks and stores one chunk in the ZooKeeper node of
     * each registered thread. Every thread is written, so a thread that receives no users gets an
     * empty list, and the last (possibly shorter) chunk is no longer dropped.
     *
     * @param registeredThreads ids of the registered process threads
     * @param userIds           ids of all users to distribute
     */
    private void allocationUser(List<String> registeredThreads, List<String> userIds) {
        Set<String> threadIds = new HashSet<String>(registeredThreads);
        if (threadIds.isEmpty()) {
            // Nobody to assign users to; also avoids dividing by zero below.
            return;
        }

        int total = userIds.size();
        int step = (int) Math.ceil(total * 1.0 / threadIds.size());
        Gson gson = new Gson();
        int start = 0;
        for (String thread : threadIds) {
            // Clamp instead of breaking out so the tail chunk is still assigned.
            int end = Math.min(start + step, total);
            String path = Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread;
            ProcessThreadValue threadValue =
                    gson.fromJson(ZKUtil.getPathData(path), ProcessThreadValue.class);
            threadValue.setDealUserIds(userIds.subList(start, end));
            ZKUtil.setPathData(path, gson.toJson(threadValue));
            start = end;
        }
    }

    /**
     * @return the child node names found under the registration path
     */
    private List<String> acquireAllRegisteredThread() {
        return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH);
    }

    /**
     * @param registeredThreadIds ids of the threads to inspect
     * @return {@code true} only when every listed thread currently reports {@code FREE}
     */
    private boolean isAllProcessThreadFree(List<String> registeredThreadIds) {
        String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/";
        for (String threadId : registeredThreadIds) {
            if (getProcessThreadStatus(registerPathPrefix, threadId)
                    != ProcessThreadStatus.FREE) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads the JSON stored in a thread's node and converts its status field.
     *
     * @param registerPathPrefix registration path ending with "/"
     * @param threadId           id of the thread whose node is read
     * @return the status stored in the node
     */
    private ProcessThreadStatus getProcessThreadStatus(String registerPathPrefix, String threadId) {
        String value = ZKUtil.getPathData(registerPathPrefix + threadId);
        ProcessThreadValue threadValue = new Gson().fromJson(value, ProcessThreadValue.class);
        return ProcessThreadStatus.convert(threadValue.getStatus());
    }

    /**
     * Writes {@code status} into the node of every listed thread.
     *
     * @param registeredThreadIds ids of the threads to update
     * @param status              status to store
     */
    private void changeStatus(List<String> registeredThreadIds, ProcessThreadStatus status) {
        for (String threadId : registeredThreadIds) {
            ProcessUtil.changeProcessThreadStatus(threadId, status);
        }
    }

    /**
     * Watcher callback that requests a redistribution round when the children of the watched
     * node change. If a round is already pending or running, the request is remembered in
     * {@code newServerComingFlag} so another round follows.
     */
    public class RegisterServerWatcher implements CuratorWatcher {
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                if (redistributing) {
                    newServerComingFlag = true;
                } else {
                    redistributing = true;
                }
            }
        }
    }
}
......@@ -29,7 +29,7 @@ public class UserInfoInspector extends Thread {
if (currentUserSize != preUserSize) {
logger.info("Total user has been changed. Notice all process thread to change process date.");
for (AlarmMessageProcessThread thread : AlarmProcessServer.getProcessThreads()) {
thread.setChanged(true);
//thread.setChanged(true);
}
}
}
......
......@@ -10,6 +10,10 @@ public class Config {
}
public static class ProcessThread {
public static long THREAD_WAIT_INTERVAL = 60 * 1000L;
}
public static class ZKPath {
public static String CONNECT_STR = "127.0.0.1:2181";
......@@ -22,9 +26,9 @@ public class Config {
public static String NODE_PREFIX = "/skywalking";
public static String SERVER_REGISTER_LOCK_PATH = "/alarm-server/register";
public static String COORDINATOR_STATUS_PATH = "/alarm-server/coordinator/status";
public static String REGISTER_SERVER_PATH = "/alarm-server/servers";
public static String REGISTER_SERVER_PATH = "/alarm-server/register-servers";
public static String USER_REGISTER_LOCK_PATH = "/alarm-server/users";
}
......@@ -54,16 +58,7 @@ public class Config {
public static boolean ALARM_OFF_FLAG = false;
}
public static class MailSender {
public static String HOST = "";
public static String TRANSPORT_PROTOCOL = "";
public static boolean SMTP_AUTH = true;
public static String USER_NAME = "";
public static String PASSWORD = "";
public static class MailSenderInfo {
public static String configId = "1000";
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.alarm.dao;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.model.AlarmRule;
import com.ai.cloud.skywalking.alarm.model.ApplicationInfo;
import com.ai.cloud.skywalking.alarm.model.UserInfo;
import com.ai.cloud.skywalking.alarm.util.DBConnectUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.*;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class AlarmMessageDao {
private static Logger logger = LogManager.getLogger(AlarmMessageDao.class);
private static Connection con;
static {
try {
Class.forName(Config.DB.DRIVER_CLASS);
} catch (ClassNotFoundException e) {
logger.error("Failed to found DB driver class.", e);
System.exit(-1);
}
public static List<String> selectAllUserIds() {
List<String> result = new ArrayList<String>();
try {
con = DriverManager.getConnection(Config.DB.URL, Config.DB.USER_NAME, Config.DB.PASSWORD);
} catch (SQLException e) {
logger.error("Failed to connect DB", e);
System.exit(-1);
}
}
public static List<UserInfo> selectAllUserInfo() {
List<UserInfo> result = new ArrayList<UserInfo>();
try {
PreparedStatement ps = con.prepareStatement("SELECT user_info.uid FROM user_info WHERE sts = ?");
PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT user_info.uid FROM user_info WHERE sts = ?");
ps.setString(1, "A");
ResultSet rs = ps.executeQuery();
UserInfo userInfo;
while (rs.next()) {
userInfo = new UserInfo(rs.getString("uid"));
result.add(userInfo);
result.add(rs.getString("uid"));
}
} catch (SQLException e) {
logger.error("Failed to select all user info", e);
......@@ -52,7 +36,7 @@ public class AlarmMessageDao {
public static int selectUserCount() {
try {
PreparedStatement ps = con.prepareStatement("SELECT count(user_info.uid) as totalNumber FROM user_info WHERE sts = ?");
PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT count(user_info.uid) as totalNumber FROM user_info WHERE sts = ?");
ps.setString(1, "A");
ResultSet rs = ps.executeQuery();
rs.next();
......@@ -64,11 +48,27 @@ public class AlarmMessageDao {
return 0;
}
public static UserInfo selectUser(String userId) {
UserInfo userInfo = null;
try {
PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT user_info.uid,user_info.user_name FROM user_info WHERE sts = ?");
ps.setString(1, "A");
ResultSet rs = ps.executeQuery();
rs.next();
userInfo = new UserInfo(rs.getString("uid"));
userInfo.setUserName(rs.getString("user_name"));
} catch (SQLException e) {
logger.error("Failed to select all user info", e);
}
return userInfo;
}
public static List<AlarmRule> selectAlarmRulesByUserId(String userId) {
List<AlarmRule> rules = new ArrayList<AlarmRule>();
List<ApplicationInfo> selfDefineApplications = new ArrayList<ApplicationInfo>();
try {
PreparedStatement ps = con.prepareStatement("SELECT alarm_rule.app_id,alarm_rule.rule_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," +
PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT alarm_rule.app_id,alarm_rule.rule_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," +
" alarm_rule.config_args FROM alarm_rule WHERE uid = ? AND sts = ?");
ps.setString(1, userId);
ps.setString(2, "A");
......@@ -104,7 +104,8 @@ public class AlarmMessageDao {
}
List<ApplicationInfo> allApplication = new ArrayList<ApplicationInfo>();
ps = con.prepareStatement("SELECT application_info.app_id, application_info.uid, app_code FROM application_info WHERE uid = ? AND sts = ?");
ps = DBConnectUtil.getConnection()
.prepareStatement("SELECT application_info.app_id, application_info.uid, app_code FROM application_info WHERE uid = ? AND sts = ?");
ps.setString(1, userId);
ps.setString(2, "A");
rs = ps.executeQuery();
......
package com.ai.cloud.skywalking.alarm.dao;
import com.ai.cloud.skywalking.alarm.util.DBConnectUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
 * Data access for rows of the {@code system_config} table.
 */
public class SystemConfigDao {

    private static Logger logger = LogManager.getLogger(SystemConfigDao.class);

    /**
     * Loads the {@code conf_value} of the active ({@code sts = 'A'}) configuration row with the
     * given id.
     *
     * @param configId value matched against {@code system_config.config_id}
     * @return the {@code conf_value} column of the first matching row
     * @throws SQLException if the query fails or no matching row exists
     */
    public static String getMailSenderInfo(String configId) throws SQLException {
        // try-with-resources closes the statement and result set, which previously leaked.
        try (PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement(
                "SELECT system_config.conf_value FROM system_config WHERE system_config.sts = " +
                        "? AND system_config.config_id = ?")) {
            ps.setString(1, "A");
            ps.setString(2, configId);
            try (ResultSet resultSet = ps.executeQuery()) {
                // Report a missing row explicitly instead of relying on the driver's error
                // when reading from an empty result set.
                if (!resultSet.next()) {
                    throw new SQLException(
                            "No active system_config row found for config_id[" + configId + "]");
                }
                return resultSet.getString("conf_value");
            }
        }
    }
}
package com.ai.cloud.skywalking.alarm.model;
/**
 * Status of an alarm process thread, together with the string code under which the status is
 * stored.
 */
public enum ProcessThreadStatus {
    REDISTRIBUTING("1"), REDISTRIBUTE_SUCCESS("2"), FREE("0"), BUSY("3");

    /** String code of this status. */
    private final String value;

    ProcessThreadStatus(String value) {
        this.value = value;
    }

    /**
     * @return the string code of this status
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a string code back to its status. This is the inverse of {@link #getValue()}:
     * the lookup is driven by the codes declared on the constants, so all four statuses
     * round-trip.
     *
     * @param value string code to resolve
     * @return the status whose code equals {@code value}
     * @throws IllegalArgumentException if no status uses that code (including {@code null})
     */
    public static ProcessThreadStatus convert(String value) {
        for (ProcessThreadStatus status : values()) {
            if (status.value.equals(value)) {
                return status;
            }
        }
        throw new IllegalArgumentException("Coordinator status illegal");
    }
}
package com.ai.cloud.skywalking.alarm.model;
import java.util.List;
/**
 * Plain value holder with a status code and a list of user ids. The field names are kept as-is
 * because the object is converted to and from JSON by field name.
 */
public class ProcessThreadValue {

    /** Status code as a string. */
    private String status;

    /** Ids of the users carried by this value. */
    private List<String> dealUserIds;

    /**
     * @param status status code to store
     */
    public void setStatus(String status) {
        this.status = status;
    }

    /**
     * @return the stored status code, or {@code null} if none was set
     */
    public String getStatus() {
        return this.status;
    }

    /**
     * @param dealUserIds user ids to store; the list is kept by reference
     */
    public void setDealUserIds(List<String> dealUserIds) {
        this.dealUserIds = dealUserIds;
    }

    /**
     * @return the stored user id list, or {@code null} if none was set
     */
    public List<String> getDealUserIds() {
        return this.dealUserIds;
    }
}
package com.ai.cloud.skywalking.alarm.model;
import java.util.List;
public class UserInfo {
private String userId;
private List<AlarmRule> rules;
private String userName;
public UserInfo(String userId) {
this.userId = userId;
......@@ -16,11 +14,11 @@ public class UserInfo {
return userId;
}
public List<AlarmRule> getRules() {
return rules;
public String getUserName() {
return userName;
}
public void setRules(List<AlarmRule> rules) {
this.rules = rules;
public void setUserName(String userName) {
this.userName = userName;
}
}
package com.ai.cloud.skywalking.alarm.model.parameter;
import java.util.Collection;
import java.util.Set;
/**
 * Holder for an application id together with a set of trace ids.
 */
public class Application {

    /** Identifier of the application. */
    private String applicationId;

    /** Trace ids attached to the application; {@code null} until set. */
    private Set<String> traceIds;

    /**
     * @param applicationId identifier of the application
     */
    public Application(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * @param applicationId new application identifier
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * @return the application identifier
     */
    public String getApplicationId() {
        return this.applicationId;
    }

    /**
     * @param traceIds trace ids to attach; the set is kept by reference
     */
    public void setTraceIds(Set<String> traceIds) {
        this.traceIds = traceIds;
    }

    /**
     * @return the attached trace ids, or {@code null} if none were set
     */
    public Set<String> getTraceIds() {
        return this.traceIds;
    }
}
package com.ai.cloud.skywalking.alarm.procesor;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao;
import com.ai.cloud.skywalking.alarm.model.AlarmRule;
import com.ai.cloud.skywalking.alarm.model.ApplicationInfo;
import com.ai.cloud.skywalking.alarm.model.MailInfo;
import com.ai.cloud.skywalking.alarm.model.UserInfo;
import com.ai.cloud.skywalking.alarm.util.MailUtil;
import com.ai.cloud.skywalking.alarm.util.RedisUtil;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.Version;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.*;
public class AlarmMessageProcessor {
private static Logger logger = LogManager
.getLogger(AlarmMessageProcessor.class);
public void process(UserInfo userInfo, AlarmRule rule) {
Set<String> warningTracingIds = new HashSet<String>();
Set<String> warningMessageKeys = new HashSet<String>();
long currentFireMinuteTime = System.currentTimeMillis() / (10000 * 6);
long warningTimeWindowSize = currentFireMinuteTime
- rule.getPreviousFireTimeM();
// 获取待发送数据
if (warningTimeWindowSize >= rule.getConfigArgsDescriber().getPeriod()) {
for (ApplicationInfo applicationInfo : rule.getApplicationInfos()) {
for (int period = 0; period < warningTimeWindowSize; period++) {
String alarmKey = userInfo.getUserId()
+ "-"
+ applicationInfo.getAppCode()
+ "-"
+ ((System.currentTimeMillis() / (10000 * 6)) - period);
warningMessageKeys.add(alarmKey);
warningTracingIds.addAll(getAlarmMessages(alarmKey));
}
}
// 发送告警数据
if (warningTracingIds.size() > 0) {
if ("0".equals(rule.getTodoType())) {
// 发送邮件
String subjects = generateSubject(warningTracingIds.size(),
rule.getPreviousFireTimeM(), currentFireMinuteTime);
Map parameter = new HashMap();
// TODO:已使用新的参数,warningTracingIds包含所有的告警tracingId,需要在模板中生成链接
parameter.put("warningTracingIds", warningTracingIds);
// TODO:请转换为USERNAME,ID无法识别
parameter.put("name", userInfo.getUserId());
String mailContext = generateContent(rule
.getConfigArgsDescriber().getMailInfo()
.getMailTemp(), parameter);
if (mailContext.length() > 0) {
MailInfo mailInfo = rule.getConfigArgsDescriber()
.getMailInfo();
MailUtil.sendMail(mailInfo.getMailTo(),
mailInfo.getMailCc(), mailContext, subjects);
}
}
}
// 清理数据
for (String toBeRemovedKey : warningMessageKeys) {
expiredAlarmMessage(toBeRemovedKey);
}
// 修改-保存上次处理时间
dealPreviousFireTime(userInfo, rule, currentFireMinuteTime);
}
}
private void dealPreviousFireTime(UserInfo userInfo, AlarmRule rule,
long currentFireMinuteTime) {
rule.setPreviousFireTimeM(currentFireMinuteTime);
savePreviousFireTime(userInfo.getUserId(), rule.getRuleId(),
currentFireMinuteTime);
}
private String generateSubject(int count, long startTime, long endTime) {
String title = "[Warning] There were "
+ count
+ " alarm information between "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(
startTime))
+ " to "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(
endTime));
return title;
}
private void expiredAlarmMessage(String key) {
Jedis client = RedisUtil.getRedisClient();
client.expire(key, 0);
if (client != null) {
client.close();
}
}
private void savePreviousFireTime(String userId, String ruleId,
long currentFireMinuteTime) {
Jedis client = RedisUtil.getRedisClient();
client.hset(userId, ruleId, String.valueOf(currentFireMinuteTime));
if (client != null) {
client.close();
}
}
private Collection<String> getAlarmMessages(String key) {
Jedis client = RedisUtil.getRedisClient();
Map<String, String> result = client.hgetAll(key);
if (result == null) {
return new ArrayList<String>();
}
client.close();
return result.values();
}
private String generateContent(String templateStr, Map parameter) {
Configuration cfg = new Configuration(new Version("2.3.23"));
cfg.setDefaultEncoding("UTF-8");
Template t = null;
try {
t = new Template(null, new StringReader(templateStr), cfg);
StringWriter out = new StringWriter();
t.process(parameter, out);
return out.getBuffer().toString();
} catch (IOException e) {
logger.error("Template illegal.", e);
} catch (TemplateException e) {
logger.error("Failed to generate content.", e);
}
return "";
}
private static Logger logger = LogManager
.getLogger(AlarmMessageProcessor.class);
/**
 * Processes one alarm rule of one user: once the time since the rule last fired has reached
 * the rule's period, gathers the alarm entries stored in Redis for every application of the
 * rule, sends them by mail when the rule's todo type is "0", expires the Redis keys that were
 * read and records the new fire time. Does nothing when the period has not yet elapsed.
 *
 * @param userInfo user whose id prefixes the alarm keys and whose name is passed to the mail
 *                 template
 * @param rule     rule supplying the applications, period, mail settings and previous fire time
 */
public void process(UserInfo userInfo, AlarmRule rule) {
Set<String> warningTracingIds = new HashSet<String>();
Set<String> warningMessageKeys = new HashSet<String>();
// Current time in whole minutes (10000 * 6 ms = 60 000 ms).
long currentFireMinuteTime = System.currentTimeMillis() / (10000 * 6);
// Number of minutes elapsed since the rule last fired.
long warningTimeWindowSize = currentFireMinuteTime
- rule.getPreviousFireTimeM();
// Gather the data to be sent.
// NOTE(review): the period is compared against a minute count, so it is presumably
// expressed in minutes — confirm.
if (warningTimeWindowSize >= rule.getConfigArgsDescriber().getPeriod()) {
for (ApplicationInfo applicationInfo : rule.getApplicationInfos()) {
// One key per application and per minute of the elapsed window:
// "<userId>-<appCode>-<minute>".
// NOTE(review): the loop count equals the elapsed minutes, so a rule with a very old
// (or zero) previous fire time causes a correspondingly large number of Redis reads.
for (int period = 0; period < warningTimeWindowSize; period++) {
String alarmKey = userInfo.getUserId()
+ "-"
+ applicationInfo.getAppCode()
+ "-"
+ (currentFireMinuteTime - period);
warningMessageKeys.add(alarmKey);
warningTracingIds.addAll(getAlarmMessages(alarmKey));
}
}
// Send the alarm data.
if (warningTracingIds.size() > 0) {
if ("0".equals(rule.getTodoType())) {
// Send an e-mail.
String subjects = generateSubject(warningTracingIds.size(),
rule.getPreviousFireTimeM(), currentFireMinuteTime);
// Values made available to the mail template.
Map parameter = new HashMap();
parameter.put("warningTracingIds", warningTracingIds);
parameter.put("name", userInfo.getUserName());
// Minute counters are converted back to milliseconds for formatting.
parameter.put("startDate", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(
rule.getPreviousFireTimeM() * 10000 * 6)));
parameter.put("endDate", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(
currentFireMinuteTime * 10000 * 6)));
// TODO: portalAddr needs to be initialized instead of being hard-coded.
parameter.put("portalAddr", "http://127.0.0.1:8080/skywalking-webui/");
String mailContext = generateContent(rule
.getConfigArgsDescriber().getMailInfo()
.getMailTemp(), parameter);
// generateContent returns "" when rendering fails; nothing is sent in that case.
if (mailContext.length() > 0) {
MailInfo mailInfo = rule.getConfigArgsDescriber()
.getMailInfo();
MailUtil.sendMail(mailInfo.getMailTo(),
mailInfo.getMailCc(), mailContext, subjects);
}
}
}
// Clean up: expire every key that was read, whether or not a mail was sent.
for (String toBeRemovedKey : warningMessageKeys) {
expiredAlarmMessage(toBeRemovedKey);
}
// Update and persist the time of this processing run.
dealPreviousFireTime(userInfo, rule, currentFireMinuteTime);
}
}
/**
 * Records {@code currentFireMinuteTime} as the rule's latest fire time, first on the rule
 * object itself and then through {@code savePreviousFireTime}.
 *
 * @param userInfo              owner of the rule
 * @param rule                  rule whose fire time is updated
 * @param currentFireMinuteTime fire time to record
 */
private void dealPreviousFireTime(UserInfo userInfo, AlarmRule rule,
        long currentFireMinuteTime) {
    rule.setPreviousFireTimeM(currentFireMinuteTime);

    final String ownerId = userInfo.getUserId();
    final String ruleId = rule.getRuleId();
    savePreviousFireTime(ownerId, ruleId, currentFireMinuteTime);
}
/**
 * Builds the mail subject line stating how many alarms occurred between two points in time.
 *
 * @param count     number of alarm entries
 * @param startTime start of the window; multiplied by 60 000 to obtain milliseconds
 * @param endTime   end of the window; multiplied by 60 000 to obtain milliseconds
 * @return the subject text
 */
private String generateSubject(int count, long startTime, long endTime) {
    SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String windowStart = timestampFormat.format(new Date(startTime * 10000 * 6));
    String windowEnd = timestampFormat.format(new Date(endTime * 10000 * 6));

    StringBuilder subject = new StringBuilder();
    subject.append("[Warning] There were ")
            .append(count)
            .append(" alarm information between ")
            .append(windowStart)
            .append(" to ")
            .append(windowEnd);
    return subject.toString();
}
/**
 * Expires the given Redis key immediately (time-to-live of 0 seconds).
 *
 * @param key Redis key to expire
 */
private void expiredAlarmMessage(String key) {
    Jedis client = RedisUtil.getRedisClient();
    try {
        client.expire(key, 0);
    } finally {
        // Always give the client back, even when the command fails. The former null check
        // ran only after the client had already been dereferenced and protected nothing.
        client.close();
    }
}
/**
 * Stores the fire time of a rule in the Redis hash named after the user id, using the rule id
 * as the hash field.
 *
 * @param userId                name of the Redis hash
 * @param ruleId                field inside the hash
 * @param currentFireMinuteTime value to store, written as a decimal string
 */
private void savePreviousFireTime(String userId, String ruleId,
        long currentFireMinuteTime) {
    Jedis client = RedisUtil.getRedisClient();
    try {
        client.hset(userId, ruleId, String.valueOf(currentFireMinuteTime));
    } finally {
        // Always give the client back, even when the command fails. The former null check
        // ran only after the client had already been dereferenced and protected nothing.
        client.close();
    }
}
/**
 * Reads the Redis hash stored under {@code key} and returns its field names.
 *
 * @param key Redis key of the hash
 * @return the field names of the hash, or an empty list when the lookup yields {@code null}
 */
private Collection<String> getAlarmMessages(String key) {
    Jedis client = RedisUtil.getRedisClient();
    try {
        Map<String, String> result = client.hgetAll(key);
        if (result == null) {
            return new ArrayList<String>();
        }
        return result.keySet();
    } finally {
        // Close on every path: the early return above and a failing hgetAll used to leak
        // the client.
        client.close();
    }
}
/**
 * Renders a FreeMarker template held in a string with the given parameters.
 *
 * @param templateStr template source text
 * @param parameter   data model handed to the template
 * @return the rendered text, or an empty string when the template cannot be read or rendered
 *         (the failure is logged)
 */
private String generateContent(String templateStr, Map parameter) {
    Configuration freemarkerConfig = new Configuration(new Version("2.3.23"));
    freemarkerConfig.setDefaultEncoding("UTF-8");

    String rendered = "";
    try {
        Template template = new Template(null, new StringReader(templateStr), freemarkerConfig);
        StringWriter sink = new StringWriter();
        template.process(parameter, sink);
        rendered = sink.toString();
    } catch (IOException e) {
        logger.error("Template illegal.", e);
    } catch (TemplateException e) {
        logger.error("Failed to generate content.", e);
    }
    return rendered;
}
/**
 * Manual driver: loads the alarm rules of user "27" once and then processes the first rule
 * every 60 seconds, forever.
 *
 * @param args unused
 * @throws InterruptedException if the sleep between two runs is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    UserInfo sampleUser = new UserInfo("27");
    sampleUser.setUserName("123");
    List<AlarmRule> sampleRules = AlarmMessageDao.selectAlarmRulesByUserId(sampleUser.getUserId());

    for (;;) {
        AlarmMessageProcessor processor = new AlarmMessageProcessor();
        processor.process(sampleUser, sampleRules.get(0));
        Thread.sleep(60 * 1000L);
    }
}
}
package com.ai.cloud.skywalking.alarm.util;
import com.ai.cloud.skywalking.alarm.conf.Config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
 * Holds the single JDBC connection of the process. The driver is loaded and the connection is
 * opened when the class is initialised; either failure is logged and terminates the JVM with
 * exit code -1.
 */
public class DBConnectUtil {

    private static Logger logger = LogManager.getLogger(DBConnectUtil.class);

    /** Connection opened during class initialisation. */
    private static Connection con;

    static {
        loadDriver();
        openConnection();
    }

    /** Loads the JDBC driver class named in the configuration. */
    private static void loadDriver() {
        try {
            Class.forName(Config.DB.DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            logger.error("Failed to found DB driver class.", e);
            System.exit(-1);
        }
    }

    /** Opens the connection using the configured URL and credentials. */
    private static void openConnection() {
        try {
            con = DriverManager.getConnection(Config.DB.URL, Config.DB.USER_NAME, Config.DB.PASSWORD);
        } catch (SQLException e) {
            logger.error("Failed to connect DB", e);
            System.exit(-1);
        }
    }

    /**
     * @return the connection opened during class initialisation
     */
    public static Connection getConnection() {
        return con;
    }
}
package com.ai.cloud.skywalking.alarm.util;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.dao.SystemConfigDao;
import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -17,10 +20,11 @@ public class MailUtil {
private static String sendAccount;
private static Transport ts;
static {
try {
Properties prop = new Properties();
prop.load(MailUtil.class.getResourceAsStream("/mail/mail.config"));
String senderInfo = SystemConfigDao.getMailSenderInfo(Config.MailSenderInfo.configId);
Properties prop = new Gson().fromJson(senderInfo, Properties.class);
session = Session.getInstance(prop);
ts = session.getTransport();
ts.connect(prop.getProperty("mail.host"), prop.getProperty("mail.username"), prop.getProperty("mail.password"));
......
package com.ai.cloud.skywalking.alarm.util;
import com.ai.cloud.skywalking.alarm.conf.Config;
import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus;
import com.ai.cloud.skywalking.alarm.model.ProcessThreadValue;
import com.google.gson.Gson;
/**
 * Helper for updating the state that a process thread keeps in its ZooKeeper node.
 */
public class ProcessUtil {

    /**
     * Reads the JSON value stored in the thread's node, replaces its status with the code of
     * {@code status} and writes the JSON back to the same node.
     *
     * @param threadId id of the process thread; appended to the registration path
     * @param status   status whose code is stored
     */
    public static void changeProcessThreadStatus(String threadId, ProcessThreadStatus status) {
        final String nodePath = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId;
        final Gson gson = new Gson();

        ProcessThreadValue nodeValue =
                gson.fromJson(ZKUtil.getPathData(nodePath), ProcessThreadValue.class);
        nodeValue.setStatus(status.getValue());

        ZKUtil.setPathData(nodePath, gson.toJson(nodeValue));
    }
}
package com.ai.cloud.skywalking.alarm.zk;
package com.ai.cloud.skywalking.alarm.util;
import com.ai.cloud.skywalking.alarm.AlarmServerRegisterWatcher;
import com.ai.cloud.skywalking.alarm.conf.Config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.logging.log4j.LogManager;
......@@ -14,9 +14,7 @@ import java.util.ArrayList;
import java.util.List;
public class ZKUtil {
private static Logger logger = LogManager.getLogger(ZKUtil.class);
private static CuratorFramework client;
static {
......@@ -28,69 +26,53 @@ public class ZKUtil {
.connectionTimeoutMs(Config.ZKPath.CONNECT_TIMEOUT).retryPolicy(retryPolicy);
client = builder.build();
client.start();
createIfNotExists(getRegisterLockPath());
createIfNotExists(getUserLockPathPrefix());
} catch (Exception e) {
logger.error("Failed to connect zookeeper.", e);
System.exit(-1);
}
}
public static InterProcessMutex getRegisterLock() {
return new InterProcessMutex(client, getRegisterLockPath());
public static CuratorFramework getZkClient() {
return client;
}
public static InterProcessMutex getProcessUserLock(String uid) {
return new InterProcessMutex(client, getUserLockPath(uid));
public static InterProcessMutex getLock(String path) {
return new InterProcessMutex(client, path);
}
public static void register(String serverId) {
public static String getPathData(String path) {
try {
client.create().creatingParentsIfNeeded().forPath(getRegisterServerPathPrefix() + "/" + serverId);
return new String(client.getData().forPath(path));
} catch (Exception e) {
logger.error("Failed to register server", e);
logger.error("Failed to get the value of path[{}]", path, e);
}
return "";
}
public static List<String> selectAllThreadIds() {
public static String getPathDataWithWatch(String path, CuratorWatcher watcher) {
try {
return client.getChildren().forPath(getRegisterServerPathPrefix());
return new String(client.getData().usingWatcher(watcher).forPath(path));
} catch (Exception e) {
logger.error("Failed to get children of Path[" + getRegisterServerPathPrefix() + "].", e);
logger.error("Failed to get the value of path[{}]", path, e);
}
return new ArrayList<String>();
return "";
}
public static void watch(AlarmServerRegisterWatcher instance) {
public static void setPathData(String path, String value) {
try {
client.getChildren().usingWatcher(instance).forPath(getRegisterServerPathPrefix());
client.setData().forPath(path, value.getBytes());
} catch (Exception e) {
logger.error("Failed to get children for path[" + getRegisterServerPathPrefix() + "].", e);
logger.error("Failed to set date of path[{{}]", path, e);
}
}
private static String getRegisterLockPath() {
return Config.ZKPath.NODE_PREFIX + Config.ZKPath.SERVER_REGISTER_LOCK_PATH;
}
private static String getUserLockPath(String uid) {
return getUserLockPathPrefix() + "/" + uid;
}
private static String getUserLockPathPrefix() {
return Config.ZKPath.NODE_PREFIX + Config.ZKPath.USER_REGISTER_LOCK_PATH;
}
private static void createIfNotExists(String path) throws Exception {
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().forPath(path);
public static List<String> getChildren(String registerServerPath) {
try {
return client.getChildren().forPath(registerServerPath);
} catch (Exception e) {
logger.error("Failed to get child nodes of path[{{}]", registerServerPath, e);
}
}
private static String getRegisterServerPathPrefix() {
return Config.ZKPath.NODE_PREFIX + Config.ZKPath.REGISTER_SERVER_PATH;
return new ArrayList<String>();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册