diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index 1c336c89a15b582f1510ce9395b97509c99479c2..5956de2aa03a0d3e931ea0c6f3bb5c1eb7c4e2f4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -86,14 +86,14 @@ public enum ExecutionStatus { public boolean typeIsFinished(){ return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() - || typeIsWaittingThread(); + || typeIsStop(); } /** * status is waiting thread * @return status */ - public boolean typeIsWaittingThread(){ + public boolean typeIsWaitingThread(){ return this == WAITTING_THREAD; } @@ -104,6 +104,13 @@ public enum ExecutionStatus { public boolean typeIsPause(){ return this == PAUSE; } + /** + * status is pause + * @return status + */ + public boolean typeIsStop(){ + return this == STOP; + } /** * status is running diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index 015c20024c06326992dd246adcbe9f417f94c2bc..8be683915c56eacb321df7221a1783452c4221b6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -146,7 +146,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){ break; } - // updateProcessInstance task instance + // update process task taskInstance = processService.findTaskInstanceById(taskInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId()); Thread.sleep(Constants.SLEEP_TIME_MILLIS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 6379b3229036b15d8e227f0a96ea5ac576f19678..39a5e1e032d39d4925d3ab8e1a8933a065a0c1f6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -338,7 +338,7 @@ public class MasterExecThread implements Runnable { private void endProcess() { processInstance.setEndTime(new Date()); processService.updateProcessInstance(processInstance); - if(processInstance.getState().typeIsWaittingThread()){ + if(processInstance.getState().typeIsWaitingThread()){ processService.createRecoveryWaitingThreadCommand(null, processInstance); } List taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 235954632e07a820d3ec61cb3b8c930db7072d15..71c7d959e15a45ebf8c7721d11ac982222e30e50 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -123,28 +123,16 @@ public class DependentExecute { /** * depend type = depend_all - * skip the condition tasks. - * judge all the task * @return */ private DependResult dependResultByProcessInstance(ProcessInstance processInstance){ - DependResult result = DependResult.FAILED; - List taskNodes = - processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId()); - if(CollectionUtils.isEmpty(taskNodes)) { - return result; + if(!processInstance.getState().typeIsFinished()){ + return DependResult.WAITING; } - for(TaskNode taskNode:taskNodes){ - if(taskNode.isConditionsTask() - || DagHelper.haveConditionsAfterNode(taskNode.getName(), taskNodes)){ - continue; - } - DependResult tmpResult = getDependTaskResult(taskNode.getName(),processInstance); - if(DependResult.SUCCESS != tmpResult){ - return tmpResult; - } + if(processInstance.getState().typeIsSuccess()){ + return DependResult.SUCCESS; } - return DependResult.SUCCESS; + return DependResult.FAILED; } /** @@ -168,7 +156,11 @@ public class DependentExecute { if(taskInstance == null){ // cannot find task in the process instance // maybe because process instance is running or failed. - result = getDependResultByProcessStateWhenTaskNull(processInstance.getState()); + if(processInstance.getState().typeIsFinished()){ + result = DependResult.FAILED; + }else{ + return DependResult.WAITING; + } }else{ result = getDependResultByState(taskInstance.getState()); } @@ -217,9 +209,7 @@ public class DependentExecute { */ private DependResult getDependResultByState(ExecutionStatus state) { - if(state.typeIsRunning() - || state == ExecutionStatus.SUBMITTED_SUCCESS - || state == ExecutionStatus.WAITTING_THREAD){ + if(!state.typeIsFinished()){ return DependResult.WAITING; }else if(state.typeIsSuccess()){ return DependResult.SUCCESS; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index a65b0508d8f3c1d6cc8474a4c0f0288942752b6f..66bc3afe84a10c09df8f4354a1a10d303d549384 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -94,11 +94,12 @@ public class DependentTaskTest { } @Test - public void test() throws Exception{ + public void testDependAll() throws Exception{ TaskInstance taskInstance = getTaskInstance(); String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; taskInstance.setDependency(dependString); + Mockito.when(processService.submitTask(taskInstance)) .thenReturn(taskInstance); DependentTaskExecThread dependentTask = @@ -107,6 +108,54 @@ public class DependentTaskTest { dependentTask.call(); Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + + DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + + + Mockito.when(processService + .findLastRunningProcess(4, dateInterval.getStartTime(), + dateInterval.getEndTime())) + .thenReturn(findLastStopProcessInterval()); + DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); + dependentFailure.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + } + + @Test + public void testDependTask() throws Exception{ + + TaskInstance taskInstance = getTaskInstance(); + String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; + taskInstance.setDependency(dependString); + Mockito.when(processService.submitTask(taskInstance)) + .thenReturn(taskInstance); + DependentTaskExecThread dependentTask = + new DependentTaskExecThread(taskInstance); + + dependentTask.call(); + + Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + + DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + Mockito.when(processService + .findLastRunningProcess(4, dateInterval.getStartTime(), + dateInterval.getEndTime())) + .thenReturn(findLastStopProcessInterval()); + + Mockito.when(processService + .findValidTaskListByProcessId(11)) + .thenReturn(getErrorTaskInstances()); + DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); + dependentFailure.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + } + + private ProcessInstance findLastStopProcessInterval(){ + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(11); + processInstance.setProcessDefinitionId(4); + processInstance.setState(ExecutionStatus.STOP); + return processInstance; } private ProcessInstance findLastProcessInterval(){ @@ -142,7 +191,7 @@ public class DependentTaskTest { return list; } - private List getTaskInstances(){ + private List getErrorTaskInstances(){ List list = new ArrayList<>(); TaskInstance taskInstance = new TaskInstance(); taskInstance.setName("C"); @@ -152,12 +201,23 @@ public class DependentTaskTest { return list; } + private List getTaskInstances(){ + List list = new ArrayList<>(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setName("D"); + taskInstance.setState(ExecutionStatus.SUCCESS); + taskInstance.setDependency("1231"); + list.add(taskInstance); + return list; + } + private TaskInstance getTaskInstance(){ TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("DEPENDENT"); taskInstance.setId(252612); taskInstance.setName("C"); taskInstance.setProcessInstanceId(10111); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); return taskInstance; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index f0ae76ea4df108675045133a850381aa0daa601f..6c4356fe968149015c117775179b48ebe25d269a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -254,7 +254,7 @@ public class ProcessService { //process data check if (null == processData) { logger.error("process data is null"); - return null; + return new ArrayList<>(); } return processData.getTasks();