Unverified — Commit e6da1ccf, authored by Wenjun Ruan, committed by GitHub

Add worker-group-refresh-interval in master config (#12601)

* Add worker-group-refresh-interval in master config

* Ensure the interval cannot be smaller than 10s

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
Co-authored-by: kezhenxu94 <kezhenxu94@apache.org>
Parent: d84f1ef2
@@ -277,6 +277,7 @@ Location: `master-server/conf/application.yaml`
|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance|
|master.registry-disconnect-strategy.strategy|stop|Used when the master disconnects from the registry, default value: stop. Optional values include stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnects from the registry and the disconnect strategy is waiting. The master will wait the given time to reconnect to the registry; if it still cannot connect after that time, it will stop itself. If the value is 0s, the master will wait infinitely|
+|master.worker-group-refresh-interval|10s|The interval at which worker groups are refreshed from the database into memory|
### Worker Server related configuration
......
@@ -272,6 +272,7 @@ The common.properties file currently mainly holds hadoop/s3/yarn related configuration
|master.kill-yarn-job-when-task-failover|true|Whether to kill the yarn job when a task instance fails over|
|master.registry-disconnect-strategy.strategy|stop|The strategy used when the Master disconnects from the registry, default value: stop. Optional values: stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|The reconnect window after the Master disconnects from the registry; it takes effect when the strategy is waiting. The Master will try to reconnect within the given time and stop itself if reconnection fails; while reconnecting, the Master discards the workflows it is currently executing. A value of 0s means it waits indefinitely|
+|master.worker-group-refresh-interval|10s|The interval at which worker groups are periodically synced from the database into memory|
## Worker Server related configuration
......
@@ -93,6 +93,8 @@ public class MasterConfig implements Validator {
    private boolean killYarnJobWhenTaskFailover = true;
    private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
+    private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
    // ip:listenPort
    private String masterAddress;
@@ -140,6 +142,10 @@ public class MasterConfig implements Validator {
        if (masterConfig.getMaxCpuLoadAvg() <= 0) {
            masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
        }
+        if (masterConfig.getWorkerGroupRefreshInterval().getSeconds() < 10) {
+            errors.rejectValue("worker-group-refresh-interval", null, "should >= 10s");
+        }
        masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
        masterConfig.setMasterRegistryPath(REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + masterConfig.getMasterAddress());
        printConfig();
@@ -163,5 +169,6 @@ public class MasterConfig implements Validator {
        logger.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
        logger.info("Master config: masterAddress -> {} ", masterAddress);
        logger.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
+        logger.info("Master config: workerGroupRefreshInterval -> {} ", workerGroupRefreshInterval);
    }
}
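Spring Boot binds the `worker-group-refresh-interval` string into the new `java.time.Duration` field, and the validator above rejects anything below 10 seconds so the periodic database sync cannot be configured too aggressively. A minimal standalone sketch of the same rule, using only the JDK (the `RefreshIntervalCheck` class is illustrative, not part of this PR):

```java
import java.time.Duration;

public class RefreshIntervalCheck {

    // Mirrors the validation above: intervals shorter than 10s are rejected.
    static Duration validate(Duration interval) {
        if (interval.getSeconds() < 10) {
            throw new IllegalArgumentException(
                    "worker-group-refresh-interval should >= 10s, got " + interval);
        }
        return interval;
    }

    public static void main(String[] args) {
        System.out.println(validate(Duration.ofSeconds(10))); // prints PT10S
        validate(Duration.ofSeconds(5));                      // throws IllegalArgumentException
    }
}
```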
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;

-import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@@ -78,8 +77,9 @@ public abstract class CommonHostManager implements HostManager {
        Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
        if (CollectionUtils.isNotEmpty(nodes)) {
            for (String node : nodes) {
-                WorkerHeartBeat workerNodeInfo = serverNodeManager.getWorkerNodeInfo(node);
-                hostWorkers.add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup));
+                serverNodeManager.getWorkerNodeInfo(node).ifPresent(
+                        workerNodeInfo -> hostWorkers
+                                .add(HostWorker.of(node, workerNodeInfo.getWorkerHostWeight(), workerGroup)));
            }
        }
        return hostWorkers;
......
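The dispatch change above replaces a nullable `WorkerHeartBeat` lookup with an `Optional`, so a worker that disappears between registry refreshes is skipped instead of dereferenced. A self-contained sketch of the pattern, with a plain map standing in for the heartbeat registry (all names below are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class OptionalLookupSketch {

    // Stand-in for the heartbeat map; entries vanish when a worker goes offline.
    static final Map<String, Integer> heartbeats = Map.of("192.168.1.1:1234", 100);

    static Optional<Integer> getWorkerNodeInfo(String node) {
        return Optional.ofNullable(heartbeats.get(node));
    }

    public static void main(String[] args) {
        List<String> hostWorkers = new ArrayList<>();
        for (String node : List.of("192.168.1.1:1234", "192.168.1.2:1234")) {
            // The missing worker is skipped silently, where the old
            // null-returning lookup would have thrown a NullPointerException.
            getWorkerNodeInfo(node).ifPresent(weight -> hostWorkers.add(node + " weight=" + weight));
        }
        System.out.println(hostWorkers); // [192.168.1.1:1234 weight=100]
    }
}
```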
@@ -48,6 +48,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -66,9 +67,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

-/**
- * server node manager
- */
@Service
public class ServerNodeManager implements InitializingBean {
@@ -89,9 +87,6 @@ public class ServerNodeManager implements InitializingBean {
     */
    private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();

-    /**
-     * master nodes
-     */
    private final Set<String> masterNodes = new HashSet<>();

    private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();
@@ -115,35 +110,36 @@ public class ServerNodeManager implements InitializingBean {
    @Autowired
    private MasterConfig masterConfig;

-    private List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
+    private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();

-    private static volatile int MASTER_SLOT = 0;
+    private volatile int currentSlot = 0;

-    private static volatile int MASTER_SIZE = 0;
+    private volatile int totalSlot = 0;

-    public static int getSlot() {
-        return MASTER_SLOT;
+    public int getSlot() {
+        return currentSlot;
    }

-    public static int getMasterSize() {
-        return MASTER_SIZE;
+    public int getMasterSize() {
+        return totalSlot;
    }

-    /**
-     * init listener
-     *
-     * @throws Exception if error throws Exception
-     */
    @Override
-    public void afterPropertiesSet() throws Exception {
+    public void afterPropertiesSet() {
        // load nodes from zookeeper
-        load();
+        updateMasterNodes();
+        updateWorkerNodes();
+        updateWorkerGroupMappings();

        // init executor service
        executorService =
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
-        executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
+        executorService.scheduleWithFixedDelay(
+                new WorkerNodeInfoAndGroupDbSyncTask(),
+                0,
+                masterConfig.getWorkerGroupRefreshInterval().getSeconds(),
+                TimeUnit.SECONDS);

        // init MasterNodeListener listener
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
@@ -152,19 +148,6 @@ public class ServerNodeManager implements InitializingBean {
        registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
    }

-    /**
-     * load nodes from zookeeper
-     */
-    public void load() {
-        // master nodes from zookeeper
-        updateMasterNodes();
-        updateWorkerNodes();
-        updateWorkerGroupMappings();
-    }
-
-    /**
-     * worker node info and worker group db sync task
-     */
    class WorkerNodeInfoAndGroupDbSyncTask implements Runnable {

        @Override
@@ -251,8 +234,8 @@ public class ServerNodeManager implements InitializingBean {
    }

    private void updateMasterNodes() {
-        MASTER_SLOT = 0;
-        MASTER_SIZE = 0;
+        currentSlot = 0;
+        totalSlot = 0;
        this.masterNodes.clear();
        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
        try {
@@ -325,14 +308,12 @@ public class ServerNodeManager implements InitializingBean {
            this.masterPriorityQueue.putList(masterNodes);
            int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
            if (index >= 0) {
-                MASTER_SIZE = nodes.size();
-                MASTER_SLOT = index;
+                totalSlot = nodes.size();
+                currentSlot = index;
            } else {
-                logger.warn("current addr:{} is not in active master list",
-                        masterConfig.getMasterAddress());
+                logger.warn("Current master is not in active master list");
            }
-            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
-                    MASTER_SLOT, masterConfig.getMasterAddress());
+            logger.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
        } finally {
            masterLock.unlock();
        }
@@ -360,10 +341,10 @@ public class ServerNodeManager implements InitializingBean {
                workerGroup = Constants.DEFAULT_WORKER_GROUP;
            }
            Set<String> nodes = workerGroupNodes.get(workerGroup);
-            if (CollectionUtils.isNotEmpty(nodes)) {
-                return Collections.unmodifiableSet(nodes);
+            if (CollectionUtils.isEmpty(nodes)) {
+                return Collections.emptySet();
            }
-            return nodes;
+            return Collections.unmodifiableSet(nodes);
        } finally {
            workerGroupReadLock.unlock();
        }
@@ -373,45 +354,19 @@ public class ServerNodeManager implements InitializingBean {
        return Collections.unmodifiableMap(workerNodeInfo);
    }
-    /**
-     * get worker node info
-     *
-     * @param workerNode worker node
-     * @return worker node info
-     */
-    public WorkerHeartBeat getWorkerNodeInfo(String workerNode) {
+    public Optional<WorkerHeartBeat> getWorkerNodeInfo(String workerServerAddress) {
        workerNodeInfoReadLock.lock();
        try {
-            return workerNodeInfo.getOrDefault(workerNode, null);
+            return Optional.ofNullable(workerNodeInfo.getOrDefault(workerServerAddress, null));
        } finally {
            workerNodeInfoReadLock.unlock();
        }
    }

-    /**
-     * sync worker node info
-     *
-     * @param newWorkerNodeInfo new worker node info
-     */
-    private void syncAllWorkerNodeInfo(Map<String, String> newWorkerNodeInfo) {
-        workerNodeInfoWriteLock.lock();
-        try {
-            workerNodeInfo.clear();
-            for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
-                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
-            }
-        } finally {
-            workerNodeInfoWriteLock.unlock();
-        }
-    }
-
-    /**
-     * sync single worker node info
-     */
-    private void syncSingleWorkerNodeInfo(String node, WorkerHeartBeat info) {
+    private void syncSingleWorkerNodeInfo(String workerAddress, WorkerHeartBeat info) {
        workerNodeInfoWriteLock.lock();
        try {
-            workerNodeInfo.put(node, info);
+            workerNodeInfo.put(workerAddress, info);
        } finally {
            workerNodeInfoWriteLock.unlock();
        }
@@ -434,9 +389,6 @@ public class ServerNodeManager implements InitializingBean {
        }
    }

-    /**
-     * destroy
-     */
    @PreDestroy
    public void destroy() {
        executorService.shutdownNow();
......
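The sync task now runs at the configured interval rather than a hard-coded 10 seconds. Worth noting: `scheduleWithFixedDelay` measures the delay from the end of one run to the start of the next, so a slow database query stretches the effective period instead of stacking overlapping sync tasks. A minimal sketch of that behaviour (class name and task body are illustrative):

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DbSyncScheduleSketch {

    public static void main(String[] args) {
        // Stands in for masterConfig.getWorkerGroupRefreshInterval().
        Duration refreshInterval = Duration.ofSeconds(10);
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        // The next run starts refreshInterval seconds after the previous
        // run finishes, so runs can never overlap.
        pool.scheduleWithFixedDelay(
                () -> System.out.println("sync worker groups from db"),
                0,
                refreshInterval.getSeconds(),
                TimeUnit.SECONDS);
    }
}
```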
@@ -110,6 +110,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
    @Autowired
    private WorkflowEventLooper workflowEventLooper;

+    @Autowired
+    private ServerNodeManager serverNodeManager;
+
    private String masterAddress;

    protected MasterSchedulerBootstrap() {
@@ -260,8 +263,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
    private List<Command> findCommands() throws MasterException {
        try {
            long scheduleStartTime = System.currentTimeMillis();
-            int thisMasterSlot = ServerNodeManager.getSlot();
-            int masterCount = ServerNodeManager.getMasterSize();
+            int thisMasterSlot = serverNodeManager.getSlot();
+            int masterCount = serverNodeManager.getMasterSize();
            if (masterCount <= 0) {
                logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
                return Collections.emptyList();
@@ -283,8 +286,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
    }

    private SlotCheckState slotCheck(Command command) {
-        int slot = ServerNodeManager.getSlot();
-        int masterSize = ServerNodeManager.getMasterSize();
+        int slot = serverNodeManager.getSlot();
+        int masterSize = serverNodeManager.getMasterSize();
        SlotCheckState state;
        if (masterSize <= 0) {
            state = SlotCheckState.CHANGE;
......
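With the slot and master size now read from the injected `serverNodeManager` instead of static fields, each master still claims only the commands that map to its slot. A hedged sketch of that partitioning, assuming the usual `id % masterSize == slot` scheme (the helper below is illustrative; in DolphinScheduler the equivalent filter is applied when querying commands):

```java
public class SlotFilterSketch {

    // Hypothetical helper: a command belongs to this master iff its id
    // maps onto the master's slot.
    static boolean ownedByThisMaster(long commandId, int slot, int masterSize) {
        return masterSize > 0 && commandId % masterSize == slot;
    }

    public static void main(String[] args) {
        int slot = 1;
        int masterSize = 3; // e.g. the second of three registered masters
        for (long id = 0; id < 6; id++) {
            System.out.printf("command %d -> %b%n", id, ownedByThisMaster(id, slot, masterSize));
        }
    }
}
```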
@@ -115,6 +115,7 @@ master:
    strategy: waiting
    # The max waiting time to reconnect to registry if you set the strategy to waiting
    max-waiting-time: 100s
+  worker-group-refresh-interval: 10s

server:
  port: 5679
......
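For reference, the `10s` literal above is parsed by Spring Boot's relaxed binding into the `Duration` field added to `MasterConfig`. A short sketch, assuming Spring Boot's `DurationStyle` helper is on the classpath:

```java
import java.time.Duration;

import org.springframework.boot.convert.DurationStyle;

public class DurationBindingSketch {

    public static void main(String[] args) {
        // "10s", "100s" or "10m" all bind to java.time.Duration fields
        // such as workerGroupRefreshInterval.
        Duration interval = DurationStyle.detectAndParse("10s");
        System.out.println(interval.getSeconds()); // prints 10
    }
}
```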
@@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUt
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;

+import java.util.Optional;
+
import org.assertj.core.util.Strings;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -57,7 +59,8 @@ public class RoundRobinHostManagerTest {
    @Test
    public void testSelectWithResult() {
        Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
-        Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22")).thenReturn(new WorkerHeartBeat());
+        Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22"))
+                .thenReturn(Optional.of(new WorkerHeartBeat()));
        ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
        Host host = roundRobinHostManager.select(context);
        Assertions.assertFalse(Strings.isNullOrEmpty(host.getAddress()));
......
@@ -150,6 +150,7 @@ master:
  failover-interval: 10m
  # kill yarn job when failover taskInstance, default true
  kill-yarn-job-when-task-failover: true
+  worker-group-refresh-interval: 10s

worker:
  # worker listener port
......