diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index ac18975d9676330356f8a1e0719047d18cc8bdc9..4076900f41631774d7a89b56eaa66e69dde35c28 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.dao.entity; +import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; + import com.fasterxml.jackson.core.type.TypeReference; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; @@ -591,6 +594,26 @@ public class TaskInstance implements Serializable { } } + /** + * whether the retry interval is timed out + * + * @return Boolean + */ + public boolean retryTaskIntervalOverTime() { + if (getState() != ExecutionStatus.FAILURE) { + return true; + } + if (getId() == 0 + || getMaxRetryTimes() == 0 + || getRetryInterval() == 0) { + return true; + } + Date now = new Date(); + long failedTimeInterval = DateUtils.differSec(now, getEndTime()); + // task retry does not over time, return false + return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval; + } + public Priority getTaskInstancePriority() { return taskInstancePriority; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 335684272033bff197a02d5e8e4ad29a73ffcf49..7c4b32130d675210c4046cd621c2b05823a083f8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -122,7 +122,7 @@ public class EventExecuteService extends Thread { continue; } int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); - logger.info("handle process instance : {} events, count:{}", + logger.info("handle process instance : {} , events count:{}", processInstanceId, workflowExecuteThread.eventSize()); logger.info("already exists handler process size:{}", this.eventHandlerMap.size()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index f205e2ddcee64d3ce56fdaeb3637ed6a9db74db5..f2b10f7898381f7ead7f1aa4e78186b62492727f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -96,6 +96,10 @@ public class StateWheelExecuteThread extends Thread { return; } } + if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) { + processDependCheck(taskInstance); + taskInstanceCheckList.remove(taskInstance.getId()); + } if (taskInstance.isSubProcess() || taskInstance.isDependTask()) { processDependCheck(taskInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 3525ae4d77853cd1ae5105fec0ea2a6c2d221d70..c5012623b5a12de403900de745865238fa111e98 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -349,7 +349,7 @@ public class WorkflowExecuteThread implements Runnable { private boolean taskStateChangeHandler(StateEvent stateEvent) { TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); - if (stateEvent.getExecutionStatus().typeIsFinished()) { + if (task.getState().typeIsFinished()) { taskFinished(task); } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId()); @@ -372,6 +372,18 @@ public class WorkflowExecuteThread implements Runnable { task.getState()); if (task.taskCanRetry()) { addTaskToStandByList(task); + if (!task.retryTaskIntervalOverTime()) { + logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}", + processInstance.getId(), + task.getId(), + task.getState(), + task.getRetryTimes(), + task.getMaxRetryTimes(), + task.getRetryInterval()); + this.addTimeoutCheck(task); + } else { + submitStandByTask(); + } return; } ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId()); @@ -650,18 +662,20 @@ public class WorkflowExecuteThread implements Runnable { } private void addTimeoutCheck(TaskInstance taskInstance) { - + if (taskTimeoutCheckList.containsKey(taskInstance.getId())) { + return; + } TaskDefinition taskDefinition = processService.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() ); taskInstance.setTaskDefine(taskDefinition); - if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { - this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - return; - } - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) { this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } else { + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); + } } } @@ -1131,7 +1145,9 @@ public class WorkflowExecuteThread implements Runnable { private void addTaskToStandByList(TaskInstance taskInstance) { logger.info("add task to stand by list: {}", taskInstance.getName()); try { - readyToSubmitTaskQueue.put(taskInstance); + if (!readyToSubmitTaskQueue.contains(taskInstance)) { + readyToSubmitTaskQueue.put(taskInstance); + } } catch (Exception e) { logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e); } @@ -1191,36 +1207,12 @@ public class WorkflowExecuteThread implements Runnable { this.addStateEvent(stateEvent); } } - } public boolean workFlowFinish() { return this.processInstance.getState().typeIsFinished(); } - /** - * whether the retry interval is timed out - * - * @param taskInstance task instance - * @return Boolean - */ - private boolean retryTaskIntervalOverTime(TaskInstance taskInstance) { - if (taskInstance.getState() != ExecutionStatus.FAILURE) { - return true; - } - if (taskInstance.getId() == 0 - || - taskInstance.getMaxRetryTimes() == 0 - || - taskInstance.getRetryInterval() == 0) { - return true; - } - Date now = new Date(); - long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime()); - // task retry does not over time, return false - return taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval; - } - /** * handling the list of tasks to be submitted */ @@ -1252,12 +1244,16 @@ public class WorkflowExecuteThread implements Runnable { } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { - if (retryTaskIntervalOverTime(task)) { + if (task.retryTaskIntervalOverTime()) { + int originalId = task.getId(); TaskInstance taskInstance = submitTaskExec(task); if (taskInstance == null) { this.taskFailedSubmit = true; } else { removeTaskFromStandbyList(task); + if (taskInstance.getId() != originalId) { + activeTaskProcessorMaps.remove(originalId); + } } } } else if (DependResult.FAILED == dependResult) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 2f1f632db7a922a1ca49a329810ddbe0b96a341a..22dd7e89184df7aee798f4afe3295ca3c880bb3d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1334,6 +1334,8 @@ public class ProcessService { taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); } taskInstance.setSubmitTime(null); + taskInstance.setLogPath(null); + taskInstance.setExecutePath(null); taskInstance.setStartTime(null); taskInstance.setEndTime(null); taskInstance.setFlag(Flag.YES); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 59a0fe229c78deebe5c14fe1eb551ba5810c6e14..b558d42405c08ec12a51db57ddb75e1c1c34cc9a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -111,15 +111,15 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue iterator = this.queue.iterator(); while (iterator.hasNext()) { TaskInstance taskInstance = iterator.next(); - if (taskId == taskInstance.getId()) { + if (taskCode == taskInstance.getTaskCode() + && taskVersion == taskInstance.getTaskDefinitionVersion()) { return true; } }