diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 9880b2cc8d540b0d632bcf5b03ef4c7cb3dcb137..d4a995820365a7884b6f702c6851e3d51a92d15c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -31,7 +31,6 @@ public class MasterConfig { private int fetchCommandNum; private int preExecThreads; private int execThreads; - private int execTaskNum; private int dispatchTaskNumber; private HostSelector hostSelector; private int heartbeatInterval; @@ -74,14 +73,6 @@ public class MasterConfig { this.execThreads = execThreads; } - public int getExecTaskNum() { - return execTaskNum; - } - - public void setExecTaskNum(int execTaskNum) { - this.execTaskNum = execTaskNum; - } - public int getDispatchTaskNumber() { return dispatchTaskNumber; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 290164b868aba0d8c8546b21ef81cf578bd6d305..c6562bc8d78df8e855ca2d2a92d8269913e4ee87 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -19,12 +19,14 @@ package org.apache.dolphinscheduler.server.master.consumer; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; @@ -33,6 +35,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; @@ -78,30 +82,24 @@ public class TaskPriorityQueueConsumer extends Thread { @Autowired private MasterConfig masterConfig; + /** + * consumer thread pool + */ + private ThreadPoolExecutor consumerThreadPoolExecutor; + @PostConstruct public void init() { - super.setName("TaskUpdateQueueConsumerThread"); + this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber()); super.start(); } @Override public void run() { - List failedDispatchTasks = new ArrayList<>(); + int fetchTaskNum = masterConfig.getDispatchTaskNumber(); while (Stopper.isRunning()) { try { - int fetchTaskNum = masterConfig.getDispatchTaskNumber(); - failedDispatchTasks.clear(); - for (int i = 0; i < fetchTaskNum; i++) { - TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); - if (Objects.isNull(taskPriority)) { - continue; - } + List failedDispatchTasks = this.batchDispatch(fetchTaskNum); - boolean dispatchResult = dispatch(taskPriority); - if (!dispatchResult) { - failedDispatchTasks.add(taskPriority); - } - } if (!failedDispatchTasks.isEmpty()) { for (TaskPriority dispatchFailedTask : failedDispatchTasks) { taskPriorityQueue.put(dispatchFailedTask); @@ -118,13 +116,41 @@ public class TaskPriorityQueueConsumer extends Thread { } } + /** + * batch dispatch with thread pool + */ + private List batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException { + List failedDispatchTasks = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(fetchTaskNum); + + for (int i = 0; i < fetchTaskNum; i++) { + TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (Objects.isNull(taskPriority)) { + latch.countDown(); + continue; + } + + consumerThreadPoolExecutor.submit(() -> { + boolean dispatchResult = this.dispatchTask(taskPriority); + if (!dispatchResult) { + failedDispatchTasks.add(taskPriority); + } + latch.countDown(); + }); + } + + latch.await(); + + return failedDispatchTasks; + } + /** * dispatch task * * @param taskPriority taskPriority * @return result */ - protected boolean dispatch(TaskPriority taskPriority) { + protected boolean dispatchTask(TaskPriority taskPriority) { boolean result = false; try { TaskExecutionContext context = taskPriority.getTaskExecutionContext(); @@ -158,8 +184,6 @@ public class TaskPriorityQueueConsumer extends Thread { /** * check if task need to check state, if true, refresh the checkpoint - * @param taskPriority - * @return */ private boolean isTaskNeedToCheck(TaskPriority taskPriority) { long now = System.currentTimeMillis(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 87bc0428fceef875ddd37bbf02e252acf94cf558..ed5a1d1f89d79fcb6d80aeefd451a39976ed0149 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -88,7 +88,6 @@ public class WorkflowExecuteThreadTest { applicationContext = mock(ApplicationContext.class); config = new MasterConfig(); - config.setExecTaskNum(1); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processInstance = mock(ProcessInstance.class); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 99b6f797bb5a507f4cb6ac9f83128e1e22f6f8b8..b17dfc61f486e252ba9f935b41bba37cf1467e08 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -307,7 +307,7 @@ public class TaskPriorityQueueConsumerTest { TaskPriority taskPriority = new TaskPriority(); taskPriority.setTaskId(1); - boolean res = taskPriorityQueueConsumer.dispatch(taskPriority); + boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority); Assert.assertFalse(res); }