From 68a73b04a26bf68419307a826a5043ba1524d534 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+jinyleechina@users.noreply.github.com> Date: Sat, 30 Jul 2022 23:28:31 +0800 Subject: [PATCH] [Fix-11007] [Master] fix forced_success bug (#11088) * fix forced_success bug * add comments * add transactional * refactor code Co-authored-by: JinyLeeChina (cherry picked from commit e5cca0e79bfe16d07e931bcc68c279643ad45fab) --- .../service/impl/TaskInstanceServiceImpl.java | 3 ++ .../service/process/ProcessService.java | 2 + .../service/process/ProcessServiceImpl.java | 38 +++++++++++++++++-- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index cfcd95d88..45edd9730 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -163,6 +164,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst * @param taskInstanceId task instance id * @return the result code and msg */ + @Transactional @Override public Map forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { Project project = projectMapper.queryByCode(projectCode); @@ -195,6 +197,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst task.setState(ExecutionStatus.FORCED_SUCCESS); int changedNum = taskInstanceMapper.updateById(task); if (changedNum > 0) { + processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index f61676e69..86f6eb016 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -295,4 +295,6 @@ public interface ProcessService { org.apache.dolphinscheduler.remote.command.CommandType taskType); ProcessInstance loadNextProcess4Serial(long code, int state, int id); + + void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 2f0e1c077..14c998517 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1308,10 +1308,10 @@ public class ProcessServiceImpl implements ProcessService { * * @param parentInstance parentInstance * @param parentTask parentTask + * @param processMap processMap * @return process instance map */ - private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { - ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); + private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) { if (processMap != null) { return processMap; } @@ -1375,11 +1375,16 @@ public class ProcessServiceImpl implements ProcessService { // recover failover tolerance would not create a new command when the sub command already have been created return; } - instanceMap = setProcessInstanceMap(parentProcessInstance, task); + instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap); ProcessInstance childInstance = null; if (instanceMap.getProcessInstanceId() != 0) { childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS + && CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) { + logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); + return; + } Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); @@ -3050,4 +3055,31 @@ public class ProcessServiceImpl implements ProcessService { throw new ServiceException("delete command fail, id:" + commandId); } } + + @Override + public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + return; + } + ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); + if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) { + List validTaskList = findValidTaskListByProcessId(processInstance.getId()); + List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); + List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + List taskDefinitionLogs = genTaskDefineList(taskRelations); + List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) + .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); + // only all tasks have instances + if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) { + List failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel()) + .map(TaskInstance::getId).collect(Collectors.toList()); + if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { + processInstance.setState(ExecutionStatus.SUCCESS); + updateProcessInstance(processInstance); + } + } + } + } } \ No newline at end of file -- GitLab