提交 2c1fb791 编写于 作者: wu-sheng's avatar wu-sheng

1.修整UserInfoCoordinator代码,确保逻辑的有效性和健壮性。

上级 e95b993b
......@@ -50,7 +50,6 @@ public class AlarmMessageProcessThread extends Thread {
for (Map.Entry<UserInfo, List<AlarmRule>> entry : cacheRules.entrySet()) {
for (AlarmRule rule : entry.getValue()) {
processor.process(entry.getKey(), rule);
// System.out.println(currentThread().getName() + " @~ " + entry.getKey().getUserId() + " @~ " + rule.getRuleId());
}
}
}
......
......@@ -22,219 +22,222 @@ import java.util.concurrent.TimeUnit;
public class UserInfoCoordinator extends Thread {
private Logger logger = LogManager.getLogger(UserInfoInspector.class);
private boolean redistributing;
private boolean newServerComingFlag = false;
private RegisterServerWatcher watcher = new RegisterServerWatcher();
private InterProcessMutex lock = new InterProcessMutex(ZKUtil.getZkClient(), Config.ZKPath.COORDINATOR_PATH);
private boolean coordinatorFlag;
public UserInfoCoordinator() {
redistributing = false;
try {
coordinatorFlag = lock.acquire(Config.Coordinator.RETRY_GET_COORDINATOR_LOCK_INTERVAL
, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Failed to", e);
coordinatorFlag = false;
}
if (coordinatorFlag) {
watcherRegisterServerPath();
}
}
@Override
public void run() {
if (!coordinatorFlag) {
//
while (!retryBecomeCoordinator()) {
try {
Thread.sleep(Config.Coordinator.RETRY_BECOME_COORDINATOR_WAIT_TIME);
} catch (Exception e) {
logger.error("Sleep Failed.", e);
}
}
// 新官上任三把火,重新分配
watcherRegisterServerPath();
redistributing = true;
}
while (true) {
try {
//检查是否有新服务注册或者在重分配过程做有新处理线程启动了
if (!redistributing && !newServerComingFlag) {
try {
Thread.sleep(Config.Coordinator.CHECK_REDISTRIBUTE_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep error", e);
}
continue;
}
newServerComingFlag = false;
//获取当前所有的注册的处理线程
List<String> registeredThreads = acquireAllRegisteredThread();
//修改状态 (开始重新分配状态)
changeStatus(registeredThreads, ProcessThreadStatus.REDISTRIBUTING);
//检查所有的服务是否都处于空闲状态
while (!checkAllProcessStatus(registeredThreads, ProcessThreadStatus.FREE)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
}
//查询当前有多少用户
List<String> users = AlarmMessageDao.selectAllUserIds();
//将用户重新分配给服务
List<String> realRedistributeThread = allocationUser(registeredThreads, users);
//修改状态(分配完成)
changeStatus(realRedistributeThread, ProcessThreadStatus.REDISTRIBUTE_SUCCESS);
//检查所有的服务是否都处于忙碌状态
while (!checkAllProcessStatus(realRedistributeThread, ProcessThreadStatus.BUSY)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
}
redistributing = false;
} catch (Exception e) {
logger.error("Failed to redistribute User ", e);
}
}
}
private void watcherRegisterServerPath() {
try {
ZKUtil.getChildrenWithWatcher(Config.ZKPath.REGISTER_SERVER_PATH, watcher);
} catch (Exception e) {
logger.error("Failed to set watcher for get children", e);
if (lock != null && lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e1) {
logger.error("Failed to release lock.", e1);
}
}
}
}
private boolean retryBecomeCoordinator() {
try {
return lock.acquire(5, TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Failed to acquire lock .", e);
return false;
}
}
private List<String> allocationUser(List<String> registeredThreads, List<String> userIds) {
List<String> realRedistributeThread = new ArrayList<String>();
Set<String> sortThreadIds = new HashSet<String>(registeredThreads);
int step = (int) Math.ceil(userIds.size() * 1.0 / sortThreadIds.size());
int start = 0;
int end = step;
if (end > userIds.size()) {
end = userIds.size();
}
for (String thread : sortThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread))
continue;
String value = ZKUtil.getPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread);
ProcessThreadValue value1 = new Gson().fromJson(value, ProcessThreadValue.class);
value1.setDealUserIds(userIds.subList(start, end));
ZKUtil.setPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/" + thread, new Gson().toJson(value1));
// 实际重新分配的线程Id
realRedistributeThread.add(thread);
start = end;
end += step;
if (start >= userIds.size()) {
break;
}
if (end > userIds.size()) {
end = userIds.size();
}
}
return realRedistributeThread;
}
private List<String> acquireAllRegisteredThread() {
return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH);
}
private boolean checkAllProcessStatus(List<String> registeredThreadIds, ProcessThreadStatus status) {
String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/";
for (String threadId : registeredThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId))
continue;
if (getProcessThreadStatus(registerPathPrefix, threadId)
!= status) {
return false;
}
}
return true;
}
private ProcessThreadStatus getProcessThreadStatus(String registerPathPrefix, String threadId) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId))
return ProcessThreadStatus.FREE;
String value = ZKUtil.getPathData(registerPathPrefix + threadId);
if (value == null || value.length() == 0)
return ProcessThreadStatus.FREE;
ProcessThreadValue value1 = new Gson().fromJson(value, ProcessThreadValue.class);
return ProcessThreadStatus.convert(value1.getStatus());
}
private void changeStatus(List<String> registeredThreadIds, ProcessThreadStatus status) {
for (String threadId : registeredThreadIds) {
ProcessUtil.changeProcessThreadStatus(threadId, status);
}
}
public class RegisterServerWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
if (redistributing) {
redistributing = false;
newServerComingFlag = true;
} else {
redistributing = true;
}
}
try {
ZKUtil.getChildrenWithWatcher(Config.ZKPath.REGISTER_SERVER_PATH, watcher);
} catch (Exception e) {
if (lock != null && lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e1) {
logger.error("Failed to release master locker.", e1);
}
}
}
}
}
private Logger logger = LogManager.getLogger(UserInfoCoordinator.class);
private boolean redistributing;
private RegisterServerWatcher watcher = new RegisterServerWatcher();
private InterProcessMutex lock = new InterProcessMutex(
ZKUtil.getZkClient(), Config.ZKPath.COORDINATOR_PATH);
private boolean isCoordinator = false;
public UserInfoCoordinator() {
}
@Override
public void run() {
while (true) {
try {
if (!isCoordinator) {
while (!retryBecomeCoordinator()) {
try {
Thread.sleep(Config.Coordinator.RETRY_BECOME_COORDINATOR_WAIT_TIME);
} catch (Exception e) {
logger.error("Sleep Failed.", e);
}
}
isCoordinator = true;
watcherRegisterServerPath();
redistributing = true;
}
// 检查是否有新服务注册或者在重分配过程做有新处理线程启动了
if (!redistributing) {
try {
Thread.sleep(Config.Coordinator.CHECK_REDISTRIBUTE_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep error", e);
}
continue;
}
redistributing = false;
// 获取当前所有的注册的处理线程
List<String> registeredThreads = acquireAllRegisteredThread();
// 修改状态 (开始重新分配状态)
changeStatus(registeredThreads,
ProcessThreadStatus.REDISTRIBUTING);
// 检查所有的服务是否都处于空闲状态
int retryTimes = 0;
while (!checkAllProcessStatus(registeredThreads,
ProcessThreadStatus.FREE)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
retryTimes++;
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if(retryTimes > 1000){
logger.warn("checking all processors are free, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
// 查询当前有多少用户
List<String> users = AlarmMessageDao.selectAllUserIds();
// 将用户重新分配给服务
List<String> realRedistributeThread = allocationUser(
registeredThreads, users);
// 修改状态(分配完成)
changeStatus(realRedistributeThread,
ProcessThreadStatus.REDISTRIBUTE_SUCCESS);
// 检查所有的服务是否都处于忙碌状态
while (!checkAllProcessStatus(realRedistributeThread,
ProcessThreadStatus.BUSY)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if(retryTimes > 1000){
logger.warn("checking all processors are busy, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
} catch (Exception e) {
logger.error("Failed to coordinate, retry. ", e);
releaseCoordinator();
isCoordinator = false;
}
}
}
private boolean retryBecomeCoordinator() {
try {
return lock.acquire(
Config.Coordinator.RETRY_GET_COORDINATOR_LOCK_INTERVAL,
TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Failed to acquire lock .", e);
return false;
}
}
private void releaseCoordinator() {
if (lock != null && lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e1) {
logger.error("Failed to release lock.", e1);
}
}
}
private List<String> allocationUser(List<String> registeredThreads,
List<String> userIds) {
List<String> realRedistributeThread = new ArrayList<String>();
Set<String> sortThreadIds = new HashSet<String>(registeredThreads);
int step = (int) Math.ceil(userIds.size() * 1.0 / sortThreadIds.size());
int start = 0;
int end = step;
if (end > userIds.size()) {
end = userIds.size();
}
for (String thread : sortThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread))
continue;
String value = ZKUtil
.getPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread);
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
value1.setDealUserIds(userIds.subList(start, end));
ZKUtil.setPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread, new Gson().toJson(value1));
// 实际重新分配的线程Id
realRedistributeThread.add(thread);
start = end;
end += step;
if (start >= userIds.size()) {
break;
}
if (end > userIds.size()) {
end = userIds.size();
}
}
return realRedistributeThread;
}
private List<String> acquireAllRegisteredThread() {
return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH);
}
private boolean checkAllProcessStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) {
String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/";
for (String threadId : registeredThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ threadId))
continue;
if (getProcessThreadStatus(registerPathPrefix, threadId) != status) {
return false;
}
}
return true;
}
private ProcessThreadStatus getProcessThreadStatus(
String registerPathPrefix, String threadId) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId))
return ProcessThreadStatus.FREE;
String value = ZKUtil.getPathData(registerPathPrefix + threadId);
if (value == null || value.length() == 0)
return ProcessThreadStatus.FREE;
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
return ProcessThreadStatus.convert(value1.getStatus());
}
private void changeStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) {
for (String threadId : registeredThreadIds) {
ProcessUtil.changeProcessThreadStatus(threadId, status);
}
}
public class RegisterServerWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
redistributing = true;
}
watcherRegisterServerPath();
}
}
private void watcherRegisterServerPath() {
try {
ZKUtil.getChildrenWithWatcher(Config.ZKPath.REGISTER_SERVER_PATH,
watcher);
} catch (Exception e) {
logger.error("Failed to set watcher for get children", e);
}
}
}
package com.ai.cloud.skywalking.alarm;
import com.ai.cloud.skywalking.alarm.dao.AlarmMessageDao;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class UserInfoInspector extends Thread {
private Logger logger = LogManager.getLogger(UserInfoInspector.class);
private int preUserSize;
public UserInfoInspector() {
preUserSize = AlarmMessageDao.selectUserCount();
}
@Override
public void run() {
int currentUserSize;
while (true) {
try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException e) {
logger.error("Sleep Failed", e);
}
currentUserSize = AlarmMessageDao.selectUserCount();
if (currentUserSize != preUserSize) {
logger.info("Total user has been changed. Notice all process thread to change process date.");
for (AlarmMessageProcessThread thread : AlarmProcessServer.getProcessThreads()) {
//thread.setChanged(true);
}
}
}
}
}
......@@ -12,7 +12,6 @@ public class Config {
public static class ProcessThread {
public static long THREAD_WAIT_INTERVAL = 60 * 1000L;
//public static long THREAD_WAIT_INTERVAL = 1 * 1000L;
}
public static class ZKPath {
......@@ -42,7 +41,7 @@ public class Config {
// 单位:(毫秒)
public static long CHECK_REDISTRIBUTE_INTERVAL = 5 * 1000;
// 单位:(毫秒)
public static long CHECK_ALL_PROCESS_THREAD_INTERVAL = 100L;
public static long CHECK_ALL_PROCESS_THREAD_INTERVAL = 500L;
}
public static class DB {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册