未验证 提交 acd3d3fa 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix TaskGroup cannot work and will cause master dead loop (#11254)

* Fix TaskGroup cannot work and will cause master dead loop

* Remove acquireTaskGroupAgain in ProcessServiceImpl
上级 aef2fbf3
...@@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable { ...@@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable {
@TableId(value = "id", type = IdType.AUTO) @TableId(value = "id", type = IdType.AUTO)
private int id; private int id;
/** /**
* taskIntanceid * taskInstanceId
*/ */
private int taskId; private int taskId;
/** /**
...@@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable { ...@@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable {
*/ */
private int groupId; private int groupId;
/** /**
* processInstace id * processInstance id
*/ */
private int processId; private int processId;
/** /**
......
...@@ -74,6 +74,9 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> { ...@@ -74,6 +74,9 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
*/ */
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
/**
* Select the groupSize > useSize Count
*/
int selectAvailableCountById(@Param("groupId") int groupId); int selectAvailableCountById(@Param("groupId") int groupId);
int selectCountByIdStatus(@Param("id") int id,@Param("status") int status); int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
......
...@@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> { ...@@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
*/ */
int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status); int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status);
/**
* Query the {@link TaskGroupQueue}, who's priority > the given <code>priority</code>
*/
List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status); List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status);
TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status, TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status,
......
...@@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler { ...@@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler {
} }
workflowExecuteRunnable.taskFinished(task); workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) { if (task.getTaskGroupId() > 0) {
logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
workflowExecuteRunnable.releaseTaskGroup(task); workflowExecuteRunnable.releaseTaskGroup(task);
} }
return true; return true;
......
...@@ -338,7 +338,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ...@@ -338,7 +338,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true; return true;
} }
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue); boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) { if (acquireTaskGroup) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
...@@ -415,6 +415,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ...@@ -415,6 +415,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @param taskInstance * @param taskInstance
*/ */
public void releaseTaskGroup(TaskInstance taskInstance) { public void releaseTaskGroup(TaskInstance taskInstance) {
logger.info("Release task group");
if (taskInstance.getTaskGroupId() > 0) { if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) { if (nextTaskInstance != null) {
...@@ -929,6 +930,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { ...@@ -929,6 +930,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor); activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
// if we use task group, then need to acquire the task group resource
// if there is no resource the current task instance will not be dispatched
// it will be weakup when other tasks release the resource.
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return Optional.of(taskInstance);
}
}
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) { if (!dispatchSuccess) {
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!", logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
......
...@@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { ...@@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
protected boolean submitTask() { protected boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) { return this.taskInstance != null;
return false;
}
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return true;
}
}
return true;
} }
@Override @Override
......
...@@ -273,9 +273,7 @@ public interface ProcessService { ...@@ -273,9 +273,7 @@ public interface ProcessService {
String taskName, int groupId, String taskName, int groupId,
int processId, int priority); int processId, int priority);
boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue); boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue);
boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue);
void releaseAllTaskGroup(int processInstanceId); void releaseAllTaskGroup(int processInstanceId);
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
...@@ -32,6 +31,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR ...@@ -32,6 +31,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
...@@ -370,7 +371,7 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -370,7 +371,7 @@ public class ProcessServiceImpl implements ProcessService {
/** /**
* Save error command, and delete original command. If the given command has already been moved into error command, * Save error command, and delete original command. If the given command has already been moved into error command,
* will throw {@link SQLIntegrityConstraintViolationException ). * will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
* *
* @param command command * @param command command
* @param message message * @param message message
...@@ -2909,21 +2910,22 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -2909,21 +2910,22 @@ public class ProcessServiceImpl implements ProcessService {
* @param taskId task id * @param taskId task id
*/ */
@Override @Override
public boolean acquireTaskGroup(int taskId, public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) {
String taskName, int groupId,
int processId, int priority) {
TaskGroup taskGroup = taskGroupMapper.selectById(groupId); TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
if (taskGroup == null) { if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been deleted during workflow running
return true; return true;
} }
// if task group is not applicable // if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) { if (taskGroup.getStatus() == Flag.NO.getCode()) {
return true; return true;
} }
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId); TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) { if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE); taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
} else { } else {
logger.info("The task queue is already exist, taskId: {}", taskId);
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true; return true;
} }
...@@ -2931,15 +2933,14 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -2931,15 +2933,14 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE); taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
this.taskGroupQueueMapper.updateById(taskGroupQueue); this.taskGroupQueueMapper.updateById(taskGroupQueue);
} }
//check priority //check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode()); List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) { if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false; return false;
} }
//try to get taskGroup //try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId); int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResouce(taskGroupQueue)) { if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
return true; return true;
} }
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
...@@ -2950,10 +2951,11 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -2950,10 +2951,11 @@ public class ProcessServiceImpl implements ProcessService {
* try to get the task group resource(when other task release the resource) * try to get the task group resource(when other task release the resource)
*/ */
@Override @Override
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) { public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(), int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode()); taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) { if (affectedCount > 0) {
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue); this.taskGroupQueueMapper.updateById(taskGroupQueue);
...@@ -2963,11 +2965,6 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -2963,11 +2965,6 @@ public class ProcessServiceImpl implements ProcessService {
return false; return false;
} }
@Override
public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
return robTaskGroupResouce(taskGroupQueue);
}
@Override @Override
public void releaseAllTaskGroup(int processInstanceId) { public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
...@@ -2984,40 +2981,41 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -2984,40 +2981,41 @@ public class ProcessServiceImpl implements ProcessService {
@Override @Override
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); TaskGroup taskGroup;
if (taskGroup == null) { TaskGroupQueue thisTaskGroupQueue;
return null;
}
TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
try { try {
while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize() do {
, thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) { taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
return null;
}
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null; return null;
} }
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
} && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) { } catch (Exception e) {
logger.error("release the task group error", e); logger.error("release the task group error", e);
return null;
} }
logger.info("updateTask:{}", taskInstance.getName()); logger.info("updateTask:{}", taskInstance.getName());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), TaskGroupQueue taskGroupQueue;
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); do {
if (taskGroupQueue == null) {
return null;
}
while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(), taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode()); TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) { if (taskGroupQueue == null) {
return null; return null;
} }
} } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
} }
...@@ -3044,7 +3042,7 @@ public class ProcessServiceImpl implements ProcessService { ...@@ -3044,7 +3042,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param groupId group id * @param groupId group id
* @param processId process id * @param processId process id
* @param priority priority * @param priority priority
* @return result and msg code * @return inserted task group queue
*/ */
@Override @Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册