diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 103612f3d7dd365fde112d2e7eba3a8c69c9e7b2..1364915203e957918414f7f60c1b4759fc04d489 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -46,13 +49,11 @@ import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; - /** * task instance service impl */ @@ -166,6 +167,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst * @param taskInstanceId task instance id * @return the result code and msg */ + @Transactional @Override public Map forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { Project project = projectMapper.queryByCode(projectCode); @@ -198,6 +200,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst task.setState(ExecutionStatus.FORCED_SUCCESS); int changedNum = taskInstanceMapper.updateById(task); if (changedNum > 0) { + processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index fd33eb115aa98a4125330f9867f7ed9388e8e39d..976fd909862c7dbf1f7970347624a2939f5521d3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -299,4 +299,6 @@ public interface ProcessService { ProcessInstance loadNextProcess4Serial(long code, int state, int id); public String findConfigYamlByName(String clusterName) ; + + void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index a305ca35a0e5fdff62f0ebbc971bd94d06762f0f..7dc4ce7bed39aacc249c08e94fa0749233d58a6b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1337,10 +1337,10 @@ public class ProcessServiceImpl implements ProcessService { * * @param parentInstance parentInstance * @param parentTask parentTask + * @param processMap processMap * @return process instance map */ - private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { - ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); + private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) { if (processMap != null) { return processMap; } @@ -1404,11 +1404,16 @@ public class ProcessServiceImpl implements ProcessService { // recover failover tolerance would not create a new command when the sub command already have been created return; } - instanceMap = setProcessInstanceMap(parentProcessInstance, task); + instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap); ProcessInstance childInstance = null; if (instanceMap.getProcessInstanceId() != 0) { childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS + && CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) { + logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); + return; + } Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); @@ -3106,4 +3111,31 @@ public class ProcessServiceImpl implements ProcessService { K8s k8s = k8sMapper.selectOne(nodeWrapper); return k8s.getK8sConfig(); } + + @Override + public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + return; + } + ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); + if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) { + List validTaskList = findValidTaskListByProcessId(processInstance.getId()); + List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); + List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + List taskDefinitionLogs = genTaskDefineList(taskRelations); + List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) + .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); + // only all tasks have instances + if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) { + List failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel()) + .map(TaskInstance::getId).collect(Collectors.toList()); + if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { + processInstance.setState(ExecutionStatus.SUCCESS); + updateProcessInstance(processInstance); + } + } + } + } }