未验证 提交 230adbf6 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix TaskGroup cannot work and will cause master dead loop (#11254) (#11318)

* Fix TaskGroup cannot work and will cause master dead loop

* Remove acquireTaskGroupAgain in ProcessServiceImpl

(cherry picked from commit acd3d3fa)
上级 a6db4a59
......@@ -38,7 +38,7 @@ public class TaskGroupQueue implements Serializable {
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* taskIntanceid
* taskInstanceId
*/
private int taskId;
/**
......@@ -65,7 +65,7 @@ public class TaskGroupQueue implements Serializable {
*/
private int groupId;
/**
* processInstace id
* processInstance id
*/
private int processId;
/**
......
......@@ -72,6 +72,9 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
*/
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);
/**
* Select the groupSize > useSize Count
*/
int selectAvailableCountById(@Param("groupId") int groupId);
int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
......
......@@ -74,6 +74,9 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
*/
int updateStatusByTaskId(@Param("taskId") int taskId, @Param("status") int status);
/**
* Query the {@link TaskGroupQueue}, who's priority > the given <code>priority</code>
*/
List<TaskGroupQueue> queryHighPriorityTasks(@Param("groupId") int groupId, @Param("priority") int priority, @Param("status") int status);
TaskGroupQueue queryTheHighestPriorityTasks(@Param("groupId") int groupId, @Param("status") int status,
......
......@@ -63,6 +63,7 @@ public class TaskStateEventHandler implements StateEventHandler {
}
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
workflowExecuteRunnable.releaseTaskGroup(task);
}
return true;
......
......@@ -332,7 +332,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true;
}
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
boolean acquireTaskGroup = processService.acquireTaskGroupAgain(taskGroupQueue);
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
if (acquireTaskGroup) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
......@@ -409,6 +409,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @param taskInstance
*/
public void releaseTaskGroup(TaskInstance taskInstance) {
logger.info("Release task group");
if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) {
......@@ -922,6 +923,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getTaskCode(), taskProcessor);
// if we use task group, then need to acquire the task group resource
// if there is no resource the current task instance will not be dispatched
// it will be weakup when other tasks release the resource.
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return Optional.of(taskInstance);
}
}
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("process id:{} name:{} dispatch standby task id:{} name:{} failed!",
......
......@@ -51,23 +51,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
protected boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
int taskGroupId = taskInstance.getTaskGroupId();
if (taskGroupId > 0) {
boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(),
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try to acquire task group failed", taskInstance.getName());
return true;
}
}
return true;
return this.taskInstance != null;
}
@Override
......
......@@ -271,9 +271,7 @@ public interface ProcessService {
String taskName, int groupId,
int processId, int priority);
boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue);
boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue);
boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue);
void releaseAllTaskGroup(int processInstanceId);
......
......@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
......@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -350,7 +351,7 @@ public class ProcessServiceImpl implements ProcessService {
/**
* Save error command, and delete original command. If the given command has already been moved into error command,
* will throw {@link SQLIntegrityConstraintViolationException ).
* will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
*
* @param command command
* @param message message
......@@ -2871,21 +2872,22 @@ public class ProcessServiceImpl implements ProcessService {
* @param taskId task id
*/
@Override
public boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
int processId, int priority) {
public boolean acquireTaskGroup(int taskId, String taskName, int groupId, int processId, int priority) {
TaskGroup taskGroup = taskGroupMapper.selectById(groupId);
if (taskGroup == null) {
// we don't throw exception here, to avoid the task group has been deleted during workflow running
return true;
}
// if task group is not applicable
if (taskGroup.getStatus() == Flag.NO.getCode()) {
return true;
}
// Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskId);
if (taskGroupQueue == null) {
taskGroupQueue = insertIntoTaskGroupQueue(taskId, taskName, groupId, processId, priority, TaskGroupQueueStatus.WAIT_QUEUE);
} else {
logger.info("The task queue is already exist, taskId: {}", taskId);
if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
return true;
}
......@@ -2893,15 +2895,14 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
}
//check priority
//check if there already exist higher priority tasks
List<TaskGroupQueue> highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(groupId, priority, TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (CollectionUtils.isNotEmpty(highPriorityTasks)) {
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
//try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResouce(taskGroupQueue)) {
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
return true;
}
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
......@@ -2912,10 +2913,11 @@ public class ProcessServiceImpl implements ProcessService {
* try to get the task group resource(when other task release the resource)
*/
@Override
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(), taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
int affectedCount = taskGroupMapper.updateTaskGroupResource(taskGroup.getId(),
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
......@@ -2925,11 +2927,6 @@ public class ProcessServiceImpl implements ProcessService {
return false;
}
@Override
public boolean acquireTaskGroupAgain(TaskGroupQueue taskGroupQueue) {
return robTaskGroupResouce(taskGroupQueue);
}
@Override
public void releaseAllTaskGroup(int processInstanceId) {
List<TaskInstance> taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
......@@ -2946,40 +2943,41 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public TaskInstance releaseTaskGroup(TaskInstance taskInstance) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
return null;
}
TaskGroupQueue thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
TaskGroup taskGroup;
TaskGroupQueue thisTaskGroupQueue;
try {
while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
, thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
do {
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
return null;
}
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
return null;
}
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
}
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
logger.error("release the task group error", e);
return null;
}
logger.info("updateTask:{}", taskInstance.getName());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1) {
TaskGroupQueue taskGroupQueue;
do {
taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode(), Flag.NO.getCode(), Flag.NO.getCode());
TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
return null;
}
}
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}
......@@ -3006,7 +3004,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param groupId group id
* @param processId process id
* @param priority priority
* @return result and msg code
* @return inserted task group queue
*/
@Override
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册