diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 2b8fe7b93a48802da0f7f9f4103b753cb7a4a7b7..500248414480386c375e2b839de19f7be9ffde49 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; +import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener; import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; @@ -36,14 +36,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,23 +66,12 @@ public class LowerWeightHostManager extends CommonHostManager { */ private Lock lock; - /** - * executor service - */ - private ScheduledExecutorService executorService; - @PostConstruct public void init() { this.selector = new LowerWeightRoundRobin(); this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); - this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); - this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS); - } - - @PreDestroy - public void close() { - this.executorService.shutdownNow(); + serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener()); } /** @@ -109,6 +94,69 @@ public class LowerWeightHostManager extends CommonHostManager { throw new UnsupportedOperationException("not support"); } + + private class WorkerWeightListener implements WorkerInfoChangeListener { + @Override + public void notify(Map> workerGroups, Map workerNodeInfo) { + syncWorkerResources(workerGroups, workerNodeInfo); + } + } + + /** + * Sync worker resource. + * + * @param workerGroupNodes worker group nodes, key is worker group, value is worker group nodes. + * @param workerNodeInfoMap worker node info map, key is worker node, value is worker info. + */ + private void syncWorkerResources(final Map> workerGroupNodes, + final Map workerNodeInfoMap) { + try { + Map> workerHostWeights = new HashMap<>(); + for (Map.Entry> entry : workerGroupNodes.entrySet()) { + String workerGroup = entry.getKey(); + Set nodes = entry.getValue(); + Set hostWeights = new HashSet<>(nodes.size()); + for (String node : nodes) { + String heartbeat = workerNodeInfoMap.getOrDefault(node, null); + Optional hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); + hostWeightOpt.ifPresent(hostWeights::add); + } + if (!hostWeights.isEmpty()) { + workerHostWeights.put(workerGroup, hostWeights); + } + } + syncWorkerHostWeight(workerHostWeights); + } catch (Throwable ex) { + logger.error("Sync worker resource error", ex); + } + } + + private Optional getHostWeight(String addr, String workerGroup, String heartBeatInfo) { + if (StringUtils.isEmpty(heartBeatInfo)) { + logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); + return Optional.empty(); + } + HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); + if (heartBeat == null) { + return Optional.empty(); + } + if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { + logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", + addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); + return Optional.empty(); + } + if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { + logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", + addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); + return Optional.empty(); + } + return Optional.of( + new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), + heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), + heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime())); + } + + private void syncWorkerHostWeight(Map> workerHostWeights) { lock.lock(); try { @@ -128,58 +176,4 @@ public class LowerWeightHostManager extends CommonHostManager { } } - class RefreshResourceTask implements Runnable { - - @Override - public void run() { - try { - Map> workerHostWeights = new HashMap<>(); - Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); - for (Map.Entry> entry : workerGroupNodes.entrySet()) { - String workerGroup = entry.getKey(); - Set nodes = entry.getValue(); - Set hostWeights = new HashSet<>(nodes.size()); - for (String node : nodes) { - String heartbeat = serverNodeManager.getWorkerNodeInfo(node); - Optional hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); - if (hostWeightOpt.isPresent()) { - hostWeights.add(hostWeightOpt.get()); - } - } - if (!hostWeights.isEmpty()) { - workerHostWeights.put(workerGroup, hostWeights); - } - } - syncWorkerHostWeight(workerHostWeights); - } catch (Throwable ex) { - logger.error("RefreshResourceTask error", ex); - } - } - - public Optional getHostWeight(String addr, String workerGroup, String heartBeatInfo) { - if (StringUtils.isEmpty(heartBeatInfo)) { - logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); - return Optional.empty(); - } - HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); - if (heartBeat == null) { - return Optional.empty(); - } - if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { - logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", - addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); - return Optional.empty(); - } - if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { - logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", - addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); - return Optional.empty(); - } - return Optional.of( - new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), - heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), - heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime())); - } - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 1993f6a30fc8cf504fc1f5d25669b4b604a2559d..b92cb6147361c187fc42de79babfdab6394838bc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -131,6 +132,8 @@ public class ServerNodeManager implements InitializingBean { @Autowired private MasterConfig masterConfig; + private List workerInfoChangeListeners = new ArrayList<>(); + private static volatile int MASTER_SLOT = 0; private static volatile int MASTER_SIZE = 0; @@ -217,6 +220,7 @@ public class ServerNodeManager implements InitializingBean { } } } + notifyWorkerInfoChangeListeners(); } catch (Exception e) { logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); } @@ -256,6 +260,7 @@ public class ServerNodeManager implements InitializingBean { String node = parseNode(path); syncSingleWorkerNodeInfo(node, data); } + notifyWorkerInfoChangeListeners(); } catch (IllegalArgumentException ex) { logger.warn(ex.getMessage()); } catch (Exception ex) { @@ -457,6 +462,23 @@ public class ServerNodeManager implements InitializingBean { } } + /** + * Add the resource change listener, when the resource changed, the listener will be notified. + * + * @param listener will be trigger, when the worker node info changed. + */ + public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) { + workerInfoChangeListeners.add(listener); + } + + private void notifyWorkerInfoChangeListeners() { + Map> workerGroupNodes = getWorkerGroupNodes(); + Map workerNodeInfo = getWorkerNodeInfo(); + for (WorkerInfoChangeListener listener : workerInfoChangeListeners) { + listener.notify(workerGroupNodes, workerNodeInfo); + } + } + /** * destroy */ diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java similarity index 50% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java index 5e35b0b2ecb88f30f585139ddb6efdda91979b7d..f885a6fba04c5c7b3f7ca4f37beeff39b216129b 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/WorkerInfoChangeListener.java @@ -15,33 +15,22 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.dispatch.host; +package org.apache.dolphinscheduler.server.master.registry; -import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; +import java.util.Map; +import java.util.Set; /** - * RefreshResourceTask test + * The listener used in {@link ServerNodeManager} to notify the change of worker info. */ -@RunWith(MockitoJUnitRunner.class) -public class RefreshResourceTaskTest { - - @Mock - private ServerNodeManager serverNodeManager; +public interface WorkerInfoChangeListener { - @InjectMocks - LowerWeightHostManager lowerWeightHostManager; + /** + * Used to notify the change of worker info. + * + * @param workerGroups worker groups map, key is worker group name, value is worker address. + * @param workerNodeInfo worker node info map, key is worker address, value is worker info. + */ + void notify(Map> workerGroups, Map workerNodeInfo); - @Test - public void testGetHostWeightWithResult() { - Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask() - .getHostWeight("192.168.1.1:22", "default", null) - .isPresent()); - } -} \ No newline at end of file +}