提交 7b47658d 编写于 作者: Z zhangxin10

Merge remote-tracking branch 'origin/master'

Conflicts:
	skywalking-alarm/src/main/java/com/ai/cloud/skywalking/alarm/UserInfoCoordinator.java
......@@ -14,240 +14,230 @@ import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class UserInfoCoordinator extends Thread {
private Logger logger = LogManager.getLogger(UserInfoCoordinator.class);
private boolean redistributing;
private RegisterServerWatcher watcher = new RegisterServerWatcher();
private InterProcessMutex lock = new InterProcessMutex(
ZKUtil.getZkClient(), Config.ZKPath.COORDINATOR_PATH);
private boolean isCoordinator = false;
public UserInfoCoordinator() {
super("UserInfoCoordinator");
}
@Override
public void run() {
while (true) {
try {
if (!isCoordinator) {
logger.info("Begin to retry become a coordinator....");
while (!retryBecomeCoordinator()) {
try {
Thread.sleep(Config.Coordinator.RETRY_BECOME_COORDINATOR_WAIT_TIME);
} catch (Exception e) {
logger.error("Sleep Failed.", e);
}
}
logger.info("Become a coordinator.");
isCoordinator = true;
watcherRegisterServerPath();
redistributing = true;
}
// 检查是否有新服务注册或者在重分配过程做有新处理线程启动了
if (!redistributing) {
try {
Thread.sleep(Config.Coordinator.CHECK_REDISTRIBUTE_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep error", e);
}
continue;
}
redistributing = false;
// 获取当前所有的注册的处理线程
List<String> registeredThreads = acquireAllRegisteredThread();
logger.info("Query a total of {} processing threads", registeredThreads.size());
// 修改状态 (开始重新分配状态)
changeStatus(registeredThreads,
ProcessThreadStatus.REDISTRIBUTING);
// 检查所有的服务是否都处于空闲状态
int retryTimes = 0;
while (!checkAllProcessStatus(registeredThreads,
ProcessThreadStatus.FREE)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
retryTimes++;
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if (retryTimes > 1000) {
logger.warn("checking all processors are free, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
// 查询当前有多少用户
List<String> users = AlarmMessageDao.selectAllUserIds();
logger.info("Query a total of {} process user", users.size());
// 将用户重新分配给服务
List<String> realRedistributeThread = allocationUser(
registeredThreads, users);
logger.info("Assign the user to be processed to {} processing threads", realRedistributeThread.size());
// 修改状态(分配完成)
changeStatus(realRedistributeThread,
ProcessThreadStatus.REDISTRIBUTE_SUCCESS);
logger.info("Change the state of {} processing threads to be busy", realRedistributeThread.size());
// 检查所有的服务是否都处于忙碌状态
while (!checkAllProcessStatus(realRedistributeThread,
ProcessThreadStatus.BUSY)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if (retryTimes > 1000) {
logger.warn("checking all processors are busy, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
} catch (Exception e) {
logger.error("Failed to coordinate, retry. ", e);
releaseCoordinator();
isCoordinator = false;
}
}
}
private boolean retryBecomeCoordinator() {
try {
return lock.acquire(
Config.Coordinator.RETRY_GET_COORDINATOR_LOCK_INTERVAL,
TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Failed to acquire lock .", e);
return false;
}
}
private void releaseCoordinator() {
if (lock != null && lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e1) {
logger.error("Failed to release lock.", e1);
}
}
}
private List<String> allocationUser(List<String> registeredThreads,
List<String> userIds) throws Exception {
List<String> realRedistributeThread = new ArrayList<String>();
Set<String> sortThreadIds = new HashSet<String>(registeredThreads);
int step = (int) Math.ceil(userIds.size() * 1.0 / sortThreadIds.size());
int start = 0;
int end = step;
if (end > userIds.size()) {
end = userIds.size();
}
for (String thread : sortThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread))
continue;
String value = ZKUtil
.getPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread);
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
value1.setDealUserIds(userIds.subList(start, end));
ZKUtil.setPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread, new Gson().toJson(value1));
// 实际重新分配的线程Id
realRedistributeThread.add(thread);
start = end;
end += step;
if (start >= userIds.size()) {
break;
}
if (end > userIds.size()) {
end = userIds.size();
}
}
return realRedistributeThread;
}
private List<String> acquireAllRegisteredThread() throws Exception {
return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH);
}
private boolean checkAllProcessStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) throws Exception {
String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/";
for (String threadId : registeredThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ threadId))
continue;
if (getProcessThreadStatus(registerPathPrefix, threadId) != status) {
return false;
}
}
return true;
}
private ProcessThreadStatus getProcessThreadStatus(
String registerPathPrefix, String threadId) throws Exception {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId))
return ProcessThreadStatus.FREE;
String value = ZKUtil.getPathData(registerPathPrefix + threadId);
if (value == null || value.length() == 0)
return ProcessThreadStatus.FREE;
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
return ProcessThreadStatus.convert(value1.getStatus());
}
private void changeStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) throws Exception {
Iterator<String> threadIterator = registeredThreadIds.iterator();
String threadId;
while (threadIterator.hasNext()) {
threadId = threadIterator.next();
if (!checkProcessThreadIsOnline(threadId)) {
threadIterator.remove();
}
ProcessUtil.changeProcessThreadStatus(threadId, status);
}
}
private boolean checkProcessThreadIsOnline(String threadId) {
return ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId);
}
public class RegisterServerWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
logger.info("The number of process thread has changed. The alarm service will reallocate processing data.");
redistributing = true;
}
watcherRegisterServerPath();
}
}
private void watcherRegisterServerPath() {
try {
ZKUtil.getChildrenWithWatcher(Config.ZKPath.REGISTER_SERVER_PATH,
watcher);
} catch (Exception e) {
logger.error("Failed to set watcher for get children", e);
}
}
private Logger logger = LogManager.getLogger(UserInfoCoordinator.class);
private boolean redistributing;
private RegisterServerWatcher watcher = new RegisterServerWatcher();
private InterProcessMutex lock = new InterProcessMutex(
ZKUtil.getZkClient(), Config.ZKPath.COORDINATOR_PATH);
private boolean isCoordinator = false;
public UserInfoCoordinator() {
}
@Override
public void run() {
while (true) {
try {
if (!isCoordinator) {
while (!retryBecomeCoordinator()) {
try {
Thread.sleep(Config.Coordinator.RETRY_BECOME_COORDINATOR_WAIT_TIME);
} catch (Exception e) {
logger.error("Sleep Failed.", e);
}
}
isCoordinator = true;
watcherRegisterServerPath();
redistributing = true;
}
// 检查是否有新服务注册或者在重分配过程做有新处理线程启动了
if (!redistributing) {
try {
Thread.sleep(Config.Coordinator.CHECK_REDISTRIBUTE_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep error", e);
}
continue;
}
redistributing = false;
// 获取当前所有的注册的处理线程
List<String> registeredThreads = acquireAllRegisteredThread();
// 修改状态 (开始重新分配状态)
changeStatus(registeredThreads,
ProcessThreadStatus.REDISTRIBUTING);
// 检查所有的服务是否都处于空闲状态
int retryTimes = 0;
while (!checkAllProcessStatus(registeredThreads,
ProcessThreadStatus.FREE)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
retryTimes++;
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if(retryTimes > 1000){
logger.warn("checking all processors are free, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
// 查询当前有多少用户
List<String> users = AlarmMessageDao.selectAllUserIds();
// 将用户重新分配给服务
List<String> realRedistributeThread = allocationUser(
registeredThreads, users);
// 修改状态(分配完成)
changeStatus(realRedistributeThread,
ProcessThreadStatus.REDISTRIBUTE_SUCCESS);
// 检查所有的服务是否都处于忙碌状态
while (!checkAllProcessStatus(realRedistributeThread,
ProcessThreadStatus.BUSY)) {
try {
Thread.sleep(Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL);
} catch (InterruptedException e) {
logger.error("Sleep failed", e);
}
if(retryTimes > 1000){
logger.warn("checking all processors are busy, waiting {}ms", Config.Coordinator.CHECK_ALL_PROCESS_THREAD_INTERVAL * retryTimes);
retryTimes = 0;
}
}
} catch (Exception e) {
logger.error("Failed to coordinate, retry. ", e);
releaseCoordinator();
isCoordinator = false;
}
}
}
private boolean retryBecomeCoordinator() {
try {
return lock.acquire(
Config.Coordinator.RETRY_GET_COORDINATOR_LOCK_INTERVAL,
TimeUnit.SECONDS);
} catch (Exception e) {
logger.error("Failed to acquire lock .", e);
return false;
}
}
private void releaseCoordinator() {
if (lock != null && lock.isAcquiredInThisProcess()) {
try {
lock.release();
} catch (Exception e1) {
logger.error("Failed to release lock.", e1);
}
}
}
private List<String> allocationUser(List<String> registeredThreads,
List<String> userIds) {
List<String> realRedistributeThread = new ArrayList<String>();
Set<String> sortThreadIds = new HashSet<String>(registeredThreads);
int step = (int) Math.ceil(userIds.size() * 1.0 / sortThreadIds.size());
int start = 0;
int end = step;
if (end > userIds.size()) {
end = userIds.size();
}
for (String thread : sortThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread))
continue;
String value = ZKUtil
.getPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread);
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
value1.setDealUserIds(userIds.subList(start, end));
ZKUtil.setPathData(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ thread, new Gson().toJson(value1));
// 实际重新分配的线程Id
realRedistributeThread.add(thread);
start = end;
end += step;
if (start >= userIds.size()) {
break;
}
if (end > userIds.size()) {
end = userIds.size();
}
}
return realRedistributeThread;
}
private List<String> acquireAllRegisteredThread() {
return ZKUtil.getChildren(Config.ZKPath.REGISTER_SERVER_PATH);
}
private boolean checkAllProcessStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) {
String registerPathPrefix = Config.ZKPath.REGISTER_SERVER_PATH + "/";
for (String threadId : registeredThreadIds) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/"
+ threadId))
continue;
if (getProcessThreadStatus(registerPathPrefix, threadId) != status) {
return false;
}
}
return true;
}
private ProcessThreadStatus getProcessThreadStatus(
String registerPathPrefix, String threadId) {
if (!ZKUtil.exists(Config.ZKPath.REGISTER_SERVER_PATH + "/" + threadId))
return ProcessThreadStatus.FREE;
String value = ZKUtil.getPathData(registerPathPrefix + threadId);
if (value == null || value.length() == 0)
return ProcessThreadStatus.FREE;
ProcessThreadValue value1 = new Gson().fromJson(value,
ProcessThreadValue.class);
return ProcessThreadStatus.convert(value1.getStatus());
}
private void changeStatus(List<String> registeredThreadIds,
ProcessThreadStatus status) {
for (String threadId : registeredThreadIds) {
ProcessUtil.changeProcessThreadStatus(threadId, status);
}
}
public class RegisterServerWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
redistributing = true;
}
watcherRegisterServerPath();
}
}
private void watcherRegisterServerPath() {
try {
ZKUtil.getChildrenWithWatcher(Config.ZKPath.REGISTER_SERVER_PATH,
watcher);
} catch (Exception e) {
logger.error("Failed to set watcher for get children", e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册