未验证 提交 43d9c002 编写于 作者: C caishunfeng 提交者: GitHub

[Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will face a infinite loop...

[Cherry-pick][Bug] [Master] WorkflowExecuteRunnable will face a infinite loop #11838 #11864 (#11949)
Co-authored-by: NYann Ann <2993643785@qq.com>
上级 81f2d6b2
......@@ -28,17 +28,27 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskTimeoutStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskTimeoutStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
TaskMetrics.incTaskTimeout();
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
TaskInstance taskInstance =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).orElseThrow(
() -> new StateEventHandleError(String.format(
"Cannot find the task instance from workflow execute runnable, taskInstanceId: %s",
stateEvent.getTaskInstanceId())));
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
......@@ -47,10 +57,17 @@ public class TaskTimeoutStateEventHandler implements StateEventHandler {
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
if (activeTaskProcessMap.containsKey(taskInstance.getTaskCode())) {
ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
} else {
logger.warn(
"cannot find the task processor for task {}, so skip task processor action.",
taskInstance.getTaskCode());
}
}
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
workflowExecuteRunnable.processTimeout();
workflowExecuteRunnable.taskTimeout(taskInstance);
}
......
......@@ -288,7 +288,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册