diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index c0bc3aaea819f6bb4001ff21c5680ed3f506b681..14b9a3aea6753198b7200397caa27536ba749c63 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -663,7 +664,6 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ @@ -672,25 +672,30 @@ public class MasterExecThread implements Runnable { || skipTaskNodeList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } + // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(dag.getNode(depsNode).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(depsNode); + if(!nextTaskList.contains(taskName)){ + return DependResult.FAILED; + } + }else if(depTaskState.typeIsFailure()){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; }