diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java index 6399f12ada51a02b34f767f40ce6fb732810157f..b9959eb96e84e107d8ab892a94f3c43a05b3faf9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java @@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable { @TableId(value = "id", type = IdType.AUTO) private int id; /** - * taskIntanceid + * taskInstanceId */ private int taskId; /** @@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable { */ private int groupId; /** - * processInstace id + * processInstance id */ private int processId; /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index d12ad9d0d294896ae6f2b7daa4ae9a25fea229af..bd7ffd6cbaeecf8139cc5c8f9ba66e67ba6c5ade 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -72,6 +72,9 @@ public interface TaskGroupMapper extends BaseMapper { */ TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); + /** + * Select the groupSize > useSize Count + */ int selectAvailableCountById(@Param("groupId") int groupId); int selectCountByIdStatus(@Param("id") int id,@Param("status") int status); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index 
5fda409432478dab9488165c22eded3044af1674..38f5ca60166d567049c7c77fee1faadab0b048a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper { */ int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status); + /** + * Query the {@link TaskGroupQueue}, whose priority > the given priority + */ List queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status); TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java index e3ad268f973294099862ba536a9119b8a4e7d2b4..dc4cc2721ede2c7882a8336bed5a4720cc5999ae 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler { } workflowExecuteRunnable.taskFinished(task); if (task.getTaskGroupId() > 0) { + logger.info("The task instance need to release task Group: {}", task.getTaskGroupId()); workflowExecuteRunnable.releaseTaskGroup(task); } return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 
005a41d26e70c0f10a037a95c8f9b67b715e129f..09e5318ca52985033cfd7c4a2d4373ce95fe54ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -332,7 +332,7 @@ public class WorkflowExecuteRunnable implements Callable { return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { - boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue); + boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); @@ -409,6 +409,7 @@ public class WorkflowExecuteRunnable implements Callable { * @param taskInstance */ public void releaseTaskGroup(TaskInstance taskInstance) { + logger.info("Release task group"); if (taskInstance.getTaskGroupId() > 0) { TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); if (nextTaskInstance != null) { @@ -922,6 +923,23 @@ public class WorkflowExecuteRunnable implements Callable { validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor); + + // if we use task group, then need to acquire the task group resource + // if there is no resource the current task instance will not be dispatched + // it will be woken up when other tasks release the resource. 
+ int taskGroupId = taskInstance.getTaskGroupId(); + if (taskGroupId > 0) { + boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), + taskInstance.getName(), + taskGroupId, + taskInstance.getProcessInstanceId(), + taskInstance.getTaskGroupPriority()); + if (!acquireTaskGroup) { + logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); + return Optional.of(taskInstance); + } + } + boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); if (!dispatchSuccess) { logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!", diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 0dec26defea11c6bfb222c46b19a0102470b39f3..6e7ab96d667ed68eb349ebdfb18850500279da4d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { protected boolean submitTask() { this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); - if (this.taskInstance == null) { - return false; - } - - int taskGroupId = taskInstance.getTaskGroupId(); - if (taskGroupId > 0) { - boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), - taskInstance.getName(), - taskGroupId, - taskInstance.getProcessInstanceId(), - taskInstance.getTaskGroupPriority()); - if (!acquireTaskGroup) { - logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); - return true; - } - } - return true; + return 
this.taskInstance != null; } @Override diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 47e4732a4fe9937d63a66b34ca19e5fc302f6fbf..0c1db7f6e961dea1de2edf5775ca2524b2f7ac4b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -271,9 +271,7 @@ public interface ProcessService { String taskName, int groupId, int processId, int priority); - boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue); - - boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue); + boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue); void releaseAllTaskGroup(int processInstanceId); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 2e1f63fbcb6e9366d6c36a33967e2e323cb4e147..d91f63063394b0c1457b083bc2235a5fe7e5969a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; @@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR import static 
org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -350,7 +351,7 @@ public class ProcessServiceImpl implements ProcessService { /** * Save error command, and delete original command. If the given command has already been moved into error command, - * will throw {@link SQLIntegrityConstraintViolationException ). + * will throw {@link java.sql.SQLIntegrityConstraintViolationException}. * * @param command command * @param message message @@ -2871,21 +2872,22 @@ public class ProcessServiceImpl implements ProcessService { * @param taskId task id */ @Override - public boolean acquireTaskGroup(int taskId, - String taskName, int groupId, - int processId, int priority) { + public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) { TaskGroup taskGroup = taskGroupMapper.selectById(groupId); if (taskGroup == null) { + // we don't throw an exception here, because the task group may have been deleted while the workflow is running return true; } // if task group is not applicable if (taskGroup.getStatus() == Flag.NO.getCode()) { return true; } + // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); if (taskGroupQueue == null) { taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE); } else { + logger.info("The task queue is already exist, taskId: {}", taskId); if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { return true; } @@ -2893,15 +2895,14 @@ public class ProcessServiceImpl 
implements ProcessService { taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE); this.taskGroupQueueMapper.updateById(taskGroupQueue); } - //check priority + //check if there already exist higher priority tasks List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (CollectionUtils.isNotEmpty(highPriorityTasks)) { - this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return false; } //try to get taskGroup int count = taskGroupMapper.selectAvailableCountById(groupId); - if (count == 1 && robTaskGroupResouce(taskGroupQueue)) { + if (count == 1 && robTaskGroupResource(taskGroupQueue)) { return true; } this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); @@ -2912,10 +2913,11 @@ public class ProcessServiceImpl implements ProcessService { * try to get the task group resource(when other task release the resource) */ @Override - public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { + public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), + taskGroupQueue.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); this.taskGroupQueueMapper.updateById(taskGroupQueue); @@ -2925,11 +2927,6 @@ public class ProcessServiceImpl implements ProcessService { return false; } - @Override - public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) { - return robTaskGroupResouce(taskGroupQueue); - } - @Override public void releaseAllTaskGroup(int processInstanceId) { List taskInstances = 
this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); @@ -2946,40 +2943,41 @@ public class ProcessServiceImpl implements ProcessService { @Override public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - if (taskGroup == null) { - return null; - } - TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { - return null; - } + TaskGroup taskGroup; + TaskGroupQueue thisTaskGroupQueue; try { - while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize() - , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { + do { + taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + return null; + } thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { return null; } - taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - } + } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() + && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), + taskGroup.getUseSize(), + thisTaskGroupQueue.getId(), + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); } catch (Exception e) { logger.error("release the task group error", e); + return null; } logger.info("updateTask:{}", taskInstance.getName()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); - TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); - if (taskGroupQueue == null) { - return null; - } - while 
(this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) { + TaskGroupQueue taskGroupQueue; + do { taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), + Flag.NO.getCode(), + Flag.NO.getCode()); if (taskGroupQueue == null) { return null; } - } + } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), + Flag.YES.getCode(), + taskGroupQueue.getId()) != 1); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); } @@ -3006,7 +3004,7 @@ public class ProcessServiceImpl implements ProcessService { * @param groupId group id * @param processId process id * @param priority priority - * @return result and msg code + * @return inserted task group queue */ @Override public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,