未验证 提交 41c05b53 编写于 作者: W wind 提交者: GitHub

[cherry-pick-2.0.1][Bug][MasterServer] fix timeout event (#7283)

* fix timeout alert

* fix timeout
Co-authored-by: Ncaishunfeng <534328519@qq.com>
上级 db5367ab
......@@ -130,7 +130,8 @@ public class MasterSchedulerService extends Thread {
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
stateWheelExecuteThread = new StateWheelExecuteThread(processService,
processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
this.processInstanceExecMaps,
......
......@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.hadoop.util.ThreadUtil;
......@@ -42,6 +43,7 @@ public class StateWheelExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
private ProcessService processService;
private ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
......@@ -49,11 +51,13 @@ public class StateWheelExecuteThread extends Thread {
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
public StateWheelExecuteThread(ProcessService processService,
ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
this.processService = processService;
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
......@@ -95,6 +99,10 @@ public class StateWheelExecuteThread extends Thread {
}
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
if (taskInstance.getStartTime() == null) {
TaskInstance newTaskInstance = processService.findTaskInstanceById(taskInstance.getId());
taskInstance.setStartTime(newTaskInstance.getStartTime());
}
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册