Unverified commit b100f6c4, authored by Wenjun Ruan, committed by GitHub

Remove the schedule thread in LowerWeightHostManager (#10310)

Parent 36e20cdf
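As read from the diff below, LowerWeightHostManager previously refreshed worker host weights with a single-threaded ScheduledExecutorService that ran a RefreshResourceTask every second; this commit removes that thread and instead registers a WorkerInfoChangeListener with ServerNodeManager, which pushes the worker group and node info to its listeners whenever they change. The following is a minimal, self-contained sketch of that push-based wiring, not code from the repository; ListenerSketch, SimpleNodeManager, WeightRefresher, and updateWorker are illustrative names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerSketch {

    /** Mirrors the WorkerInfoChangeListener interface added by this commit. */
    interface WorkerInfoChangeListener {
        void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
    }

    /** Illustrative stand-in for ServerNodeManager: owns worker state and pushes changes. */
    static class SimpleNodeManager {
        private final List<WorkerInfoChangeListener> listeners = new CopyOnWriteArrayList<>();
        private final Map<String, Set<String>> workerGroups = new HashMap<>();
        private final Map<String, String> workerNodeInfo = new HashMap<>();

        void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
            listeners.add(listener);
        }

        /** Called by whatever updates worker state (in the real class, the registry sync paths). */
        void updateWorker(String group, String address, String heartbeat) {
            workerGroups.computeIfAbsent(group, k -> new HashSet<>()).add(address);
            workerNodeInfo.put(address, heartbeat);
            notifyWorkerInfoChangeListeners();
        }

        private void notifyWorkerInfoChangeListeners() {
            for (WorkerInfoChangeListener listener : listeners) {
                listener.notify(workerGroups, workerNodeInfo);
            }
        }
    }

    /** Illustrative stand-in for LowerWeightHostManager: recomputes host weights on each notification. */
    static class WeightRefresher implements WorkerInfoChangeListener {
        @Override
        public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
            // The real implementation (syncWorkerResources) decodes each heartbeat and
            // rebuilds workerHostWeightsMap under a lock; here we only show the trigger.
            System.out.println("refresh host weights for groups " + workerGroups.keySet());
        }
    }

    public static void main(String[] args) {
        SimpleNodeManager nodeManager = new SimpleNodeManager();
        nodeManager.addWorkerInfoChangeListener(new WeightRefresher());
        // A change pushes a notification immediately; no polling thread is needed.
        nodeManager.updateWorker("default", "192.168.1.1:1234", "heartbeat-payload");
    }
}
```

Compared with the fixed one-second polling loop, the event-driven wiring drops one background thread per master and refreshes the weights as soon as ServerNodeManager observes a change.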
@@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.master.registry.WorkerInfoChangeListener;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -36,14 +36,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,23 +66,12 @@ public class LowerWeightHostManager extends CommonHostManager {
*/
private Lock lock;
/**
* executor service
*/
private ScheduledExecutorService executorService;
@PostConstruct
public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 1, TimeUnit.SECONDS);
}
@PreDestroy
public void close() {
this.executorService.shutdownNow();
serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener());
}
/**
@@ -109,6 +94,69 @@ public class LowerWeightHostManager extends CommonHostManager {
throw new UnsupportedOperationException("not support");
}
private class WorkerWeightListener implements WorkerInfoChangeListener {
@Override
public void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo) {
syncWorkerResources(workerGroups, workerNodeInfo);
}
}
/**
* Sync worker resource.
*
* @param workerGroupNodes worker group nodes, key is worker group, value is the worker nodes in that group.
* @param workerNodeInfoMap worker node info map, key is worker node, value is worker info.
*/
private void syncWorkerResources(final Map<String, Set<String>> workerGroupNodes,
final Map<String, String> workerNodeInfoMap) {
try {
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
String heartbeat = workerNodeInfoMap.getOrDefault(node, null);
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
hostWeightOpt.ifPresent(hostWeights::add);
}
if (!hostWeights.isEmpty()) {
workerHostWeights.put(workerGroup, hostWeights);
}
}
syncWorkerHostWeight(workerHostWeights);
} catch (Throwable ex) {
logger.error("Sync worker resource error", ex);
}
}
private Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
if (StringUtils.isEmpty(heartBeatInfo)) {
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
if (heartBeat == null) {
return Optional.empty();
}
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return Optional.empty();
}
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return Optional.empty();
}
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
try {
@@ -128,58 +176,4 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
class RefreshResourceTask implements Runnable {
@Override
public void run() {
try {
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
Map<String, Set<String>> workerGroupNodes = serverNodeManager.getWorkerGroupNodes();
for (Map.Entry<String, Set<String>> entry : workerGroupNodes.entrySet()) {
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
if (hostWeightOpt.isPresent()) {
hostWeights.add(hostWeightOpt.get());
}
}
if (!hostWeights.isEmpty()) {
workerHostWeights.put(workerGroup, hostWeights);
}
}
syncWorkerHostWeight(workerHostWeights);
} catch (Throwable ex) {
logger.error("RefreshResourceTask error", ex);
}
}
public Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
if (StringUtils.isEmpty(heartBeatInfo)) {
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
if (heartBeat == null) {
return Optional.empty();
}
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return Optional.empty();
}
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return Optional.empty();
}
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
}
}
}
@@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -131,6 +132,8 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private MasterConfig masterConfig;
private List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
private static volatile int MASTER_SLOT = 0;
private static volatile int MASTER_SIZE = 0;
@@ -217,6 +220,7 @@ public class ServerNodeManager implements InitializingBean {
}
}
}
notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
}
@@ -256,6 +260,7 @@ public class ServerNodeManager implements InitializingBean {
String node = parseNode(path);
syncSingleWorkerNodeInfo(node, data);
}
notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
@@ -457,6 +462,23 @@ public class ServerNodeManager implements InitializingBean {
}
}
/**
* Add a worker info change listener; when the worker info changes, the listener will be notified.
*
* @param listener the listener to trigger when the worker node info changes.
*/
public synchronized void addWorkerInfoChangeListener(WorkerInfoChangeListener listener) {
workerInfoChangeListeners.add(listener);
}
private void notifyWorkerInfoChangeListeners() {
Map<String, Set<String>> workerGroupNodes = getWorkerGroupNodes();
Map<String, String> workerNodeInfo = getWorkerNodeInfo();
for (WorkerInfoChangeListener listener : workerInfoChangeListeners) {
listener.notify(workerGroupNodes, workerNodeInfo);
}
}
/**
* destroy
*/
@@ -15,33 +15,22 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Map;
import java.util.Set;
/**
* RefreshResourceTask test
* The listener used in {@link ServerNodeManager} to notify the change of worker info.
*/
@RunWith(MockitoJUnitRunner.class)
public class RefreshResourceTaskTest {
@Mock
private ServerNodeManager serverNodeManager;
public interface WorkerInfoChangeListener {
@InjectMocks
LowerWeightHostManager lowerWeightHostManager;
/**
* Used to notify the change of worker info.
*
* @param workerGroups worker groups map, key is worker group name, value is the set of worker addresses.
* @param workerNodeInfo worker node info map, key is worker address, value is worker info.
*/
void notify(Map<String, Set<String>> workerGroups, Map<String, String> workerNodeInfo);
@Test
public void testGetHostWeightWithResult() {
Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask()
.getHostWeight("192.168.1.1:22", "default", null)
.isPresent());
}
}
\ No newline at end of file
}