diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 76d4ae1525699d504e1705db995c4b8b5a52fdfc..0494b7dd008abdf9510cde828baad70a309ab00a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -21,170 +21,108 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelect import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.stereotype.Component; +import java.time.Duration; -@Component -@EnableConfigurationProperties -@ConfigurationProperties("master") -public class MasterConfig { +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; +import org.springframework.validation.annotation.Validated; + +import lombok.Data; + +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "master") +public class MasterConfig implements Validator { /** * The master RPC server listen port. */ - private int listenPort; + private int listenPort = 5678; /** * The max batch size used to fetch command from database. */ - private int fetchCommandNum; + private int fetchCommandNum = 10; /** * The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum. */ - private int preExecThreads; + private int preExecThreads = 10; /** * todo: We may need to split the process/task into different thread size. * The thread number used to handle processInstance and task event. * Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}. */ - private int execThreads; + private int execThreads = 10; /** * The task dispatch thread pool size. */ - private int dispatchTaskNumber; + private int dispatchTaskNumber = 3; /** * Worker select strategy. */ - private HostSelector hostSelector; + private HostSelector hostSelector = HostSelector.LOWER_WEIGHT; /** * Master heart beat task execute interval. */ - private int heartbeatInterval; + private Duration heartbeatInterval = Duration.ofSeconds(10); /** * task submit max retry times. */ - private int taskCommitRetryTimes; + private int taskCommitRetryTimes = 5; /** - * task submit retry interval/ms. + * task submit retry interval. */ - private int taskCommitInterval; + private Duration taskCommitInterval = Duration.ofSeconds(1); /** - * state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance. + * state wheel check interval, if this value is bigger, may increase the delay of task/processInstance. */ - private int stateWheelInterval; - private double maxCpuLoadAvg; - private double reservedMemory; - private int failoverInterval; - private boolean killYarnJobWhenTaskFailover; - - public int getListenPort() { - return listenPort; - } - - public void setListenPort(int listenPort) { - this.listenPort = listenPort; - } - - public int getFetchCommandNum() { - return fetchCommandNum; - } - - public void setFetchCommandNum(int fetchCommandNum) { - this.fetchCommandNum = fetchCommandNum; - } - - public int getPreExecThreads() { - return preExecThreads; - } - - public void setPreExecThreads(int preExecThreads) { - this.preExecThreads = preExecThreads; - } - - public int getExecThreads() { - return execThreads; - } - - public void setExecThreads(int execThreads) { - this.execThreads = execThreads; - } - - public int getDispatchTaskNumber() { - return dispatchTaskNumber; - } - - public void setDispatchTaskNumber(int dispatchTaskNumber) { - this.dispatchTaskNumber = dispatchTaskNumber; - } - - public HostSelector getHostSelector() { - return hostSelector; - } - - public void setHostSelector(HostSelector hostSelector) { - this.hostSelector = hostSelector; - } - - public int getHeartbeatInterval() { - return heartbeatInterval; - } - - public void setHeartbeatInterval(int heartbeatInterval) { - this.heartbeatInterval = heartbeatInterval; - } - - public int getTaskCommitRetryTimes() { - return taskCommitRetryTimes; - } - - public void setTaskCommitRetryTimes(int taskCommitRetryTimes) { - this.taskCommitRetryTimes = taskCommitRetryTimes; - } - - public int getTaskCommitInterval() { - return taskCommitInterval; - } - - public void setTaskCommitInterval(int taskCommitInterval) { - this.taskCommitInterval = taskCommitInterval; - } - - public int getStateWheelInterval() { - return stateWheelInterval; - } - - public void setStateWheelInterval(int stateWheelInterval) { - this.stateWheelInterval = stateWheelInterval; - } - - public double getMaxCpuLoadAvg() { - return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2; - } - - public void setMaxCpuLoadAvg(double maxCpuLoadAvg) { - this.maxCpuLoadAvg = maxCpuLoadAvg; - } - - public double getReservedMemory() { - return reservedMemory; - } - - public void setReservedMemory(double reservedMemory) { - this.reservedMemory = reservedMemory; - } - - public int getFailoverInterval() { - return failoverInterval; - } - - public void setFailoverInterval(int failoverInterval) { - this.failoverInterval = failoverInterval; - } - - public boolean isKillYarnJobWhenTaskFailover() { - return killYarnJobWhenTaskFailover; - } - - public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) { - this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover; + private Duration stateWheelInterval = Duration.ofMillis(5); + private double maxCpuLoadAvg = -1; + private double reservedMemory = 0.3; + private Duration failoverInterval = Duration.ofMinutes(10); + private boolean killYarnJobWhenTaskFailover = true; + + @Override + public boolean supports(Class clazz) { + return MasterConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + MasterConfig masterConfig = (MasterConfig) target; + if (masterConfig.getListenPort() <= 0) { + errors.rejectValue("listen-port", null, "is invalidated"); + } + if (masterConfig.getFetchCommandNum() <= 0) { + errors.rejectValue("fetch-command-num", null, "should be a positive value"); + } + if (masterConfig.getPreExecThreads() <= 0) { + errors.rejectValue("per-exec-threads", null, "should be a positive value"); + } + if (masterConfig.getExecThreads() <= 0) { + errors.rejectValue("exec-threads", null, "should be a positive value"); + } + if (masterConfig.getDispatchTaskNumber() <= 0) { + errors.rejectValue("dispatch-task-number", null, "should be a positive value"); + } + if (masterConfig.getHeartbeatInterval().toMillis() < 0) { + errors.rejectValue("heartbeat-interval", null, "should be a valid duration"); + } + if (masterConfig.getTaskCommitRetryTimes() <= 0) { + errors.rejectValue("task-commit-retry-times", null, "should be a positive value"); + } + if (masterConfig.getTaskCommitInterval().toMillis() <= 0) { + errors.rejectValue("task-commit-interval", null, "should be a valid duration"); + } + if (masterConfig.getStateWheelInterval().toMillis() <= 0) { + errors.rejectValue("state-wheel-interval", null, "should be a valid duration"); + } + if (masterConfig.getFailoverInterval().toMillis() <= 0) { + errors.rejectValue("failover-interval", null, "should be a valid duration"); + } + if (masterConfig.getMaxCpuLoadAvg() <= 0) { + masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 5746cd1d3137e27c1fa5c2f55321b972c61c0f0b..359fdc745e0018d95331d319fcd9905ea4641b29 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang3.StringUtils; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -186,7 +187,7 @@ public class MasterRegistryClient { void registry() { logger.info("Master node : {} registering to registry center", masterAddress); String localNodePath = getCurrentNodePath(); - int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); + Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory(), @@ -209,7 +210,7 @@ public class MasterRegistryClient { // delete dead server registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval.getSeconds(), TimeUnit.SECONDS); logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index d6f3937f4b1f9b71a2eaa9c90629e692457ba5f9..63f4215f273adc92aab4b02b0256da265f851238 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -67,7 +67,7 @@ public class FailoverExecuteThread extends BaseDaemonThread { } catch (Exception e) { logger.error("Master failover thread execute error", e); } finally { - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); + ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis()); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 65c7db924d9ec42ee3063025069afcceccaf28e4..8b92696723526e319d85591ac4696545a5935b33 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -93,7 +93,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { @Override public void run() { - Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); + Duration checkInterval = masterConfig.getStateWheelInterval(); while (Stopper.isRunning()) { try { checkTask4Timeout(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 16aa9d9957063508bf7339078cd40fc37ddcb027..9af664e0e3af055edaf28aaa8b0b79e216582590 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -81,6 +81,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections.CollectionUtils; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -114,7 +115,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected int maxRetryTimes; - protected int commitInterval; + protected long commitInterval; protected ProcessService processService; @@ -132,7 +133,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { this.taskInstance = taskInstance; this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); - this.commitInterval = masterConfig.getTaskCommitInterval(); + this.commitInterval = masterConfig.getTaskCommitInterval().toMillis(); } protected javax.sql.DataSource defaultDataSource = diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index a908ac307246cd12fb5a9f555421078fab03c758..879c3ea9f403767729ac44937825dc2d18d5cb6d 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -96,19 +96,19 @@ master: dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight - # master heartbeat interval, the unit is second - heartbeat-interval: 10 + # master heartbeat interval + heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 - # master commit task interval, the unit is millisecond - task-commit-interval: 1000 + # master commit task interval + task-commit-interval: 1s state-wheel-interval: 5 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 # failover interval, the unit is minute - failover-interval: 10 + failover-interval: 10m # kill yarn jon when failover taskInstance, default true kill-yarn-job-when-task-failover: true diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java index f3a4414d3dfea4f7ad6789ca138a0a3920ba090f..602afd987f4268933ea28d6d142a0a848edd78c8 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -77,7 +78,7 @@ public class BlockingTaskTest { config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); // mock process service processService = Mockito.mock(ProcessService.class); @@ -122,7 +123,7 @@ public class BlockingTaskTest { Mockito.when(processService .submitTaskWithRetry(Mockito.any(ProcessInstance.class) , Mockito.any(TaskInstance.class) - , Mockito.any(Integer.class), Mockito.any(Integer.class))) + , Mockito.any(Integer.class), Mockito.any(Long.class))) .thenReturn(taskInstance); return taskInstance; } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index c4d90718b6b671088860fd54ace5b5c0dce07e45..7d01c44746e5eb9b23b6d6ac46fa0e25bb451b63 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -66,7 +67,7 @@ public class ConditionsTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 8172fd44a0c9cbd3b911265106867776cf10d473..50b03c86e2b474973bc1453f9146ce8c3c2ea359 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,7 +86,7 @@ public class DependentTaskTest { MasterConfig config = new MasterConfig(); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processService = Mockito.mock(ProcessService.class); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 200dec0b3d0479e386f216e8f9d80bf1cb2bee56..fa51a2d6bed1dbdfc06d64e519645cf1f645f733 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import org.springframework.context.ApplicationContext; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -69,7 +70,7 @@ public class SubProcessTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); PowerMockito.mockStatic(Stopper.class); PowerMockito.when(Stopper.isRunning()).thenReturn(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java index 824726bb6f6e7d2f3770ba819613c87fac86ac2b..6f94a22e2f78b6dfba24c8a4a02a0dbb2f056f48 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -59,7 +60,7 @@ public class SwitchTaskTest { MasterConfig config = new MasterConfig(); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); config.setTaskCommitRetryTimes(3); - config.setTaskCommitInterval(1000); + config.setTaskCommitInterval(Duration.ofSeconds(1)); processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8d61a9a460b8d8fa2046d0cbf4b3bf877fc21820..26afcc44086988c5dffeaa62c72876da4af148d3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -103,7 +103,7 @@ public interface ProcessService { void setSubProcessParam(ProcessInstance subProcessInstance); - TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval); + TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval); @Transactional(rollbackFor = Exception.class) TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index fc436ee4bd7fe30bfc5cf8e9c59ab55eabf8ddc9..495cb8202649828fe9700537a6e1176a79b3ac64 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1259,7 +1259,7 @@ public class ProcessServiceImpl implements ProcessService { * retry submit task to db */ @Override - public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { + public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, long commitInterval) { int retryTimes = 1; TaskInstance task = null; while (retryTimes <= commitRetryTimes) { diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 94f4e074e2795eb145eec3a59dde6132fd30fd7d..21ec46f21938d50fc08215c20af2c09071a9ac46 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -131,19 +131,19 @@ master: dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight host-selector: lower_weight - # master heartbeat interval, the unit is second - heartbeat-interval: 10 + # master heartbeat interval + heartbeat-interval: 10s # master commit task retry times task-commit-retry-times: 5 - # master commit task interval, the unit is millisecond - task-commit-interval: 1000 + # master commit task interval + task-commit-interval: 1s state-wheel-interval: 5 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 - # failover interval, the unit is minute - failover-interval: 10 + # failover interval + failover-interval: 10m # kill yarn jon when failover taskInstance, default true kill-yarn-job-when-task-failover: true @@ -152,8 +152,8 @@ worker: listen-port: 1234 # worker execute thread number to limit task instances in parallel exec-threads: 10 - # worker heartbeat interval, the unit is second - heartbeat-interval: 10 + # worker heartbeat interval + heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 059aa64ce665daa1fc556c29063db0a3266cf494..a641b28e1b0975e4089ac4cca655f24e2edab16a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,113 +17,53 @@ package org.apache.dolphinscheduler.server.worker.config; +import java.time.Duration; import java.util.Set; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; +import org.springframework.validation.annotation.Validated; -@Configuration -@EnableConfigurationProperties -@ConfigurationProperties("worker") -public class WorkerConfig { - private int listenPort; - private int execThreads; - private int heartbeatInterval; - private int hostWeight; - private boolean tenantAutoCreate; - private boolean tenantDistributedUser; - private int maxCpuLoadAvg; - private double reservedMemory; - private Set groups; - private String alertListenHost; - private int alertListenPort; - - public int getListenPort() { - return listenPort; - } - - public void setListenPort(int listenPort) { - this.listenPort = listenPort; - } - - public int getExecThreads() { - return execThreads; - } - - public void setExecThreads(int execThreads) { - this.execThreads = execThreads; - } - - public int getHeartbeatInterval() { - return heartbeatInterval; - } - - public void setHeartbeatInterval(int heartbeatInterval) { - this.heartbeatInterval = heartbeatInterval; - } - - public int getHostWeight() { - return hostWeight; - } - - public void setHostWeight(int hostWeight) { - this.hostWeight = hostWeight; - } - - public boolean isTenantAutoCreate() { - return tenantAutoCreate; - } - - public void setTenantAutoCreate(boolean tenantAutoCreate) { - this.tenantAutoCreate = tenantAutoCreate; - } +import com.google.common.collect.Sets; - public int getMaxCpuLoadAvg() { - return maxCpuLoadAvg > 0 ? maxCpuLoadAvg : Runtime.getRuntime().availableProcessors() * 2; - } - - public void setMaxCpuLoadAvg(int maxCpuLoadAvg) { - this.maxCpuLoadAvg = maxCpuLoadAvg; - } - - public double getReservedMemory() { - return reservedMemory; - } - - public void setReservedMemory(double reservedMemory) { - this.reservedMemory = reservedMemory; - } - - public Set getGroups() { - return groups; - } - - public void setGroups(Set groups) { - this.groups = groups; - } +import lombok.Data; - public String getAlertListenHost() { - return alertListenHost; - } - - public void setAlertListenHost(String alertListenHost) { - this.alertListenHost = alertListenHost; - } - - public int getAlertListenPort() { - return alertListenPort; - } - - public void setAlertListenPort(final int alertListenPort) { - this.alertListenPort = alertListenPort; - } - - public boolean isTenantDistributedUser() { - return tenantDistributedUser; - } +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "worker") +public class WorkerConfig implements Validator { + private int listenPort = 1234; + private int execThreads = 10; + private Duration heartbeatInterval = Duration.ofSeconds(10); + private int hostWeight = 100; + private boolean tenantAutoCreate = true; + private boolean tenantDistributedUser = false; + private int maxCpuLoadAvg = -1; + private double reservedMemory = 0.3; + private Set groups = Sets.newHashSet("default"); + private String alertListenHost = "localhost"; + private int alertListenPort = 50052; + + @Override + public boolean supports(Class clazz) { + return WorkerConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + WorkerConfig workerConfig = (WorkerConfig) target; + if (workerConfig.getExecThreads() <= 0) { + errors.rejectValue("exec-threads", null, "should be a positive value"); + } + if (workerConfig.getHeartbeatInterval().toMillis() <= 0) { + errors.rejectValue("heartbeat-interval", null, "shoule be a valid duration"); + } + if (workerConfig.getMaxCpuLoadAvg() <= 0) { + workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); + } - public void setTenantDistributedUser(boolean tenantDistributedUser) { - this.tenantDistributedUser = tenantDistributedUser; } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 6cf5aa8db8056726e7ba999eb13f449f2cd34022..b94cdaf3172ef329b1ee20f371b4bb350af6a76a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -46,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; + import com.google.common.base.Strings; import com.google.common.collect.Sets; @@ -97,7 +98,7 @@ public class WorkerRegistryClient { public void registry() { String address = NetUtils.getAddr(workerConfig.getListenPort()); Set workerZkPaths = getWorkerZkPaths(); - int workerHeartbeatInterval = workerConfig.getHeartbeatInterval(); + long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, workerConfig.getMaxCpuLoadAvg(), diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index d903e1cd1f8258145b0f001694f54d477013c5a1..ad390ed6af2dfce81d9ca3c5ea6b6b62da280046 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -58,8 +58,8 @@ worker: listen-port: 1234 # worker execute thread number to limit task instances in parallel exec-threads: 100 - # worker heartbeat interval, the unit is second - heartbeat-interval: 10 + # worker heartbeat interval + heartbeat-interval: 10s # worker host weight to dispatch tasks, default value 100 host-weight: 100 # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index 56af82d9d86eb8327da3f71bba87386a2d79a520..4bd2161e83916949003008f8d659e9385a903d44 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import java.time.Duration; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -86,15 +87,15 @@ public class WorkerRegistryClientTest { @Test public void testRegistry() { workerRegistryClient.initWorkRegistry(); - + given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1); - + given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); - - given(workerConfig.getHeartbeatInterval()).willReturn(1); - + + given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); + workerRegistryClient.registry(); - + Mockito.verify(registryClient, Mockito.times(1)).handleDeadServer(Mockito.anyCollection(), Mockito.any(NodeType.class), Mockito.anyString()); }