diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmMessageProcessThread.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmMessageProcessThread.java index 2eba4b1deb9ea3c4e6d126e3880164daf05453e0..bdb7194260cacb296b684bdcbfb1457b89c24931 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmMessageProcessThread.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmMessageProcessThread.java @@ -1,141 +1,156 @@ package com.ai.cloud.skywalking.alarm; +import com.ai.cloud.skywalking.alarm.conf.Config; import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao; +import com.ai.cloud.skywalking.alarm.model.AlarmRule; +import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus; import com.ai.cloud.skywalking.alarm.model.UserInfo; -import com.ai.cloud.skywalking.alarm.zk.ZKUtil; +import com.ai.cloud.skywalking.alarm.procesor.AlarmMessageProcessor; +import com.ai.cloud.skywalking.alarm.util.ProcessUtil; +import com.ai.cloud.skywalking.alarm.util.ZKUtil; +import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; -import java.util.*; - +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; public class AlarmMessageProcessThread extends Thread { - private int rank; - private String threadId; + private Logger logger = LogManager.getLogger(AlarmMessageProcessThread.class); - private boolean isChanged = false; - private List locks; - private List toBeProcessUsers; + + private String threadId; + private ProcessThreadStatus status; + private String[] processUserIds; + private List usersLocks; + private CoordinatorStatusWatcher watcher = new CoordinatorStatusWatcher(); public AlarmMessageProcessThread() { + // 初始化生成ThreadId threadId = UUID.randomUUID().toString(); - registerServer(threadId); - rank = ballot(threadId); - toBeProcessUsers = getToBeProcessUsers(); } - @Override public void run() { - lockAllUsers(); - Set traceIds; - Set toBeSenderTraceId; + //注册服务(默认为空闲状态) + registerProcessThread(threadId, ProcessThreadStatus.FREE); while (true) { - for (UserInfo userInfo : toBeProcessUsers) { - traceIds = new HashSet(); -// for (ApplicationInfo applicationInfo : userInfo.getApplicationInfos()) { -// toBeSenderTraceId = RedisUtil.getAlarmMessage(applicationInfo); -// toBeSenderTraceId.removeAll(traceIds); -// if (toBeSenderTraceId == null || toBeSenderTraceId.size() <= 0){ -// continue; -// }else{ -// -// } -// } + //检查是否为忙碌状态 + if (status == ProcessThreadStatus.BUSY) { + //处理告警信息 + for (String userId : processUserIds) { + List rules = AlarmMessageDao.selectAlarmRulesByUserId(userId); + UserInfo userInfo = AlarmMessageDao.selectUser(userId); + for (AlarmRule rule : rules) { + new AlarmMessageProcessor().process(userInfo, rule); + } + } + } + //检查是否分配线程的状态(重新分配状态) + if (status == ProcessThreadStatus.REDISTRIBUTING) { + // 释放用户锁 + releaseUserLock(); + // 修改自身状态:(空闲状态) + status = ProcessThreadStatus.FREE; + ProcessUtil.changeProcessThreadStatus(threadId, ProcessThreadStatus.FREE); } - if (isChanged) { - unlockAllUsers(); - rank = ballot(threadId); - toBeProcessUsers = getToBeProcessUsers(); - lockAllUsers(); - isChanged = false; + //检查分配线程的状态(分配完成状态) + if (status == ProcessThreadStatus.REDISTRIBUTE_SUCCESS) { + // 获取待处理的用户 + processUserIds = acquireProcessedUsers(); + // 给用户加锁 + lockUser(processUserIds); + // 修改自身状态 :(忙碌状态) + status = ProcessThreadStatus.BUSY; + ProcessUtil.changeProcessThreadStatus(threadId, ProcessThreadStatus.BUSY); } try { - Thread.sleep(1000L); + Thread.sleep(Config.ProcessThread.THREAD_WAIT_INTERVAL); } catch (InterruptedException e) { - logger.error("Sleep failed", e); + logger.error("Sleep failed.", e); } } } - private void unlockAllUsers() { - for (InterProcessMutex lock : locks) { - try { - lock.release(); - } catch (Exception e) { - logger.error("Failed to release lock[{}]", e); - } + private String[] acquireProcessedUsers() { + String path = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId; + String value = ZKUtil.getPathData(path); + String[] valueArrays = value.split("@"); + String toBeProcessUserId; + if (valueArrays.length < 2) { + toBeProcessUserId = ""; + } else { + toBeProcessUserId = valueArrays[1]; } + + return toBeProcessUserId.split(";"); } - private void lockAllUsers() { - locks = new ArrayList(toBeProcessUsers.size()); + private void lockUser(String[] userIds) { + usersLocks = new ArrayList(); + String userLockPath = Config.ZKPath.USER_REGISTER_LOCK_PATH + "/"; InterProcessMutex tmpLock; - for (UserInfo userInfo : toBeProcessUsers) { - tmpLock = ZKUtil.getProcessUserLock(userInfo.getUserId()); - locks.add(tmpLock); + for (String userId : userIds) { + tmpLock = ZKUtil.getLock(userLockPath + userId); try { tmpLock.acquire(); } catch (Exception e) { - logger.error("Failed to lock "); + logger.error("Failed to lock user[{}]", userId, e); + //TODO 锁失败,该怎么处理? } - + usersLocks.add(tmpLock); } } - private List getToBeProcessUsers() { - List userInfos = AlarmMessageDao.selectAllUserInfo(); - List allThreadIds = ZKUtil.selectAllThreadIds(); - int step = (int) Math.ceil(userInfos.size() * 1.0 / allThreadIds.size()); - int start = rank * step; - int end = (rank + 1) * step; - if (end > userInfos.size()) { - return new ArrayList(); + private void releaseUserLock() { + for (InterProcessMutex lock : usersLocks) { + try { + lock.release(); + } catch (Exception e) { + // + logger.error("Failed to release lock user.", e); + //TODO 释放锁,该怎么处理。 + } } - List toBeProcessUsers = userInfos.subList(start, end); - for (UserInfo userInfo : toBeProcessUsers) { - // userInfo.setApplicationInfos(AlarmMessageDao.selectAlarmRulesByUserId(userInfo.getUserId())); - } - return toBeProcessUsers; + usersLocks.clear(); } - - private void registerServer(String serverId) { - InterProcessMutex registerLock = ZKUtil.getRegisterLock(); + private void registerProcessThread(String threadId, ProcessThreadStatus status) { try { - registerLock.acquire(); - ZKUtil.register(serverId); + String registerPath = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId; + String registerValue = status + "@ "; + ZKUtil.getZkClient().create().creatingParentsIfNeeded() + .forPath(registerPath, registerValue.getBytes()); + + this.status = status; + + + ZKUtil.getPathDataWithWatch(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId, watcher); } catch (Exception e) { - logger.error("Failed to lock.", e); - } finally { - if (registerLock != null) { - try { - registerLock.release(); - } catch (Exception e) { - logger.error("Failed to release lock.", e); - } - } + logger.error("Failed to register process thread.", e); } } - private int ballot(String threadId) { - List serverIds = ZKUtil.selectAllThreadIds(); - int rank = 0; - for (String tmpServerId : serverIds) { - if (tmpServerId.hashCode() < threadId.hashCode()) { - rank++; + private class CoordinatorStatusWatcher implements CuratorWatcher { + + @Override + public void process(WatchedEvent watchedEvent) { + if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) { + String value = ZKUtil.getPathData(Config.ZKPath.COORDINATOR_STATUS_PATH); + status = ProcessThreadStatus.convert(value); } + + ZKUtil.getPathDataWithWatch(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId, watcher); } - return rank; } - public void setChanged(boolean changed) { - isChanged = changed; - } } diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmProcessServer.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmProcessServer.java index 6efbbf4476236c53f6d0fba813651b4c3b8942d5..d845ab50c3ccb8b4344da2caa9a6f6adbd8e714a 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmProcessServer.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmProcessServer.java @@ -1,7 +1,6 @@ package com.ai.cloud.skywalking.alarm; import com.ai.cloud.skywalking.alarm.conf.Config; -import com.ai.cloud.skywalking.alarm.zk.ZKUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -29,7 +28,6 @@ public class AlarmProcessServer { new UserInfoInspector().start(); logger.info("Start user inspector thread success...."); logger.info("Alarm process server successfully started."); - ZKUtil.watch(AlarmServerRegisterWatcher.getInstance()); while (true) { try { Thread.sleep(Config.Server.DAEMON_THREAD_WAIT_INTERVAL); diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmServerRegisterWatcher.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmServerRegisterWatcher.java deleted file mode 100644 index ab8499e14c43d6ede82ccf05a91af908d7b12fad..0000000000000000000000000000000000000000 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/AlarmServerRegisterWatcher.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.ai.cloud.skywalking.alarm; - -import com.ai.cloud.skywalking.alarm.zk.ZKUtil; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; - -public class AlarmServerRegisterWatcher implements CuratorWatcher { - - private static AlarmServerRegisterWatcher watcher = new AlarmServerRegisterWatcher(); - - @Override - public void process(WatchedEvent watchedEvent) throws Exception { - if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) { - for (AlarmMessageProcessThread thread : AlarmProcessServer.getProcessThreads()) { - thread.setChanged(true); - } - } - - ZKUtil.watch(getInstance()); - } - - public static AlarmServerRegisterWatcher getInstance() { - return watcher; - } -} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoCoordinator.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoCoordinator.java new file mode 100644 index 0000000000000000000000000000000000000000..7a8aa0de2c27c9bdd888de414c58f8f8622d7fe3 --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoCoordinator.java @@ -0,0 +1,141 @@ +package com.ai.cloud.skywalking.alarm; + +import com.ai.cloud.skywalking.alarm.conf.Config; +import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao; +import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus; +import com.ai.cloud.skywalking.alarm.model.ProcessThreadValue; +import com.ai.cloud.skywalking.alarm.util.ProcessUtil; +import com.ai.cloud.skywalking.alarm.util.ZKUtil; +import com.google.gson.Gson; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class UserInfoCoordinator extends Thread { + + + private Logger logger = LogManager.getLogger(UserInfoInspector.class); + + private boolean redistributing; + private boolean newServerComingFlag = false; + + public UserInfoCoordinator() { + redistributing = false; + } + + @Override + public void run() { + while (true) { + //检查是否有新服务注册或者在重分配过程做有新处理线程启动了 + if (!redistributing || !newServerComingFlag) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + logger.error("Sleep error", e); + } + + continue; + } + + // 设置正在处理的标志位 + redistributing = true; + newServerComingFlag = false; + + //获取当前所有的注册的处理线程 + List registeredThreads = acquireAllRegisteredThread(); + //修改状态 (开始重新分配状态) + changeStatus(registeredThreads, ProcessThreadStatus.REDISTRIBUTING); + //检查所有的服务是否都处于空闲状态 + while (!isAllProcessThreadFree(registeredThreads)) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + logger.error("Sleep failed", e); + } + } + + //查询当前有多少用户 + List users = AlarmMessageDao.selectAllUserIds(); + + //将用户重新分配给服务 + allocationUser(registeredThreads, users); + + //修改状态(分配完成) + changeStatus(registeredThreads, ProcessThreadStatus.REDISTRIBUTE_SUCCESS); + } + } + + private void allocationUser(List registeredThreads, List userIds) { + Set sortThreadIds = new HashSet(registeredThreads); + int step = (int) Math.ceil(userIds.size() * 1.0 / sortThreadIds.size()); + int start = 0; + int end = step; + + if (end > userIds.size()) { + end = userIds.size(); + } + + for (String thread : sortThreadIds) { + String value = ZKUtil.getPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread); + ProcessThreadValue value1 = new Gson().fromJson(value, ProcessThreadValue.class); + value1.setDealUserIds(userIds.subList(start, end)); + ZKUtil.setPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread, new Gson().toJson(value1)); + + start = end; + end += step; + + if (end > userIds.size()) { + break; + } + + } + } + + private List acquireAllRegisteredThread() { + return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH); + } + + private boolean isAllProcessThreadFree(List registeredThreadIds) { + String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/"; + for (String threadId : registeredThreadIds) { + if (getProcessThreadStatus(registerPathPrefix, threadId) + != ProcessThreadStatus.FREE) { + return false; + } + } + return true; + } + + private ProcessThreadStatus getProcessThreadStatus(String registerPathPrefix, String threadId) { + String value = ZKUtil.getPathData(registerPathPrefix + threadId); + ProcessThreadValue value1 = new Gson().fromJson(value, ProcessThreadValue.class); + return ProcessThreadStatus.convert(value1.getStatus()); + } + + + private void changeStatus(List registeredThreadIds, ProcessThreadStatus status) { + for (String threadId : registeredThreadIds) { + ProcessUtil.changeProcessThreadStatus(threadId, status); + } + } + + public class RegisterServerWatcher implements CuratorWatcher { + + @Override + public void process(WatchedEvent watchedEvent) throws Exception { + if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) { + if (redistributing) { + newServerComingFlag = true; + } else { + redistributing = true; + } + } + } + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoInspector.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoInspector.java index 14f46ead06d7163e0038f85c972c0fd8d4a9cb58..955b58d0a402aa57801a7919666e8fd108ee5425 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoInspector.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoInspector.java @@ -29,7 +29,7 @@ public class UserInfoInspector extends Thread { if (currentUserSize != preUserSize) { logger.info("Total user has been changed. Notice all process thread to change process date."); for (AlarmMessageProcessThread thread : AlarmProcessServer.getProcessThreads()) { - thread.setChanged(true); + //thread.setChanged(true); } } } diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/conf/Config.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/conf/Config.java index 198772a9fa1a2f1ad007e058ea7509b43275130f..37f622d837943c21cccc3b2cbed9a71fd3ddc2d0 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/conf/Config.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/conf/Config.java @@ -10,6 +10,10 @@ public class Config { } + public static class ProcessThread { + public static long THREAD_WAIT_INTERVAL = 60 * 1000L; + } + public static class ZKPath { public static String CONNECT_STR = "127.0.0.1:2181"; @@ -22,9 +26,9 @@ public class Config { public static String NODE_PREFIX = "/skywalking"; - public static String SERVER_REGISTER_LOCK_PATH = "/alarm-server/register"; + public static String COORDINATOR_STATUS_PATH = "/alarm-server/coordinator/status"; - public static String REGISTER_SERVER_PATH = "/alarm-server/servers"; + public static String REGISTER_SERVER_PATH = "/alarm-server/register-servers"; public static String USER_REGISTER_LOCK_PATH = "/alarm-server/users"; } @@ -54,16 +58,7 @@ public class Config { public static boolean ALARM_OFF_FLAG = false; } - public static class MailSender { - - public static String HOST = ""; - - public static String TRANSPORT_PROTOCOL = ""; - - public static boolean SMTP_AUTH = true; - - public static String USER_NAME = ""; - - public static String PASSWORD = ""; + public static class MailSenderInfo { + public static String configId = "1000"; } } \ No newline at end of file diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/AlarmMessageDao.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/AlarmMessageDao.java index eb552053c639d38514dd6fcdc7c07f55ad8713f2..ca01cd07c6ec2c7869c1507a500d20dc94a36c15 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/AlarmMessageDao.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/AlarmMessageDao.java @@ -1,47 +1,31 @@ package com.ai.cloud.skywalking.alarm.dao; -import com.ai.cloud.skywalking.alarm.conf.Config; import com.ai.cloud.skywalking.alarm.model.AlarmRule; import com.ai.cloud.skywalking.alarm.model.ApplicationInfo; import com.ai.cloud.skywalking.alarm.model.UserInfo; +import com.ai.cloud.skywalking.alarm.util.DBConnectUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.sql.*; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; public class AlarmMessageDao { private static Logger logger = LogManager.getLogger(AlarmMessageDao.class); - private static Connection con; - static { - try { - Class.forName(Config.DB.DRIVER_CLASS); - } catch (ClassNotFoundException e) { - logger.error("Failed to found DB driver class.", e); - System.exit(-1); - } + public static List selectAllUserIds() { + List result = new ArrayList(); try { - con = DriverManager.getConnection(Config.DB.URL, Config.DB.USER_NAME, Config.DB.PASSWORD); - } catch (SQLException e) { - logger.error("Failed to connect DB", e); - System.exit(-1); - } - } - - public static List selectAllUserInfo() { - List result = new ArrayList(); - try { - PreparedStatement ps = con.prepareStatement("SELECT user_info.uid FROM user_info WHERE sts = ?"); + PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT user_info.uid FROM user_info WHERE sts = ?"); ps.setString(1, "A"); ResultSet rs = ps.executeQuery(); - UserInfo userInfo; while (rs.next()) { - userInfo = new UserInfo(rs.getString("uid")); - result.add(userInfo); + result.add(rs.getString("uid")); } } catch (SQLException e) { logger.error("Failed to select all user info", e); @@ -52,7 +36,7 @@ public class AlarmMessageDao { public static int selectUserCount() { try { - PreparedStatement ps = con.prepareStatement("SELECT count(user_info.uid) as totalNumber FROM user_info WHERE sts = ?"); + PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT count(user_info.uid) as totalNumber FROM user_info WHERE sts = ?"); ps.setString(1, "A"); ResultSet rs = ps.executeQuery(); rs.next(); @@ -64,11 +48,27 @@ public class AlarmMessageDao { return 0; } + public static UserInfo selectUser(String userId) { + UserInfo userInfo = null; + try { + PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT user_info.uid,user_info.user_name FROM user_info WHERE sts = ?"); + ps.setString(1, "A"); + ResultSet rs = ps.executeQuery(); + rs.next(); + userInfo = new UserInfo(rs.getString("uid")); + userInfo.setUserName(rs.getString("user_name")); + } catch (SQLException e) { + logger.error("Failed to select all user info", e); + } + + return userInfo; + } + public static List selectAlarmRulesByUserId(String userId) { List rules = new ArrayList(); List selfDefineApplications = new ArrayList(); try { - PreparedStatement ps = con.prepareStatement("SELECT alarm_rule.app_id,alarm_rule.rule_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," + + PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement("SELECT alarm_rule.app_id,alarm_rule.rule_id, alarm_rule.uid,alarm_rule.is_global, alarm_rule.todo_type," + " alarm_rule.config_args FROM alarm_rule WHERE uid = ? AND sts = ?"); ps.setString(1, userId); ps.setString(2, "A"); @@ -104,7 +104,8 @@ public class AlarmMessageDao { } List allApplication = new ArrayList(); - ps = con.prepareStatement("SELECT application_info.app_id, application_info.uid, app_code FROM application_info WHERE uid = ? AND sts = ?"); + ps = DBConnectUtil.getConnection() + .prepareStatement("SELECT application_info.app_id, application_info.uid, app_code FROM application_info WHERE uid = ? AND sts = ?"); ps.setString(1, userId); ps.setString(2, "A"); rs = ps.executeQuery(); diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/SystemConfigDao.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/SystemConfigDao.java new file mode 100644 index 0000000000000000000000000000000000000000..07666552d48231f4431e593b637b3157ad3161cd --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/dao/SystemConfigDao.java @@ -0,0 +1,26 @@ +package com.ai.cloud.skywalking.alarm.dao; + +import com.ai.cloud.skywalking.alarm.util.DBConnectUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class SystemConfigDao { + private static Logger logger = LogManager.getLogger(AlarmMessageDao.class); + + public static String getMailSenderInfo(String configId) throws SQLException { + PreparedStatement ps = DBConnectUtil.getConnection().prepareStatement( + "SELECT system_config.conf_value FROM system_config WHERE system_config.sts = " + + "? AND system_config.config_id = ?"); + ps.setString(1, "A"); + ps.setString(2, configId); + + ResultSet resultSet = ps.executeQuery(); + resultSet.next(); + + return resultSet.getString("conf_value"); + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadStatus.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadStatus.java new file mode 100644 index 0000000000000000000000000000000000000000..b3d3fae429f5a96b7a7c7858dbf45c0b8ac8cecd --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadStatus.java @@ -0,0 +1,31 @@ +package com.ai.cloud.skywalking.alarm.model; + +public enum ProcessThreadStatus { + REDISTRIBUTING("1"), REDISTRIBUTE_SUCCESS("2"), FREE("0"), BUSY("3"); + + private String value; + + ProcessThreadStatus(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ProcessThreadStatus convert(String value) { + ProcessThreadStatus status; + switch (value) { + case "0": + status = REDISTRIBUTING; + break; + case "1": + status = REDISTRIBUTE_SUCCESS; + break; + default: + throw new IllegalArgumentException("Coordinator status illegal"); + } + + return status; + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadValue.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadValue.java new file mode 100644 index 0000000000000000000000000000000000000000..ae515272e3e76b1c6ec4422f3c998fa615ff4355 --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/ProcessThreadValue.java @@ -0,0 +1,24 @@ +package com.ai.cloud.skywalking.alarm.model; + +import java.util.List; + +public class ProcessThreadValue { + private String status; + private List dealUserIds; + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public List getDealUserIds() { + return dealUserIds; + } + + public void setDealUserIds(List dealUserIds) { + this.dealUserIds = dealUserIds; + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/UserInfo.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/UserInfo.java index d3f53e9d2255627bf28fdbe3d3f6e5e07bbe6ae5..5cecd10e511fe694e4bc8406743676c9a93c07ad 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/UserInfo.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/UserInfo.java @@ -1,12 +1,10 @@ package com.ai.cloud.skywalking.alarm.model; -import java.util.List; - public class UserInfo { private String userId; - private List rules; + private String userName; public UserInfo(String userId) { this.userId = userId; @@ -16,11 +14,11 @@ public class UserInfo { return userId; } - public List getRules() { - return rules; + public String getUserName() { + return userName; } - public void setRules(List rules) { - this.rules = rules; + public void setUserName(String userName) { + this.userName = userName; } } diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/parameter/Application.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/parameter/Application.java deleted file mode 100644 index 8aee93399c650551bec984a384a85cfbbe159872..0000000000000000000000000000000000000000 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/model/parameter/Application.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.ai.cloud.skywalking.alarm.model.parameter; - -import java.util.Collection; -import java.util.Set; - -public class Application { - - private String applicationId; - - private Set traceIds; - - public Application(String applicationId) { - this.applicationId = applicationId; - } - - public String getApplicationId() { - return applicationId; - } - - public void setApplicationId(String applicationId) { - this.applicationId = applicationId; - } - - public Set getTraceIds() { - return traceIds; - } - - public void setTraceIds(Set traceIds) { - this.traceIds = traceIds; - } - - -} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/procesor/AlarmMessageProcessor.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/procesor/AlarmMessageProcessor.java index 7db62605ca859c55a6372ea7eca3cd2157a55ec5..c28d7530ac3128f4b7faa2cadb5edafc32c4a33c 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/procesor/AlarmMessageProcessor.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/procesor/AlarmMessageProcessor.java @@ -1,158 +1,164 @@ package com.ai.cloud.skywalking.alarm.procesor; -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import redis.clients.jedis.Jedis; - +import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao; import com.ai.cloud.skywalking.alarm.model.AlarmRule; import com.ai.cloud.skywalking.alarm.model.ApplicationInfo; import com.ai.cloud.skywalking.alarm.model.MailInfo; import com.ai.cloud.skywalking.alarm.model.UserInfo; import com.ai.cloud.skywalking.alarm.util.MailUtil; import com.ai.cloud.skywalking.alarm.util.RedisUtil; - import freemarker.template.Configuration; import freemarker.template.Template; import freemarker.template.TemplateException; import freemarker.template.Version; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import redis.clients.jedis.Jedis; + +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.text.SimpleDateFormat; +import java.util.*; public class AlarmMessageProcessor { - private static Logger logger = LogManager - .getLogger(AlarmMessageProcessor.class); - - public void process(UserInfo userInfo, AlarmRule rule) { - Set warningTracingIds = new HashSet(); - Set warningMessageKeys = new HashSet(); - long currentFireMinuteTime = System.currentTimeMillis() / (10000 * 6); - long warningTimeWindowSize = currentFireMinuteTime - - rule.getPreviousFireTimeM(); - // 获取待发送数据 - if (warningTimeWindowSize >= rule.getConfigArgsDescriber().getPeriod()) { - for (ApplicationInfo applicationInfo : rule.getApplicationInfos()) { - for (int period = 0; period < warningTimeWindowSize; period++) { - String alarmKey = userInfo.getUserId() - + "-" - + applicationInfo.getAppCode() - + "-" - + ((System.currentTimeMillis() / (10000 * 6)) - period); - - warningMessageKeys.add(alarmKey); - warningTracingIds.addAll(getAlarmMessages(alarmKey)); - } - } - - // 发送告警数据 - if (warningTracingIds.size() > 0) { - if ("0".equals(rule.getTodoType())) { - // 发送邮件 - String subjects = generateSubject(warningTracingIds.size(), - rule.getPreviousFireTimeM(), currentFireMinuteTime); - Map parameter = new HashMap(); - // TODO:已使用新的参数,warningTracingIds包含所有的告警tracingId,需要在模板中生成链接 - parameter.put("warningTracingIds", warningTracingIds); - // TODO:请转换为USERNAME,ID无法识别 - parameter.put("name", userInfo.getUserId()); - String mailContext = generateContent(rule - .getConfigArgsDescriber().getMailInfo() - .getMailTemp(), parameter); - if (mailContext.length() > 0) { - MailInfo mailInfo = rule.getConfigArgsDescriber() - .getMailInfo(); - MailUtil.sendMail(mailInfo.getMailTo(), - mailInfo.getMailCc(), mailContext, subjects); - } - } - } - - // 清理数据 - for (String toBeRemovedKey : warningMessageKeys) { - expiredAlarmMessage(toBeRemovedKey); - } - - // 修改-保存上次处理时间 - dealPreviousFireTime(userInfo, rule, currentFireMinuteTime); - } - - } - - private void dealPreviousFireTime(UserInfo userInfo, AlarmRule rule, - long currentFireMinuteTime) { - rule.setPreviousFireTimeM(currentFireMinuteTime); - savePreviousFireTime(userInfo.getUserId(), rule.getRuleId(), - currentFireMinuteTime); - } - - private String generateSubject(int count, long startTime, long endTime) { - String title = "[Warning] There were " - + count - + " alarm information between " - + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( - startTime)) - + " to " - + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( - endTime)); - - return title; - } - - private void expiredAlarmMessage(String key) { - Jedis client = RedisUtil.getRedisClient(); - client.expire(key, 0); - if (client != null) { - client.close(); - } - } - - private void savePreviousFireTime(String userId, String ruleId, - long currentFireMinuteTime) { - Jedis client = RedisUtil.getRedisClient(); - client.hset(userId, ruleId, String.valueOf(currentFireMinuteTime)); - if (client != null) { - client.close(); - } - } - - private Collection getAlarmMessages(String key) { - Jedis client = RedisUtil.getRedisClient(); - Map result = client.hgetAll(key); - if (result == null) { - return new ArrayList(); - } - - client.close(); - - return result.values(); - } - - private String generateContent(String templateStr, Map parameter) { - Configuration cfg = new Configuration(new Version("2.3.23")); - cfg.setDefaultEncoding("UTF-8"); - Template t = null; - try { - t = new Template(null, new StringReader(templateStr), cfg); - StringWriter out = new StringWriter(); - t.process(parameter, out); - return out.getBuffer().toString(); - } catch (IOException e) { - logger.error("Template illegal.", e); - } catch (TemplateException e) { - logger.error("Failed to generate content.", e); - } - - return ""; - } + private static Logger logger = LogManager + .getLogger(AlarmMessageProcessor.class); + + public void process(UserInfo userInfo, AlarmRule rule) { + Set warningTracingIds = new HashSet(); + Set warningMessageKeys = new HashSet(); + long currentFireMinuteTime = System.currentTimeMillis() / (10000 * 6); + long warningTimeWindowSize = currentFireMinuteTime + - rule.getPreviousFireTimeM(); + // 获取待发送数据 + if (warningTimeWindowSize >= rule.getConfigArgsDescriber().getPeriod()) { + for (ApplicationInfo applicationInfo : rule.getApplicationInfos()) { + for (int period = 0; period < warningTimeWindowSize; period++) { + String alarmKey = userInfo.getUserId() + + "-" + + applicationInfo.getAppCode() + + "-" + + (currentFireMinuteTime - period); + + warningMessageKeys.add(alarmKey); + warningTracingIds.addAll(getAlarmMessages(alarmKey)); + } + } + + // 发送告警数据 + if (warningTracingIds.size() > 0) { + if ("0".equals(rule.getTodoType())) { + // 发送邮件 + String subjects = generateSubject(warningTracingIds.size(), + rule.getPreviousFireTimeM(), currentFireMinuteTime); + Map parameter = new HashMap(); + parameter.put("warningTracingIds", warningTracingIds); + parameter.put("name", userInfo.getUserName()); + parameter.put("startDate", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( + rule.getPreviousFireTimeM() * 10000 * 6))); + parameter.put("endDate", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( + currentFireMinuteTime * 10000 * 6))); + //TODO portalAddr需要初始化 + parameter.put("portalAddr", "http://127.0.0.1:8080/skywalking-webui/"); + String mailContext = generateContent(rule + .getConfigArgsDescriber().getMailInfo() + .getMailTemp(), parameter); + if (mailContext.length() > 0) { + MailInfo mailInfo = rule.getConfigArgsDescriber() + .getMailInfo(); + MailUtil.sendMail(mailInfo.getMailTo(), + mailInfo.getMailCc(), mailContext, subjects); + } + } + } + + // 清理数据 + for (String toBeRemovedKey : warningMessageKeys) { + expiredAlarmMessage(toBeRemovedKey); + } + + // 修改-保存上次处理时间 + dealPreviousFireTime(userInfo, rule, currentFireMinuteTime); + } + + } + + private void dealPreviousFireTime(UserInfo userInfo, AlarmRule rule, + long currentFireMinuteTime) { + rule.setPreviousFireTimeM(currentFireMinuteTime); + savePreviousFireTime(userInfo.getUserId(), rule.getRuleId(), + currentFireMinuteTime); + } + + private String generateSubject(int count, long startTime, long endTime) { + String title = "[Warning] There were " + + count + + " alarm information between " + + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( + startTime * 10000 * 6)) + + " to " + + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date( + endTime * 10000 * 6)); + + return title; + } + + private void expiredAlarmMessage(String key) { + Jedis client = RedisUtil.getRedisClient(); + client.expire(key, 0); + if (client != null) { + client.close(); + } + } + + private void savePreviousFireTime(String userId, String ruleId, + long currentFireMinuteTime) { + Jedis client = RedisUtil.getRedisClient(); + client.hset(userId, ruleId, String.valueOf(currentFireMinuteTime)); + if (client != null) { + client.close(); + } + } + + private Collection getAlarmMessages(String key) { + Jedis client = RedisUtil.getRedisClient(); + Map result = client.hgetAll(key); + if (result == null) { + return new ArrayList(); + } + + client.close(); + + return result.keySet(); + } + + private String generateContent(String templateStr, Map parameter) { + Configuration cfg = new Configuration(new Version("2.3.23")); + cfg.setDefaultEncoding("UTF-8"); + Template t = null; + try { + t = new Template(null, new StringReader(templateStr), cfg); + StringWriter out = new StringWriter(); + t.process(parameter, out); + return out.getBuffer().toString(); + } catch (IOException e) { + logger.error("Template illegal.", e); + } catch (TemplateException e) { + logger.error("Failed to generate content.", e); + } + + return ""; + } + + public static void main(String[] args) throws InterruptedException { + UserInfo userInfo = new UserInfo("27"); + userInfo.setUserName("123"); + List rules = AlarmMessageDao.selectAlarmRulesByUserId(userInfo.getUserId()); + while (true) { + new AlarmMessageProcessor().process(userInfo, rules.get(0)); + Thread.sleep(60 * 1000L); + } + } } diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/DBConnectUtil.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/DBConnectUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..7cb587e84326509208971ad8c38205a2cff9bd08 --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/DBConnectUtil.java @@ -0,0 +1,36 @@ +package com.ai.cloud.skywalking.alarm.util; + +import com.ai.cloud.skywalking.alarm.conf.Config; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DBConnectUtil { + + private static Logger logger = LogManager.getLogger(DBConnectUtil.class); + + private static Connection con; + + static { + try { + Class.forName(Config.DB.DRIVER_CLASS); + } catch (ClassNotFoundException e) { + logger.error("Failed to found DB driver class.", e); + System.exit(-1); + } + + try { + con = DriverManager.getConnection(Config.DB.URL, Config.DB.USER_NAME, Config.DB.PASSWORD); + } catch (SQLException e) { + logger.error("Failed to connect DB", e); + System.exit(-1); + } + } + + public static Connection getConnection() { + return con; + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/MailUtil.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/MailUtil.java index 44811ffc77d76c299d462fc77db808516c654c08..47ad21bb5211ff4fbd20c8d93074f0a33a2d0377 100644 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/MailUtil.java +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/MailUtil.java @@ -1,5 +1,8 @@ package com.ai.cloud.skywalking.alarm.util; +import com.ai.cloud.skywalking.alarm.conf.Config; +import com.ai.cloud.skywalking.alarm.dao.SystemConfigDao; +import com.google.gson.Gson; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,10 +20,11 @@ public class MailUtil { private static String sendAccount; private static Transport ts; + static { try { - Properties prop = new Properties(); - prop.load(MailUtil.class.getResourceAsStream("/mail/mail.config")); + String senderInfo = SystemConfigDao.getMailSenderInfo(Config.MailSenderInfo.configId); + Properties prop = new Gson().fromJson(senderInfo, Properties.class); session = Session.getInstance(prop); ts = session.getTransport(); ts.connect(prop.getProperty("mail.host"), prop.getProperty("mail.username"), prop.getProperty("mail.password")); diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ProcessUtil.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ProcessUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..df55e24e4fda15fcc5d48987ef3e74b253b997ca --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ProcessUtil.java @@ -0,0 +1,18 @@ +package com.ai.cloud.skywalking.alarm.util; + +import com.ai.cloud.skywalking.alarm.conf.Config; +import com.ai.cloud.skywalking.alarm.model.ProcessThreadStatus; +import com.ai.cloud.skywalking.alarm.model.ProcessThreadValue; +import com.google.gson.Gson; + +public class ProcessUtil { + + public static void changeProcessThreadStatus(String threadId, ProcessThreadStatus status) { + String path = Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId; + String value = ZKUtil.getPathData(path); + ProcessThreadValue newValue = new Gson().fromJson(value, ProcessThreadValue.class); + newValue.setStatus(status.getValue()); + ZKUtil.setPathData(path, new Gson().toJson(newValue)); + } + +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ZKUtil.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ZKUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..89cc6a03b03fb14082d545225b566f86f8d4b2dd --- /dev/null +++ b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/util/ZKUtil.java @@ -0,0 +1,78 @@ +package com.ai.cloud.skywalking.alarm.util; + +import com.ai.cloud.skywalking.alarm.conf.Config; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ZKUtil { + private static Logger logger = LogManager.getLogger(ZKUtil.class); + private static CuratorFramework client; + + static { + try { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(Config.ZKPath.RETRY_TIMEOUT, + Config.ZKPath.RETRY_TIMES); + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(). + connectString(Config.ZKPath.CONNECT_STR) + .connectionTimeoutMs(Config.ZKPath.CONNECT_TIMEOUT).retryPolicy(retryPolicy); + client = builder.build(); + client.start(); + } catch (Exception e) { + logger.error("Failed to connect zookeeper.", e); + System.exit(-1); + } + } + + public static CuratorFramework getZkClient() { + return client; + } + + public static InterProcessMutex getLock(String path) { + return new InterProcessMutex(client, path); + } + + + public static String getPathData(String path) { + try { + return new String(client.getData().forPath(path)); + } catch (Exception e) { + logger.error("Failed to get the value of path[{}]", path, e); + } + return ""; + } + + public static String getPathDataWithWatch(String path, CuratorWatcher watcher) { + try { + return new String(client.getData().usingWatcher(watcher).forPath(path)); + } catch (Exception e) { + logger.error("Failed to get the value of path[{}]", path, e); + } + return ""; + } + + public static void setPathData(String path, String value) { + try { + client.setData().forPath(path, value.getBytes()); + } catch (Exception e) { + logger.error("Failed to set date of path[{{}]", path, e); + } + } + + public static List getChildren(String registerServerPath) { + try { + return client.getChildren().forPath(registerServerPath); + } catch (Exception e) { + logger.error("Failed to get child nodes of path[{{}]", registerServerPath, e); + } + return new ArrayList(); + } +} diff --git a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/zk/ZKUtil.java b/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/zk/ZKUtil.java deleted file mode 100644 index 9bdccd3a594a231e3f73610633f2bf81a1c6f931..0000000000000000000000000000000000000000 --- a/skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/zk/ZKUtil.java +++ /dev/null @@ -1,96 +0,0 @@ -package com.ai.cloud.skywalking.alarm.zk; - -import com.ai.cloud.skywalking.alarm.AlarmServerRegisterWatcher; -import com.ai.cloud.skywalking.alarm.conf.Config; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; - -public class ZKUtil { - - private static Logger logger = LogManager.getLogger(ZKUtil.class); - - private static CuratorFramework client; - - static { - try { - RetryPolicy retryPolicy = new ExponentialBackoffRetry(Config.ZKPath.RETRY_TIMEOUT, - Config.ZKPath.RETRY_TIMES); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(). - connectString(Config.ZKPath.CONNECT_STR) - .connectionTimeoutMs(Config.ZKPath.CONNECT_TIMEOUT).retryPolicy(retryPolicy); - client = builder.build(); - client.start(); - - createIfNotExists(getRegisterLockPath()); - createIfNotExists(getUserLockPathPrefix()); - } catch (Exception e) { - logger.error("Failed to connect zookeeper.", e); - System.exit(-1); - } - } - - public static InterProcessMutex getRegisterLock() { - return new InterProcessMutex(client, getRegisterLockPath()); - } - - public static InterProcessMutex getProcessUserLock(String uid) { - return new InterProcessMutex(client, getUserLockPath(uid)); - } - - public static void register(String serverId) { - try { - client.create().creatingParentsIfNeeded().forPath(getRegisterServerPathPrefix() + "/" + serverId); - } catch (Exception e) { - logger.error("Failed to register server", e); - } - } - - public static List selectAllThreadIds() { - try { - return client.getChildren().forPath(getRegisterServerPathPrefix()); - } catch (Exception e) { - logger.error("Failed to get children of Path[" + getRegisterServerPathPrefix() + "].", e); - } - return new ArrayList(); - } - - public static void watch(AlarmServerRegisterWatcher instance) { - try { - client.getChildren().usingWatcher(instance).forPath(getRegisterServerPathPrefix()); - } catch (Exception e) { - logger.error("Failed to get children for path[" + getRegisterServerPathPrefix() + "].", e); - } - } - - - private static String getRegisterLockPath() { - return Config.ZKPath.NODE_PREFIX + Config.ZKPath.SERVER_REGISTER_LOCK_PATH; - } - - private static String getUserLockPath(String uid) { - return getUserLockPathPrefix() + "/" + uid; - } - - private static String getUserLockPathPrefix() { - return Config.ZKPath.NODE_PREFIX + Config.ZKPath.USER_REGISTER_LOCK_PATH; - } - - - private static void createIfNotExists(String path) throws Exception { - if (client.checkExists().forPath(path) == null) { - client.create().creatingParentsIfNeeded().forPath(path); - } - } - - private static String getRegisterServerPathPrefix() { - return Config.ZKPath.NODE_PREFIX + Config.ZKPath.REGISTER_SERVER_PATH; - } -}