From e5cca0e79bfe16d07e931bcc68c279643ad45fab Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Sat, 30 Jul 2022 23:28:31 +0800 Subject: [PATCH] [Fix-11007] [Master] fix forced_success bug (#11088) * fix forced_success bug * add comments * add transactional * refactor code Co-authored-by: JinyLeeChina --- .../service/impl/TaskInstanceServiceImpl.java | 9 +++-- .../service/process/ProcessService.java | 2 + .../service/process/ProcessServiceImpl.java | 38 +++++++++++++++++-- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 103612f3d..136491520 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -46,13 +49,11 @@ import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; - /** * task instance service impl */ @@ -166,6 +167,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst * @param taskInstanceId task instance id * @return the result code and msg */ + @Transactional @Override public Map forceTaskSuccess(User loginUser, long projectCode, Integer taskInstanceId) { Project project = projectMapper.queryByCode(projectCode); @@ -198,6 +200,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst task.setState(ExecutionStatus.FORCED_SUCCESS); int changedNum = taskInstanceMapper.updateById(task); if (changedNum > 0) { + processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index fd33eb115..976fd9098 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -299,4 +299,6 @@ public interface ProcessService { ProcessInstance loadNextProcess4Serial(long code, int state, int id); public String findConfigYamlByName(String clusterName) ; + + void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index a305ca35a..7dc4ce7be 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1337,10 +1337,10 @@ public class ProcessServiceImpl implements ProcessService { * * @param parentInstance parentInstance * @param parentTask parentTask + * @param processMap processMap * @return process instance map */ - private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { - ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId()); + private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask, ProcessInstanceMap processMap) { if (processMap != null) { return processMap; } @@ -1404,11 +1404,16 @@ public class ProcessServiceImpl implements ProcessService { // recover failover tolerance would not create a new command when the sub command already have been created return; } - instanceMap = setProcessInstanceMap(parentProcessInstance, task); + instanceMap = setProcessInstanceMap(parentProcessInstance, task, instanceMap); ProcessInstance childInstance = null; if (instanceMap.getProcessInstanceId() != 0) { childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId()); } + if (childInstance != null && childInstance.getState() == ExecutionStatus.SUCCESS + && CommandType.START_FAILURE_TASK_PROCESS == parentProcessInstance.getCommandType()) { + logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); + return; + } Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); @@ -3106,4 +3111,31 @@ public class ProcessServiceImpl implements ProcessService { K8s k8s = k8sMapper.selectOne(nodeWrapper); return k8s.getK8sConfig(); } + + @Override + public void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId) { + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + return; + } + ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); + if (processInstance != null && (processInstance.getState().typeIsFailure() || processInstance.getState().typeIsCancel())) { + List validTaskList = findValidTaskListByProcessId(processInstance.getId()); + List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); + List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + List taskDefinitionLogs = genTaskDefineList(taskRelations); + List definiteTaskCodeList = taskDefinitionLogs.stream().filter(definitionLog -> definitionLog.getFlag() == Flag.YES) + .map(TaskDefinitionLog::getCode).collect(Collectors.toList()); + // only all tasks have instances + if (org.apache.dolphinscheduler.common.utils.CollectionUtils.equalLists(instanceTaskCodeList, definiteTaskCodeList)) { + List failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure() || instance.getState().typeIsCancel()) + .map(TaskInstance::getId).collect(Collectors.toList()); + if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) { + processInstance.setState(ExecutionStatus.SUCCESS); + updateProcessInstance(processInstance); + } + } + } + } } -- GitLab