diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index cfcd95d88ca6e86cabd4bce146fe2075327d68f2..45edd9730e5ec11a925622c22c4e4a954854bd7c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -163,6 +164,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst * @param taskInstanceId task instance id * @return the result code and msg */ + @Transactional @Override public Map forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { Project project = projectMapper.queryByCode(projectCode); @@ -195,6 +197,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst task.setState(ExecutionStatus.FORCED_SUCCESS); int changedNum = taskInstanceMapper.updateById(task); if (changedNum > 0) { + processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index f61676e693789cb60dcf293c94e770286aea1352..86f6eb0160202a7dfbc9bf6ee9ea0679f90b2ccd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -295,4 +295,6 @@ public interface ProcessService { org.apache.dolphinscheduler.remote.command.CommandType taskType); ProcessInstance loadNextProcess4Serial(long code, int state, int id); + + void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 2f0e1c077ac1238122f4b6dac639d1e83daaa584..14c998517ed31ab22a43352bbdf343f482bd8d31 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1308,10 +1308,10 @@ public class ProcessServiceImpl implements ProcessService { * * @param parentInstance parentInstance * @param parentTask parentTask + * @param processMap processMap * @return process instance map */ - private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { - ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); + private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) { if (processMap != null) { return processMap; } @@ -1375,11 +1375,16 @@ public class ProcessServiceImpl implements ProcessService { // recover failover tolerance would not create a new command when the sub command already have been created return; } - instanceMap = setProcessInstanceMap(parentProcessInstance, task); + instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap); ProcessInstance childInstance = null; if (instanceMap.getProcessInstanceId() != 0) { childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS + && CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) { + logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); + return; + } Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); @@ -3050,4 +3055,31 @@ public class ProcessServiceImpl implements ProcessService { throw new ServiceException("delete command fail, id:" + commandId); } } + + @Override + public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + return; + } + ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); + if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) { + List validTaskList = findValidTaskListByProcessId(processInstance.getId()); + List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); + List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + List taskDefinitionLogs = genTaskDefineList(taskRelations); + List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) + .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); + // only all tasks have instances + if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) { + List failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel()) + .map(TaskInstance::getId).collect(Collectors.toList()); + if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { + processInstance.setState(ExecutionStatus.SUCCESS); + updateProcessInstance(processInstance); + } + } + } + } } \ No newline at end of file