diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 22ae759bcbb009c512a993a08731d48ff3d00276..ca44ead095b75b39b96824569ac02d8c814d0aab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -130,7 +130,8 @@ public class MasterSchedulerService extends Thread { NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, + stateWheelExecuteThread = new StateWheelExecuteThread(processService, + processTimeoutCheckList, taskTimeoutCheckList, taskRetryCheckList, this.processInstanceExecMaps, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 7eca55ccf3381e2a1c4e89bc9f6d43eacef156b4..bc3f257a8091d929bddcd36989535713b53afe04 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.hadoop.util.ThreadUtil; @@ -42,6 +43,7 @@ public class StateWheelExecuteThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); + private ProcessService processService; private ConcurrentHashMap processInstanceTimeoutCheckList; private ConcurrentHashMap taskInstanceTimeoutCheckList; private ConcurrentHashMap taskInstanceRetryCheckList; @@ -49,11 +51,13 @@ public class StateWheelExecuteThread extends Thread { private int stateCheckIntervalSecs; - public StateWheelExecuteThread(ConcurrentHashMap processInstanceTimeoutCheckList, + public StateWheelExecuteThread(ProcessService processService, + ConcurrentHashMap processInstanceTimeoutCheckList, ConcurrentHashMap taskInstanceTimeoutCheckList, ConcurrentHashMap taskInstanceRetryCheckList, ConcurrentHashMap processInstanceExecMaps, int stateCheckIntervalSecs) { + this.processService = processService; this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; @@ -95,6 +99,10 @@ public class StateWheelExecuteThread extends Thread { } for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) { if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { + if (taskInstance.getStartTime() == null) { + TaskInstance newTaskInstance = processService.findTaskInstanceById(taskInstance.getId()); + taskInstance.setStartTime(newTaskInstance.getStartTime()); + } long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); if (timeRemain < 0) { addTaskTimeoutEvent(taskInstance);