diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index cdd9ff221972b1728876c6bab0165d0db8f7824f..480d6657c2d80fdada39e16e609002a3ea09edde 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -48,13 +48,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; /** * TaskUpdateQueue consumer @@ -328,36 +328,38 @@ public class TaskPriorityQueueConsumer extends Thread{ return false; } - /** - * create project resource files + * get resource full name list */ - private List getResourceFullNames(TaskNode taskNode){ - - Set resourceIdsSet = new HashSet<>(); + private List getResourceFullNames(TaskNode taskNode) { + List resourceFullNameList = new ArrayList<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { List projectResourceFiles = baseParam.getResourceFilesList(); if (projectResourceFiles != null) { - Stream resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); - resourceIdsSet.addAll(resourceInfotream.collect(Collectors.toSet())); - } - } + // filter the resources that the resource id equals 0 + Set oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); + if (CollectionUtils.isNotEmpty(oldVersionResources)) { + resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet())); + } - if (CollectionUtils.isEmpty(resourceIdsSet)){ - return null; - } - - Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); + // get the resource id in order to get the resource names in batch + Stream resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId()); + Set resourceIdsSet = resourceIdStream.collect(Collectors.toSet()); - List resources = processService.listResourceByIds(resourceIds); + if (CollectionUtils.isNotEmpty(resourceIdsSet)) { + Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); - List resourceFullNames = resources.stream() - .map(resourceInfo -> resourceInfo.getFullName()) - .collect(Collectors.toList()); + List resources = processService.listResourceByIds(resourceIds); + resourceFullNameList.addAll(resources.stream() + .map(resourceInfo -> resourceInfo.getFullName()) + .collect(Collectors.toList())); + } + } + } - return resourceFullNames; + return resourceFullNameList; } }