/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.api.service;

import cn.escheduler.api.dto.treeview.Instance;
import cn.escheduler.api.dto.treeview.TreeViewDto;
import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.PageInfo;
import cn.escheduler.common.enums.Flag;
import cn.escheduler.common.enums.ReleaseState;
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.ProcessDag;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.mapper.*;
import cn.escheduler.dao.model.*;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static cn.escheduler.api.enums.Status.UPDATE_PROCESS_DEFINITION_ERROR;
import static cn.escheduler.api.service.SchedulerService.deleteSchedule;
import static cn.escheduler.api.utils.CheckUtils.checkOtherParams;
import static cn.escheduler.api.utils.CheckUtils.checkTaskNodeParameters;
import static cn.escheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;

/**
 * process definition service
 */
@Service
public class ProcessDefinitionService extends BaseDAGService {

    private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class);

    @Autowired
    private ProjectMapper projectMapper;

    @Autowired
    private ProjectService projectService;

    @Autowired
    private ProcessDefinitionMapper processDefineMapper;

    @Autowired
    private ProcessInstanceMapper processInstanceMapper;

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private ScheduleMapper scheduleMapper;

    @Autowired
    private ProcessDao processDao;

    /**
     * create process definition
     *
     * @param loginUser
     * @param projectName
     * @param name
     * @param processDefinitionJson
     * @param desc
     * @param locations
     * @param connects
     * @return
     */
    public Map<String, Object> createProcessDefinition(User loginUser, String projectName, String name,
                                                       String processDefinitionJson, String desc,
                                                       String locations, String connects) throws JsonProcessingException {

        Map<String, Object> result = new HashMap<>(5);
        Project project = projectMapper.queryByName(projectName);

        // check project auth
        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
        if (resultStatus != Status.SUCCESS) {
            return checkResult;
        }

        ProcessDefinition processDefine = new ProcessDefinition();
        Date now = new Date();

        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
        Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
        if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) {
            return checkProcessJson;
        }

        processDefine.setName(name);
        processDefine.setReleaseState(ReleaseState.OFFLINE);
        processDefine.setProjectId(project.getId());
        processDefine.setUserId(loginUser.getId());
        processDefine.setProcessDefinitionJson(processDefinitionJson);
        processDefine.setDesc(desc);
        processDefine.setLocations(locations);
        processDefine.setConnects(connects);
        processDefine.setTimeout(processData.getTimeout());
        processDefine.setTenantId(processData.getTenantId());

        // custom global params, deduplicated
        List<Property> globalParamsList = processData.getGlobalParams();
        if (globalParamsList != null && globalParamsList.size() > 0) {
            Set<Property> globalParamsSet = new HashSet<>(globalParamsList);
            globalParamsList = new ArrayList<>(globalParamsSet);
            processDefine.setGlobalParamList(globalParamsList);
        }
        processDefine.setCreateTime(now);
        processDefine.setUpdateTime(now);
        processDefine.setFlag(Flag.YES);
        processDefineMapper.insert(processDefine);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * query process definition list
     *
     * @param loginUser
     * @param projectName
     * @return
     */
    public Map<String, Object> queryProccessDefinitionList(User loginUser, String projectName) {

        HashMap<String, Object> result = new HashMap<>(5);
        Project project = projectMapper.queryByName(projectName);

        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
        if (resultStatus != Status.SUCCESS) {
            return checkResult;
        }

        List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(project.getId());
        result.put(Constants.DATA_LIST, resourceList);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * query process definition list paging
     *
     * @param loginUser
     * @param projectName
     * @param searchVal
     * @param pageNo
     * @param pageSize
     * @param userId
     * @return
     */
    public Map<String, Object> queryProcessDefinitionListPaging(User loginUser, String projectName, String searchVal,
                                                                Integer pageNo, Integer pageSize, Integer userId) {

        Map<String, Object> result = new HashMap<>(5);
        Project project = projectMapper.queryByName(projectName);

        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
        if (resultStatus != Status.SUCCESS) {
            return checkResult;
        }

        Integer count = processDefineMapper.countDefineNumber(project.getId(), userId, searchVal);

        PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
        List<ProcessDefinition> resourceList = processDefineMapper.queryDefineListPaging(project.getId(), searchVal,
                userId, pageInfo.getStart(), pageSize);
        pageInfo.setTotalCount(count);
        pageInfo.setLists(resourceList);
        result.put(Constants.DATA_LIST, pageInfo);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * query detail of process definition
     *
     * @param loginUser
     * @param projectName
     * @param processId
     * @return
     */
    public Map<String, Object> queryProccessDefinitionById(User loginUser, String projectName, Integer processId) {

        Map<String, Object> result = new HashMap<>(5);
        Project project = projectMapper.queryByName(projectName);
        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
        if (resultStatus != Status.SUCCESS) {
            return checkResult;
        }

        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processId);
        if (processDefinition == null) {
            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
        } else {
            result.put(Constants.DATA_LIST, processDefinition);
            putMsg(result, Status.SUCCESS);
        }
        return result;
    }

    /**
     * update process definition
     *
     * @param loginUser
     * @param projectName
     * @param id
     * @param name
     * @param processDefinitionJson
     * @param desc
     * @param locations
     * @param connects
     * @return
     */
    public Map<String, Object> updateProcessDefinition(User loginUser, String projectName, int id, String name,
                                                       String processDefinitionJson, String desc,
                                                       String locations, String connects) {

        Map<String, Object> result = new HashMap<>(5);

        Project project = projectMapper.queryByName(projectName);
        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultStatus = (Status) checkResult.get(Constants.STATUS);
        if (resultStatus != Status.SUCCESS) {
            return checkResult;
        }

        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
        Map<String, Object> checkProcessJson = checkProcessNodeList(processData, processDefinitionJson);
        if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) {
            return checkProcessJson;
        }
        ProcessDefinition processDefinition = processDao.findProcessDefineById(id);
        if (processDefinition == null) {
            // check whether the process definition exists
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id);
            return result;
        } else if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
            // an online process definition is not allowed to be edited
            putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
            return result;
        } else {
            putMsg(result, Status.SUCCESS);
        }

        ProcessDefinition processDefine = processDao.findProcessDefineById(id);
        Date now = new Date();

        processDefine.setId(id);
        processDefine.setName(name);
        processDefine.setReleaseState(ReleaseState.OFFLINE);
        processDefine.setProjectId(project.getId());
        processDefine.setProcessDefinitionJson(processDefinitionJson);
        processDefine.setDesc(desc);
        processDefine.setLocations(locations);
        processDefine.setConnects(connects);
        processDefine.setTimeout(processData.getTimeout());
        processDefine.setTenantId(processData.getTenantId());

        // custom global params, deduplicated
        List<Property> globalParamsList = new ArrayList<>();
        if (processData.getGlobalParams() != null && processData.getGlobalParams().size() > 0) {
            Set<Property> userDefParamsSet = new HashSet<>(processData.getGlobalParams());
            globalParamsList = new ArrayList<>(userDefParamsSet);
        }
        processDefine.setGlobalParamList(globalParamsList);
        processDefine.setUpdateTime(now);
        processDefine.setFlag(Flag.YES);
        if (processDefineMapper.update(processDefine) > 0) {
            putMsg(result, Status.SUCCESS);
        } else {
            putMsg(result, UPDATE_PROCESS_DEFINITION_ERROR);
        }
        return result;
    }

    /**
     * verify process definition name unique
     *
     * @param loginUser
     * @param projectName
     * @param name
     * @return
     */
    public Map<String, Object> verifyProccessDefinitionName(User loginUser, String projectName, String name) {

        Map<String, Object> result = new HashMap<>();
        Project project = projectMapper.queryByName(projectName);

        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
        if (resultEnum != Status.SUCCESS) {
            return checkResult;
        }
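        // the name only has to be unique within this project: look it up under the project id
        // and report a conflict if a definition with the same name already exists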
        ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name);
        if (processDefinition == null) {
            putMsg(result, Status.SUCCESS);
        } else {
            putMsg(result, Status.PROCESS_INSTANCE_EXIST, name);
        }
        return result;
    }

    /**
     * delete process definition by id
     *
     * @param loginUser
     * @param projectName
     * @param processDefinitionId
     * @return
     */
    @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
    public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) {

        Map<String, Object> result = new HashMap<>(5);
        Project project = projectMapper.queryByName(projectName);

        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
        if (resultEnum != Status.SUCCESS) {
            return checkResult;
        }

        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);

        if (processDefinition == null) {
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
            return result;
        }

        // determine whether the login user is the owner of the process definition
        if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
            putMsg(result, Status.USER_NO_OPERATION_PERM);
            return result;
        }

        // check whether the process definition is already online
        if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
            putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId);
            return result;
        }

        // get the schedules defined for the process definition
        List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
        if (schedules.size() > 1) {
            logger.warn("schedule number is {}, greater than 1", schedules.size());
            putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
            return result;
        } else if (schedules.size() == 1) {
            Schedule schedule = schedules.get(0);
            if (schedule.getReleaseState() == ReleaseState.OFFLINE) {
                scheduleMapper.delete(schedule.getId());
            } else if (schedule.getReleaseState() == ReleaseState.ONLINE) {
                putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
                return result;
            }
        }

        int delete = processDefineMapper.delete(processDefinitionId);

        if (delete > 0) {
            putMsg(result, Status.SUCCESS);
        } else {
            putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
        }
        return result;
    }

    /**
     * batch delete process definitions by ids
     *
     * @param loginUser
     * @param projectName
     * @param processDefinitionIds
     * @return
     */
    public Map<String, Object> batchDeleteProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds) {

        Map<String, Object> result = new HashMap<>(5);
        Map<String, Object> deleteResult = new HashMap<>(5);
        List<Integer> deleteFailedIdList = new ArrayList<>();

        Project project = projectMapper.queryByName(projectName);
        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
        if (resultEnum != Status.SUCCESS) {
            return checkResult;
        }

        if (StringUtils.isNotEmpty(processDefinitionIds)) {
            String[] processInstanceIdArray = processDefinitionIds.split(",");
            for (String strProcessInstanceId : processInstanceIdArray) {
                int processInstanceId = Integer.parseInt(strProcessInstanceId);
                try {
                    deleteResult = deleteProcessDefinitionById(loginUser, projectName, processInstanceId);
                    if (!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))) {
                        deleteFailedIdList.add(processInstanceId);
                        logger.error((String) deleteResult.get(Constants.MSG));
                    }
                } catch (Exception e) {
                    deleteFailedIdList.add(processInstanceId);
                }
            }
        }
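        // succeed only if every definition was deleted; otherwise report the ids that failed in one message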
        if (deleteFailedIdList.size() > 0) {
            putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR, StringUtils.join(deleteFailedIdList.toArray(), ","));
        } else {
            putMsg(result, Status.SUCCESS);
        }
        return result;
    }

    /**
     * release process definition: online / offline
     *
     * @param loginUser
     * @param projectName
     * @param id
     * @param releaseState
     * @return
     */
    @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
    public Map<String, Object> releaseProcessDefinition(User loginUser, String projectName, int id, int releaseState) {

        HashMap<String, Object> result = new HashMap<>();
        Project project = projectMapper.queryByName(projectName);

        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
        Status resultEnum = (Status) checkResult.get(Constants.STATUS);
        if (resultEnum != Status.SUCCESS) {
            return checkResult;
        }

        ReleaseState state = ReleaseState.getEnum(releaseState);

        switch (state) {
            case ONLINE: {
                processDefineMapper.updateProcessDefinitionReleaseState(id, state);
                break;
            }
            case OFFLINE: {
                processDefineMapper.updateProcessDefinitionReleaseState(id, state);
                List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(new int[]{id});

                for (Schedule schedule : scheduleList) {
                    logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}",
                            project.getId(), schedule.getId(), id);
                    // set schedule status to offline
                    schedule.setReleaseState(ReleaseState.OFFLINE);
                    scheduleMapper.update(schedule);
                    deleteSchedule(project.getId(), schedule.getId());
                }
                break;
            }
            default: {
                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState");
                return result;
            }
        }

        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * check that the process definition nodes meet the specifications
     *
     * @param processData
     * @param processDefinitionJson
     * @return
     */
    public Map<String, Object> checkProcessNodeList(ProcessData processData, String processDefinitionJson) {

        Map<String, Object> result = new HashMap<>(5);
        try {
            if (processData == null) {
                logger.error("process data is null");
                putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson);
                return result;
            }

            // check whether the task nodes are present
            List<TaskNode> taskNodes = processData.getTasks();
            if (taskNodes == null) {
                logger.error("process node info is empty");
                putMsg(result, Status.DATA_IS_NULL, processDefinitionJson);
                return result;
            }

            // check whether the DAG has a cycle
            if (graphHasCycle(taskNodes)) {
                logger.error("process DAG has cycle");
                putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
                return result;
            }

            // check whether each task node's parameters are valid
            for (TaskNode taskNode : taskNodes) {
                if (!checkTaskNodeParameters(taskNode.getParams(), taskNode.getType())) {
                    logger.error("task node {} parameter invalid", taskNode.getName());
                    putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
                    return result;
                }
                // check extra params
                checkOtherParams(taskNode.getExtras());
            }
            putMsg(result, Status.SUCCESS);
        } catch (Exception e) {
            result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
            result.put(Constants.MSG, e.getMessage());
        }
        return result;
    }

    /**
     * get task node details based on process definition
     */
    public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) throws Exception {
        Map<String, Object> result = new HashMap<>();

        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(defineId);
        if (processDefinition == null) {
            logger.info("process define not exists");
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId);
            return result;
        }

        String processDefinitionJson = processDefinition.getProcessDefinitionJson();
        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
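        // a definition may not contain any task yet; return an empty list rather than null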
        List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();

        result.put(Constants.DATA_LIST, taskNodeList);
        putMsg(result, Status.SUCCESS);

        return result;
    }

    /**
     * get task node details based on a list of process definition ids
     */
    public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception {
        Map<String, Object> result = new HashMap<>();

        Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
        String[] idList = defineIdList.split(",");
        List<String> definitionIdList = Arrays.asList(idList);
        List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(definitionIdList);
        if (processDefinitionList == null || processDefinitionList.size() == 0) {
            logger.info("process definition not exists");
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
            return result;
        }

        for (ProcessDefinition processDefinition : processDefinitionList) {
            String processDefinitionJson = processDefinition.getProcessDefinitionJson();
            ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
            List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
            taskNodeMap.put(processDefinition.getId(), taskNodeList);
        }

        result.put(Constants.DATA_LIST, taskNodeMap);
        putMsg(result, Status.SUCCESS);

        return result;
    }

    /**
     * Encapsulates the TreeView structure
     *
     * @param processId
     * @param limit
     * @return
     */
    public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exception {
        Map<String, Object> result = new HashMap<>();

        ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processId);
        if (processDefinition == null) {
            logger.info("process define not exists");
            throw new RuntimeException("process define not exists");
        }
        DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
        /**
         * nodes that are running
         */
        Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();

        /**
         * nodes that are waiting to run
         */
        Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();

        /**
         * list of process instances
         */
        List<ProcessInstance> processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit);

        if (limit > processInstanceList.size()) {
            limit = processInstanceList.size();
        }

        TreeViewDto parentTreeViewDto = new TreeViewDto();
        parentTreeViewDto.setName("DAG");
        parentTreeViewDto.setType("");
        // Specify the process definition, because it is a TreeView for a process definition
        for (int i = limit - 1; i >= 0; i--) {
            ProcessInstance processInstance = processInstanceList.get(i);
            Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
            parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "",
                    processInstance.getState().toString(), processInstance.getStartTime(), endTime,
                    processInstance.getHost(),
                    DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
        }

        List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
        parentTreeViewDtoList.add(parentTreeViewDto);
        // seed the traversal with the DAG's start nodes
        for (String startNode : dag.getBeginNode()) {
            runningNodeMap.put(startNode, parentTreeViewDtoList);
        }
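        // Traverse the DAG level by level: runningNodeMap holds the node names of the current level together
        // with the TreeViewDto list of their parents; each pass builds a TreeViewDto per node, attaches it to
        // its parents, and queues the node's successors in waitingRunningNodeMap for the next level.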
        while (Stopper.isRunning()) {
            Set<String> postNodeList = null;
            Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, List<TreeViewDto>> en = iter.next();
                String nodeName = en.getKey();
                parentTreeViewDtoList = en.getValue();

                TreeViewDto treeViewDto = new TreeViewDto();
                treeViewDto.setName(nodeName);
                TaskNode taskNode = dag.getNode(nodeName);
                treeViewDto.setType(taskNode.getType());

                // set treeViewDto instances
                for (int i = limit - 1; i >= 0; i--) {
                    ProcessInstance processInstance = processInstanceList.get(i);
                    TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName);
                    if (taskInstance == null) {
                        treeViewDto.getInstances().add(new Instance(-1, "not running", "null"));
                    } else {
                        Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
                        Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();

                        int subProcessId = 0;
                        /**
                         * if the task is a sub process, use its sub process definition id; otherwise sub id = 0
                         */
                        if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) {
                            String taskJson = taskInstance.getTaskJson();
                            taskNode = JSON.parseObject(taskJson, TaskNode.class);
                            subProcessId = Integer.parseInt(JSON.parseObject(
                                    taskNode.getParams()).getString(CMDPARAM_SUB_PROCESS_DEFINE_ID));
                        }
                        treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(),
                                taskInstance.getTaskType(), taskInstance.getState().toString(),
                                taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(),
                                DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));
                    }
                }
                for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
                    pTreeViewDto.getChildren().add(treeViewDto);
                }
                postNodeList = dag.getSubsequentNodes(nodeName);
                if (postNodeList != null && postNodeList.size() > 0) {
                    for (String nextNodeName : postNodeList) {
                        List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
                        if (treeViewDtoList != null && treeViewDtoList.size() > 0) {
                            treeViewDtoList.add(treeViewDto);
                            waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
                        } else {
                            treeViewDtoList = new ArrayList<>();
                            treeViewDtoList.add(treeViewDto);
                            waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
                        }
                    }
                }
                runningNodeMap.remove(nodeName);
            }

            if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
                break;
            } else {
                runningNodeMap.putAll(waitingRunningNodeMap);
                waitingRunningNodeMap.clear();
            }
        }
        result.put(Constants.DATA_LIST, parentTreeViewDto);
        result.put(Constants.STATUS, Status.SUCCESS);
        result.put(Constants.MSG, Status.SUCCESS.getMsg());
        return result;
    }

    /**
     * Generate the DAG graph based on the process definition
     *
     * @param processDefinition
     * @return
     * @throws Exception
     */
    private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {

        String processDefinitionJson = processDefinition.getProcessDefinitionJson();

        ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);

        List<TaskNode> taskNodeList = processData.getTasks();

        processDefinition.setGlobalParamList(processData.getGlobalParams());

        List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
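        // preTasks is stored on each task node as a JSON array of predecessor task names;
        // every entry becomes a directed edge preTask -> task, so a task with no predecessors is a start node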
        // Traverse node information and build relationships
        for (TaskNode taskNode : taskNodeList) {
            String preTasks = taskNode.getPreTasks();
            List<String> preTasksList = JSONUtils.toList(preTasks, String.class);

            // if the dependency list is not empty
            if (preTasksList != null) {
                for (String depNode : preTasksList) {
                    taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
                }
            }
        }

        ProcessDag processDag = new ProcessDag();
        processDag.setEdges(taskNodeRelations);
        processDag.setNodes(taskNodeList);

        // Generate the concrete DAG to be executed
        return genDagGraph(processDag);
    }

    /**
     * Generate the DAG of the process
     *
     * @return DAG
     */
    private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
        DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();

        /**
         * Add the nodes
         */
        if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
            for (TaskNode node : processDag.getNodes()) {
                dag.addNode(node.getName(), node);
            }
        }

        /**
         * Add the edges
         */
        if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
            for (TaskNodeRelation edge : processDag.getEdges()) {
                dag.addEdge(edge.getStartNode(), edge.getEndNode());
            }
        }
        return dag;
    }

    /**
     * whether the graph has a cycle
     *
     * @param taskNodeResponseList
     * @return
     */
    private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
        DAG<String, TaskNode, String> graph = new DAG<>();

        // Fill the vertices
        for (TaskNode taskNodeResponse : taskNodeResponseList) {
            graph.addNode(taskNodeResponse.getName(), taskNodeResponse);
        }

        // Fill the edge relations
        for (TaskNode taskNodeResponse : taskNodeResponseList) {
            List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class);

            if (CollectionUtils.isNotEmpty(preTasks)) {
                for (String preTask : preTasks) {
                    // if the edge cannot be added, treat the definition as cyclic
                    if (!graph.addEdge(preTask, taskNodeResponse.getName())) {
                        return true;
                    }
                }
            }
        }

        return graph.hasCycle();
    }
}