未验证 提交 72daba58 编写于 作者: W wind 提交者: GitHub

[DS-6694][Master] Reduce the refresh resource interval of LowerWeight (#6695)

* [DS-6694][Master] Reduce the refresh resource interval of LowerWeight

* add server node update event handle
Co-authored-by: Ncaishunfeng <534328519@qq.com>
上级 5855c936
......@@ -184,6 +184,7 @@ public class HeartBeat {
* update server state
*/
public void updateServerState() {
this.reportTime = System.currentTimeMillis();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
......
......@@ -148,14 +148,12 @@ public class ZookeeperRegistry implements Registry {
String dataPath = null;
switch (type) {
case NODE_ADDED:
dataPath = event.getData().getPath();
eventType = DataChangeEvent.ADD;
break;
case NODE_UPDATED:
eventType = DataChangeEvent.UPDATE;
dataPath = event.getData().getPath();
break;
case NODE_REMOVED:
eventType = DataChangeEvent.REMOVE;
......@@ -164,7 +162,7 @@ public class ZookeeperRegistry implements Registry {
default:
}
if (null != eventType && null != dataPath) {
ListenerManager.dataChange(path, dataPath, eventType);
ListenerManager.dataChange(path, dataPath, new String(event.getData().getData()), eventType);
}
};
treeCache.getListenable().addListener(treeCacheListener);
......
......@@ -115,7 +115,7 @@ public class ZookeeperRegistryTest {
class TestListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
logger.info("I'm test listener");
}
}
......
......@@ -78,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager {
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 5, TimeUnit.SECONDS);
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS);
}
@PreDestroy
......
......@@ -39,9 +39,8 @@ public class MasterRegistryDataListener implements SubscribeListener {
masterRegistryClient = SpringApplicationContext.getBean(MasterRegistryClient.class);
}
@Override
public void notify(String path, DataChangeEvent event) {
public void notify(String path, String data, DataChangeEvent event) {
//monitor master
if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
handleMasterEvent(event, path);
......
......@@ -190,7 +190,7 @@ public class ServerNodeManager implements InitializingBean {
public void run() {
// sync worker node info
Map<String, String> newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true);
syncWorkerNodeInfo(newWorkerNodeInfo);
syncAllWorkerNodeInfo(newWorkerNodeInfo);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
......@@ -218,7 +218,7 @@ public class ServerNodeManager implements InitializingBean {
class WorkerDataListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
if (registryClient.isWorkerPath(path)) {
try {
if (dataChangeEvent == DataChangeEvent.ADD) {
......@@ -233,6 +233,14 @@ public class ServerNodeManager implements InitializingBean {
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER");
} else if (dataChangeEvent == DataChangeEvent.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path);
Set<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes);
String node = parseNode(path);
syncSingleWorkerNodeInfo(node, data);
}
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
......@@ -251,6 +259,13 @@ public class ServerNodeManager implements InitializingBean {
return parts[parts.length - 2];
}
private String parseNode(String path) {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
}
return parts[parts.length - 1];
}
}
/**
......@@ -258,7 +273,7 @@ public class ServerNodeManager implements InitializingBean {
*/
class MasterDataListener implements SubscribeListener {
@Override
public void notify(String path, DataChangeEvent dataChangeEvent) {
public void notify(String path, String data, DataChangeEvent dataChangeEvent) {
if (registryClient.isMasterPath(path)) {
try {
if (dataChangeEvent.equals(DataChangeEvent.ADD)) {
......@@ -407,7 +422,7 @@ public class ServerNodeManager implements InitializingBean {
*
* @param newWorkerNodeInfo new worker node info
*/
private void syncWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
workerNodeInfoLock.lock();
try {
workerNodeInfo.clear();
......@@ -417,6 +432,18 @@ public class ServerNodeManager implements InitializingBean {
}
}
/**
* sync single worker node info
*/
private void syncSingleWorkerNodeInfo(String node, String info) {
workerNodeInfoLock.lock();
try {
workerNodeInfo.put(node, info);
} finally {
workerNodeInfoLock.unlock();
}
}
/**
* destroy
*/
......
......@@ -55,12 +55,12 @@ public class ListenerManager {
*
*After the data changes, it is distributed to the corresponding listener for processing
*/
public static void dataChange(String key,String path, DataChangeEvent dataChangeEvent) {
public static void dataChange(String key,String path, String data, DataChangeEvent dataChangeEvent) {
SubscribeListener notifyListener = listeners.get(key);
if (null == notifyListener) {
return;
}
notifyListener.notify(path,dataChangeEvent);
notifyListener.notify(path, data, dataChangeEvent);
}
}
......@@ -25,6 +25,6 @@ public interface SubscribeListener {
/**
* Processing logic when the subscription node changes
*/
void notify(String path, DataChangeEvent dataChangeEvent);
void notify(String path, String data, DataChangeEvent dataChangeEvent);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册