From 363426de710263d0d2ee94033228742fb8e592d8 Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Fri, 7 May 2021 16:37:49 +0800 Subject: [PATCH] [Feature][JsonSplit] fix process lineage bug (#5418) * update taskParams/add task delayTime/fix conditionType bug * update codeStyle for merge to dev * fix process lineage bug Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../impl/WorkFlowLineageServiceImpl.java | 76 ++++++++----------- .../dao/entity/WorkFlowRelation.java | 8 ++ .../dao/mapper/WorkFlowLineageMapper.java | 2 +- .../dao/mapper/WorkFlowLineageMapper.xml | 13 ++-- 4 files changed, 48 insertions(+), 51 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index 67b4168ed..72aa12eb1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; @@ -46,9 +46,6 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF @Autowired private WorkFlowLineageMapper workFlowLineageMapper; - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - @Autowired private ProjectMapper projectMapper; @@ -66,48 +63,41 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF Set workFlowRelations, ProcessLineage processLineage) { List relations = workFlowLineageMapper.queryCodeRelation( - processLineage.getPostTaskCode(), processLineage.getPostTaskVersion() - , processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); - - for (ProcessLineage relation : relations) { - if (relation.getProcessDefinitionCode() != null) { - - relation.setPreTaskCode(processLineage.getPostTaskCode()); - relation.setPreTaskVersion(processLineage.getPostTaskVersion()); - - WorkFlowLineage pre = workFlowLineageMapper - .queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); - // sourceWorkFlowId = "" - if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) { - workFlowLineageMap.put(pre.getWorkFlowId(), pre); - } - - WorkFlowLineage post = workFlowLineageMapper - .queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), relation.getProjectCode()); - - if (workFlowLineageMap.containsKey(post.getWorkFlowId())) { - WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); - String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); - if (sourceWorkFlowId.equals("")) { - workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); - } else { - workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + pre.getWorkFlowId()); - - } - + processLineage.getPostTaskCode(), processLineage.getPostTaskVersion(), + processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + if (!relations.isEmpty()) { + Set preWorkFlowIds = new HashSet<>(); + List preRelations = workFlowLineageMapper.queryCodeRelation( + processLineage.getPreTaskCode(), processLineage.getPreTaskVersion(), + processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + for (ProcessLineage preRelation : preRelations) { + WorkFlowLineage pre = workFlowLineageMapper.queryWorkFlowLineageByCode( + preRelation.getProcessDefinitionCode(), preRelation.getProjectCode()); + preWorkFlowIds.add(pre.getWorkFlowId()); + } + ProcessLineage postRelation = relations.get(0); + WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode( + postRelation.getProcessDefinitionCode(), postRelation.getProjectCode()); + if (!workFlowLineageMap.containsKey(post.getWorkFlowId())) { + post.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ",")); + workFlowLineageMap.put(post.getWorkFlowId(), post); + } else { + WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); + String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); + if (sourceWorkFlowId.equals("")) { + workFlowLineage.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ",")); } else { - post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); - workFlowLineageMap.put(post.getWorkFlowId(), post); + if (!preWorkFlowIds.isEmpty()) { + workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + StringUtils.join(preWorkFlowIds, ",")); + } } - - WorkFlowRelation workFlowRelation = new WorkFlowRelation(); - workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId()); - workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId()); - if (workFlowRelations.contains(workFlowRelation)) { - continue; + } + if (preWorkFlowIds.isEmpty()) { + workFlowRelations.add(new WorkFlowRelation(0, post.getWorkFlowId())); + } else { + for (Integer workFlowId : preWorkFlowIds) { + workFlowRelations.add(new WorkFlowRelation(workFlowId, post.getWorkFlowId())); } - workFlowRelations.add(workFlowRelation); - getRelation(workFlowLineageMap, workFlowRelations, relation); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java index 3a74d800d..d41bba565 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java @@ -38,6 +38,14 @@ public class WorkFlowRelation { this.targetWorkFlowId = targetWorkFlowId; } + public WorkFlowRelation() { + } + + public WorkFlowRelation(int sourceWorkFlowId, int targetWorkFlowId) { + this.sourceWorkFlowId = sourceWorkFlowId; + this.targetWorkFlowId = targetWorkFlowId; + } + @Override public boolean equals(Object obj) { return obj instanceof WorkFlowRelation diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index 14b0ee148..026e8bfff 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -59,7 +59,7 @@ public interface WorkFlowLineageMapper { /** * queryWorkFlowLineageByCode * - * @param processDefinitionCode processDefinitioncode + * @param processDefinitionCode processDefinitionCode * @param projectCode projectCode * @return WorkFlowLineage */ diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index a1dbeef6a..b78113b66 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -50,19 +50,18 @@