From acd3d3fab14d08b3b06015be3ac9cfb2f4bfc934 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 4 Aug 2022 14:43:45 +0800 Subject: [PATCH] Fix TaskGroup cannot work and will cause master dead loop (#11254) * Fix TaskGroup cannot work and will cause master dead loop * Remove acquireTaskGroupAgain in ProcessServiceImpl --- .../dao/entity/TaskGroupQueue.java | 4 +- .../dao/mapper/TaskGroupMapper.java | 3 + .../dao/mapper/TaskGroupQueueMapper.java | 3 + .../master/event/TaskStateEventHandler.java | 1 + .../runner/WorkflowExecuteRunnable.java | 20 +++++- .../runner/task/CommonTaskProcessor.java | 18 +---- .../service/process/ProcessService.java | 4 +- .../service/process/ProcessServiceImpl.java | 72 +++++++++---------- 8 files changed, 65 insertions(+), 60 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java index 6399f12ad..b9959eb96 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java @@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable { @TableId(value = "id", type = IdType.AUTO) private int id; /** - * taskIntanceid + * taskInstanceId */ private int taskId; /** @@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable { */ private int groupId; /** - * processInstace id + * processInstance id */ private int processId; /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index a2c9359e1..05c99194c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -74,6 +74,9 @@ public interface TaskGroupMapper extends BaseMapper { */ TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); + /** + * Select the groupSize > useSize Count + */ int selectAvailableCountById(@Param("groupId") int groupId); int selectCountByIdStatus(@Param("id") int id,@Param("status") int status); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index 5fda40943..38f5ca601 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper { */ int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status); + /** + * Query the {@link TaskGroupQueue}, who's priority > the given priority + */ List queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status); TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java index c0cf864d3..9854f9560 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler { } workflowExecuteRunnable.taskFinished(task); if (task.getTaskGroupId() > 0) { + logger.info("The task instance need to release task Group: {}", task.getTaskGroupId()); workflowExecuteRunnable.releaseTaskGroup(task); } return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 7f98e0745..22e79daa6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -338,7 +338,7 @@ public class WorkflowExecuteRunnable implements Callable { return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { - boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue); + boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); @@ -415,6 +415,7 @@ public class WorkflowExecuteRunnable implements Callable { * @param taskInstance */ public void releaseTaskGroup(TaskInstance taskInstance) { + logger.info("Release task group"); if (taskInstance.getTaskGroupId() > 0) { TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); if (nextTaskInstance != null) { @@ -929,6 +930,23 @@ public class WorkflowExecuteRunnable implements Callable { validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor); + + // if we use task group, then need to acquire the task group resource + // if there is no resource the current task instance will not be dispatched + // it will be weakup when other tasks release the resource. + int taskGroupId = taskInstance.getTaskGroupId(); + if (taskGroupId > 0) { + boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), + taskInstance.getName(), + taskGroupId, + taskInstance.getProcessInstanceId(), + taskInstance.getTaskGroupPriority()); + if (!acquireTaskGroup) { + logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); + return Optional.of(taskInstance); + } + } + boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); if (!dispatchSuccess) { logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!", diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 06a72d1c8..f8cf041ae 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { protected boolean submitTask() { this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); - if (this.taskInstance == null) { - return false; - } - - int taskGroupId = taskInstance.getTaskGroupId(); - if (taskGroupId > 0) { - boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), - taskInstance.getName(), - taskGroupId, - taskInstance.getProcessInstanceId(), - taskInstance.getTaskGroupPriority()); - if (!acquireTaskGroup) { - logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName()); - return true; - } - } - return true; + return this.taskInstance != null; } @Override diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 976fd9098..6c1cf5f96 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -273,9 +273,7 @@ public interface ProcessService { String taskName, int groupId, int processId, int priority); - boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue); - - boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue); + boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue); void releaseAllTaskGroup(int processInstanceId); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 7dc4ce7be..1d8628a8c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; @@ -32,6 +31,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -370,7 +371,7 @@ public class ProcessServiceImpl implements ProcessService { /** * Save error command, and delete original command. If the given command has already been moved into error command, - * will throw {@link SQLIntegrityConstraintViolationException ). + * will throw {@link java.sql.SQLIntegrityConstraintViolationException ). * * @param command command * @param message message @@ -2909,21 +2910,22 @@ public class ProcessServiceImpl implements ProcessService { * @param taskId task id */ @Override - public boolean acquireTaskGroup(int taskId, - String taskName, int groupId, - int processId, int priority) { + public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) { TaskGroup taskGroup = taskGroupMapper.selectById(groupId); if (taskGroup == null) { + // we don't throw exception here, to avoid the task group has been deleted during workflow running return true; } // if task group is not applicable if (taskGroup.getStatus() == Flag.NO.getCode()) { return true; } + // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); if (taskGroupQueue == null) { taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE); } else { + logger.info("The task queue is already exist, taskId: {}", taskId); if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { return true; } @@ -2931,15 +2933,14 @@ public class ProcessServiceImpl implements ProcessService { taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE); this.taskGroupQueueMapper.updateById(taskGroupQueue); } - //check priority + //check if there already exist higher priority tasks List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (CollectionUtils.isNotEmpty(highPriorityTasks)) { - this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return false; } //try to get taskGroup int count = taskGroupMapper.selectAvailableCountById(groupId); - if (count == 1 && robTaskGroupResouce(taskGroupQueue)) { + if (count == 1 && robTaskGroupResource(taskGroupQueue)) { return true; } this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); @@ -2950,10 +2951,11 @@ public class ProcessServiceImpl implements ProcessService { * try to get the task group resource(when other task release the resource) */ @Override - public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { + public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); + int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), + taskGroupQueue.getId(), + TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); this.taskGroupQueueMapper.updateById(taskGroupQueue); @@ -2963,11 +2965,6 @@ public class ProcessServiceImpl implements ProcessService { return false; } - @Override - public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) { - return robTaskGroupResouce(taskGroupQueue); - } - @Override public void releaseAllTaskGroup(int processInstanceId) { List taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); @@ -2984,40 +2981,41 @@ public class ProcessServiceImpl implements ProcessService { @Override public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - if (taskGroup == null) { - return null; - } - TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { - return null; - } + TaskGroup taskGroup; + TaskGroupQueue thisTaskGroupQueue; try { - while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize() - , thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { + do { + taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + return null; + } thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { return null; } - taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - } + } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() + && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), + taskGroup.getUseSize(), + thisTaskGroupQueue.getId(), + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); } catch (Exception e) { logger.error("release the task group error", e); + return null; } logger.info("updateTask:{}", taskInstance.getName()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); - TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); - if (taskGroupQueue == null) { - return null; - } - while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) { + TaskGroupQueue taskGroupQueue; + do { taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); + TaskGroupQueueStatus.WAIT_QUEUE.getCode(), + Flag.NO.getCode(), + Flag.NO.getCode()); if (taskGroupQueue == null) { return null; } - } + } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), + Flag.YES.getCode(), + taskGroupQueue.getId()) != 1); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); } @@ -3044,7 +3042,7 @@ public class ProcessServiceImpl implements ProcessService { * @param groupId group id * @param processId process id * @param priority priority - * @return result and msg code + * @return inserted task group queue */ @Override public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, -- GitLab