From 890faffd22247dc2577b4127d06f1ec0d8763033 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 29 Dec 2021 00:27:54 +0800 Subject: [PATCH] [Bug][ApiServer] workflow copy (#7694) * fix workflow copy * fix copy * fix copy * code style --- .../dolphinscheduler/api/enums/Status.java | 2 +- .../impl/ProcessDefinitionServiceImpl.java | 46 +++++++++++++++---- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 87f658b8b..413a32808 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -293,7 +293,7 @@ public enum Status { PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"), DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"), - NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"), + NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 9c91043a5..20fa7264c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -76,8 +76,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.mapred.TaskLog.LogName; -import org.apache.yetus.audience.InterfaceAudience.Public; import java.io.BufferedOutputStream; import java.io.IOException; @@ -108,6 +106,7 @@ import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; @@ -1099,13 +1098,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; } - HashMap userProjects = new HashMap(Constants.DEFAULT_HASH_MAP_SIZE); + HashMap userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE); projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId()) - .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); + .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); // check processDefinition exist in project - List processDefinitionListInProject = processDefinitionList.stream(). - filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); + List processDefinitionListInProject = processDefinitionList.stream() + .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); if (CollectionUtils.isEmpty(processDefinitionListInProject)) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; @@ -1309,6 +1308,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } List failedProcessList = new ArrayList<>(); doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true); + if (result.get(Constants.STATUS) == Status.NOT_SUPPORT_COPY_TASK_TYPE) { + return result; + } checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true); return result; } @@ -1386,18 +1388,35 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setProjectCode(targetProjectCode); if (isCopy) { List taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations); + Map taskCodeMap = new HashMap<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType()) || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType()) - || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) { + || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType()) + || TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) { putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType()); - throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE); + return; + } + try { + long taskCode = CodeGenerateUtils.getInstance().genCode(); + taskCodeMap.put(taskDefinitionLog.getCode(), taskCode); + taskDefinitionLog.setCode(taskCode); + } catch (CodeGenerateException e) { + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); + throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); } taskDefinitionLog.setProjectCode(targetProjectCode); - taskDefinitionLog.setCode(0L); taskDefinitionLog.setVersion(0); taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); } + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { + if (processTaskRelationLog.getPreTaskCode() > 0) { + processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode())); + } + if (processTaskRelationLog.getPostTaskCode() > 0) { + processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode())); + } + } try { processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } catch (CodeGenerateException e) { @@ -1407,6 +1426,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setId(0); processDefinition.setUserId(loginUser.getId()); processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); + if (StringUtils.isNotBlank(processDefinition.getLocations())) { + ArrayNode jsonNodes = JSONUtils.parseArray(processDefinition.getLocations()); + for (int i = 0; i < jsonNodes.size(); i++) { + ObjectNode node = (ObjectNode) jsonNodes.path(i); + node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong())); + jsonNodes.set(i, node); + } + processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes)); + } try { result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs)); } catch (Exception e) { -- GitLab