diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java index 240f10ff2c46fbf106a65f94c4e36d72e29c98be..f2cdff500b4c8c286f4201b06f174386fed904ac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -28,17 +28,27 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.auto.service.AutoService; @AutoService(StateEventHandler.class) public class TaskTimeoutStateEventHandler implements StateEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class); + @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) throws StateEventHandleError { TaskMetrics.incTaskTimeout(); workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent); - TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get(); + TaskInstance taskInstance = + workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).orElseThrow( + () -> new StateEventHandleError(String.format( + "Cannot find the task instance from workflow execute runnable, taskInstanceId: %s", + stateEvent.getTaskInstanceId()))); if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { return true; @@ -47,10 +57,17 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler { Map activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { - ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); - taskProcessor.action(TaskAction.TIMEOUT); + if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) { + ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); + taskProcessor.action(TaskAction.TIMEOUT); + } else { + logger.warn( + "cannot find the task processor for task {}, so skip task processor action.", + taskInstance.getTaskCode()); + } } - if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy + || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { workflowExecuteRunnable.processTimeout(); workflowExecuteRunnable.taskTimeout(taskInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 09e5318ca52985033cfd7c4a2d4373ce95fe54ea..a7c669a7353ec9f034f043ec0d385a6842a87eff 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -288,7 +288,6 @@ public class WorkflowExecuteRunnable implements Callable { } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } - } }