From 43d9c00259c32020bc442a0f436ed12a2fd8b5bd Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Thu, 15 Sep 2022 10:36:34 +0800 Subject: [PATCH] [Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 #11864 (#11949) Co-authored-by: Yann Ann <2993643785@qq.com> --- .../event/TaskTimeoutStateEventHandler.java | 25 ++++++++++++++++--- .../runner/WorkflowExecuteRunnable.java | 1 - 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java index 240f10ff2..f2cdff500 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -28,17 +28,27 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.auto.service.AutoService; @AutoService(StateEventHandler.class) public class TaskTimeoutStateEventHandler implements StateEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class); + @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) throws StateEventHandleError { TaskMetrics.incTaskTimeout(); workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent); - TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get(); + TaskInstance taskInstance = + workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).orElseThrow( + () -> new StateEventHandleError(String.format( + "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s", + stateEvent.getTaskInstanceId()))); if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { return true; @@ -47,10 +57,17 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler { Map activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { - ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); - taskProcessor.action(TaskAction.TIMEOUT); + if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) { + ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); + taskProcessor.action(TaskAction.TIMEOUT); + } else { + logger.warn( + "cannot find the task processor for task {}, so skip task processor action.", + taskInstance.getTaskCode()); + } } - if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy + || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { workflowExecuteRunnable.processTimeout(); workflowExecuteRunnable.taskTimeout(taskInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 09e5318ca..a7c669a73 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -288,7 +288,6 @@ public class WorkflowExecuteRunnable implements Callable { } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } - } } -- GitLab