diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index 67b4168edf2e1afcc296dee17e7dd4471778c21e..72aa12eb1bae3faa7f46bd83ef822042ed45e644 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessLineage; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; @@ -46,9 +46,6 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF @Autowired private WorkFlowLineageMapper workFlowLineageMapper; - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - @Autowired private ProjectMapper projectMapper; @@ -66,48 +63,41 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF Set workFlowRelations, ProcessLineage processLineage) { List relations = workFlowLineageMapper.queryCodeRelation( - processLineage.getPostTaskCode(), processLineage.getPostTaskVersion() - , processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); - - for (ProcessLineage relation : relations) { - if (relation.getProcessDefinitionCode() != null) { - - relation.setPreTaskCode(processLineage.getPostTaskCode()); - relation.setPreTaskVersion(processLineage.getPostTaskVersion()); - - WorkFlowLineage pre = workFlowLineageMapper - .queryWorkFlowLineageByCode(processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); - // sourceWorkFlowId = "" - if (!workFlowLineageMap.containsKey(pre.getWorkFlowId())) { - workFlowLineageMap.put(pre.getWorkFlowId(), pre); - } - - WorkFlowLineage post = workFlowLineageMapper - .queryWorkFlowLineageByCode(relation.getProcessDefinitionCode(), relation.getProjectCode()); - - if (workFlowLineageMap.containsKey(post.getWorkFlowId())) { - WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); - String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); - if (sourceWorkFlowId.equals("")) { - workFlowLineage.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); - } else { - workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + pre.getWorkFlowId()); - - } - + processLineage.getPostTaskCode(), processLineage.getPostTaskVersion(), + processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + if (!relations.isEmpty()) { + Set preWorkFlowIds = new HashSet<>(); + List preRelations = workFlowLineageMapper.queryCodeRelation( + processLineage.getPreTaskCode(), processLineage.getPreTaskVersion(), + processLineage.getProcessDefinitionCode(), processLineage.getProjectCode()); + for (ProcessLineage preRelation : preRelations) { + WorkFlowLineage pre = workFlowLineageMapper.queryWorkFlowLineageByCode( + preRelation.getProcessDefinitionCode(), preRelation.getProjectCode()); + preWorkFlowIds.add(pre.getWorkFlowId()); + } + ProcessLineage postRelation = relations.get(0); + WorkFlowLineage post = workFlowLineageMapper.queryWorkFlowLineageByCode( + postRelation.getProcessDefinitionCode(), postRelation.getProjectCode()); + if (!workFlowLineageMap.containsKey(post.getWorkFlowId())) { + post.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ",")); + workFlowLineageMap.put(post.getWorkFlowId(), post); + } else { + WorkFlowLineage workFlowLineage = workFlowLineageMap.get(post.getWorkFlowId()); + String sourceWorkFlowId = workFlowLineage.getSourceWorkFlowId(); + if (sourceWorkFlowId.equals("")) { + workFlowLineage.setSourceWorkFlowId(StringUtils.join(preWorkFlowIds, ",")); } else { - post.setSourceWorkFlowId(String.valueOf(pre.getWorkFlowId())); - workFlowLineageMap.put(post.getWorkFlowId(), post); + if (!preWorkFlowIds.isEmpty()) { + workFlowLineage.setSourceWorkFlowId(sourceWorkFlowId + "," + StringUtils.join(preWorkFlowIds, ",")); + } } - - WorkFlowRelation workFlowRelation = new WorkFlowRelation(); - workFlowRelation.setSourceWorkFlowId(pre.getWorkFlowId()); - workFlowRelation.setTargetWorkFlowId(post.getWorkFlowId()); - if (workFlowRelations.contains(workFlowRelation)) { - continue; + } + if (preWorkFlowIds.isEmpty()) { + workFlowRelations.add(new WorkFlowRelation(0, post.getWorkFlowId())); + } else { + for (Integer workFlowId : preWorkFlowIds) { + workFlowRelations.add(new WorkFlowRelation(workFlowId, post.getWorkFlowId())); } - workFlowRelations.add(workFlowRelation); - getRelation(workFlowLineageMap, workFlowRelations, relation); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java index 3a74d800d4c27c7744d6bc3bde3f1c4c3f1bd24c..d41bba565911bb978ebf6c22bf35578baf989b80 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkFlowRelation.java @@ -38,6 +38,14 @@ public class WorkFlowRelation { this.targetWorkFlowId = targetWorkFlowId; } + public WorkFlowRelation() { + } + + public WorkFlowRelation(int sourceWorkFlowId, int targetWorkFlowId) { + this.sourceWorkFlowId = sourceWorkFlowId; + this.targetWorkFlowId = targetWorkFlowId; + } + @Override public boolean equals(Object obj) { return obj instanceof WorkFlowRelation diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index 14b0ee148150105de6e247aab0afefac3a266148..026e8bffff8e728edde7ccc0badc2487d6181697 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -59,7 +59,7 @@ public interface WorkFlowLineageMapper { /** * queryWorkFlowLineageByCode * - * @param processDefinitionCode processDefinitioncode + * @param processDefinitionCode processDefinitionCode * @param projectCode projectCode * @return WorkFlowLineage */ diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index a1dbeef6a9365f54d7e6fe23335ec2298178b7f3..b78113b66fe51290741f0c5da5d4cfd874343ee9 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -50,19 +50,18 @@