From 3dc5845389fb2ca13ccc50aa0965b9369d5fb546 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Fri, 23 Oct 2020 17:36:44 +0800 Subject: [PATCH] [FIX-3966] The timeout warning does not take effect in sub_process (#3982) * fix #3966 sub process doesnot send alert mail after process instance ending. * fix bug 3964: sub_process The timeout warning does not take effect add timeout warning for sub_process/dependent task. * fix code smell * fix code smell * fix code smell * update worker group inherit from parent Co-authored-by: baoliang --- .../dolphinscheduler/dao/entity/Command.java | 2 + .../runner/DependentTaskExecThread.java | 4 + .../runner/MasterBaseTaskExecThread.java | 94 ++++++++++++++++++- .../master/runner/MasterExecThread.java | 7 +- .../master/runner/MasterTaskExecThread.java | 51 +--------- .../runner/SubProcessTaskExecThread.java | 7 +- .../server/utils/DependentExecute.java | 4 +- .../processor/TaskExecuteProcessor.java | 7 +- .../worker/runner/TaskExecuteThread.java | 4 - .../service/process/ProcessService.java | 4 +- 10 files changed, 112 insertions(+), 72 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 7d52dc93f..cba015182 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -131,6 +131,7 @@ public class Command { WarningType warningType, int warningGroupId, Date scheduleTime, + String workerGroup, Priority processInstancePriority) { this.commandType = commandType; this.executorId = executorId; @@ -143,6 +144,7 @@ public class Command { this.failureStrategy = failureStrategy; this.startTime = new Date(); this.updateTime = new Date(); + this.workerGroup = workerGroup; this.processInstancePriority = processInstancePriority; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java index 8be683915..d4d87b958 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -138,6 +138,10 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { logger.error("process instance not exists , master task exec thread exit"); return true; } + if (checkTaskTimeout()) { + this.checkTimeoutFlag = !alertTimeout(); + handleTimeoutFailed(); + } if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ cancelTaskInstance(); break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 3226d8230..8297213d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -20,8 +20,12 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.sift.SiftingAppender; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; @@ -34,8 +38,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.Constants.*; +import java.util.Date; import java.util.concurrent.Callable; +import com.alibaba.fastjson.JSON; + /** * master task exec base class @@ -82,6 +89,17 @@ public class MasterBaseTaskExecThread implements Callable { * taskUpdateQueue */ private TaskPriorityQueue taskUpdateQueue; + + /** + * whether need check task time out. + */ + protected boolean checkTimeoutFlag = false; + + /** + * task timeout parameters + */ + protected TaskTimeoutParameter taskTimeoutParameter; + /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance @@ -93,6 +111,27 @@ public class MasterBaseTaskExecThread implements Callable { this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); + initTaskParams(); + } + + /** + * init task ordinary parameters + */ + private void initTaskParams() { + initTimeoutParams(); + } + + /** + * init task timeout parameters + */ + private void initTimeoutParams() { + String taskJson = taskInstance.getTaskJson(); + TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); + taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); + + if(taskTimeoutParameter.getEnable()){ + checkTimeoutFlag = true; + } } /** @@ -152,8 +191,6 @@ public class MasterBaseTaskExecThread implements Callable { return task; } - - /** * dispatcht task * @param taskInstance taskInstance @@ -196,7 +233,6 @@ public class MasterBaseTaskExecThread implements Callable { } } - /** * buildTaskPriorityInfo * @@ -272,5 +308,57 @@ public class MasterBaseTaskExecThread implements Callable { return logPath; } + /** + * alert time out + * @return + */ + protected boolean alertTimeout(){ + if( TaskTimeoutStrategy.FAILED == this.taskTimeoutParameter.getStrategy()){ + return true; + } + logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", + processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); + // send warn mail + ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(), + processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(), + taskInstance.getId(),taskInstance.getName()); + return true; + } + + /** + * handle time out for time out strategy warn&&failed + */ + protected void handleTimeoutFailed(){ + if(TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy()){ + return; + } + logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.", + processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); + this.cancel = true; + } + + /** + * check task remain time valid + * @return + */ + protected boolean checkTaskTimeout(){ + if (!checkTimeoutFlag || taskInstance.getStartTime() == null){ + return false; + } + long remainTime = getRemainTime(taskTimeoutParameter.getInterval() * 60L); + return remainTime <= 0; + } + + /** + * get remain time + * + * @return remain time + */ + protected long getRemainTime(long timeoutSeconds) { + Date startTime = taskInstance.getStartTime(); + long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; + return timeoutSeconds - usedTime; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index df7273e7c..4b626e85d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -163,9 +163,6 @@ public class MasterExecThread implements Runnable { this.nettyRemotingClient = nettyRemotingClient; } - - - @Override public void run() { @@ -174,13 +171,11 @@ public class MasterExecThread implements Runnable { logger.info("process instance is not exists"); return; } - // check to see if it's done if (processInstance.getState().typeIsFinished()){ logger.info("process instance is done : {}",processInstance.getId()); return; } - try { if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){ // sub process complement data @@ -468,7 +463,7 @@ public class MasterExecThread implements Runnable { taskInstance.setAlertFlag(Flag.NO); // task instance start time - taskInstance.setStartTime(new Date()); + taskInstance.setStartTime(null); // task instance flag taskInstance.setFlag(Flag.YES); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 8c4c2bac0..dd24a472b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -17,18 +17,11 @@ package org.apache.dolphinscheduler.server.master.runner; - -import com.alibaba.fastjson.JSON; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; @@ -122,15 +115,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { taskInstance = processService.findTaskInstanceById(taskInstance.getId()); logger.info("wait task: process id: {}, task id:{}, task name:{} complete", this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); - // task time out - boolean checkTimeout = false; - TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter(); - if(taskTimeoutParameter.getEnable()){ - TaskTimeoutStrategy strategy = taskTimeoutParameter.getStrategy(); - if(strategy == TaskTimeoutStrategy.WARN || strategy == TaskTimeoutStrategy.WARNFAILED){ - checkTimeout = true; - } - } while (Stopper.isRunning()){ try { @@ -151,18 +135,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); break; } - if(checkTimeout){ - long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L); - if (remainTime < 0) { - logger.warn("task id: {} execution time out",taskInstance.getId()); - // process define - ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); - // send warn mail - alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(), - processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(), - taskInstance.getId(),taskInstance.getName()); - checkTimeout = false; - } + if (checkTaskTimeout()) { + this.checkTimeoutFlag = !alertTimeout(); } // updateProcessInstance task instance taskInstance = processService.findTaskInstanceById(taskInstance.getId()); @@ -249,25 +223,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { return true; } - /** - * get task timeout parameter - * @return TaskTimeoutParameter - */ - private TaskTimeoutParameter getTaskTimeoutParameter(){ - String taskJson = taskInstance.getTaskJson(); - TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class); - return taskNode.getTaskTimeoutParameter(); - } - - - /** - * get remain time?s? - * - * @return remain time - */ - private long getRemaintime(long timeoutSeconds) { - Date startTime = taskInstance.getStartTime(); - long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000; - return timeoutSeconds - usedTime; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java index ee290487b..7e485d674 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java @@ -130,19 +130,20 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { while (Stopper.isRunning()) { // waiting for subflow process instance establishment if (subProcessInstance == null) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - if(!setTaskInstanceState()){ continue; } } subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId()); + if (checkTaskTimeout()) { + this.checkTimeoutFlag = !alertTimeout(); + handleTimeoutFailed(); + } updateParentProcessState(); if (subProcessInstance.getState().typeIsFinished()){ break; } - if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){ // parent process "ready to pause" , child process "pause" pauseSubProcess(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 71c7d959e..7f76baaa5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -104,9 +104,7 @@ public class DependentExecute { ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval); if(processInstance == null){ - logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", - dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() ); - return DependResult.FAILED; + return DependResult.WAITING; } // need to check workflow for updates, so get all task and check the task state if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 5ecc2c7b5..cbd8212ad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -90,6 +90,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext)); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); @@ -147,15 +149,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); - ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); + ackCommand.setLogPath(taskExecutionContext.getLogPath() ); ackCommand.setHost(taskExecutionContext.getHost()); - ackCommand.setStartTime(new Date()); + ackCommand.setStartTime(taskExecutionContext.getStartTime()); if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } - taskExecutionContext.setLogPath(ackCommand.getLogPath()); return ackCommand; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index e0294eb33..fa3e6d299 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -19,13 +19,11 @@ package org.apache.dolphinscheduler.server.worker.runner; import com.alibaba.fastjson.JSONObject; import org.apache.commons.collections.MapUtils; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -121,8 +119,6 @@ public class TaskExecuteThread implements Runnable { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); - - task = TaskManager.newTask(taskExecutionContext, taskLogger); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 9bc3fa73e..77d4a6d97 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -412,6 +412,7 @@ public class ProcessService { processInstance.getWarningType(), processInstance.getWarningGroupId(), processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), processInstance.getProcessInstancePriority() ); saveCommand(command); @@ -987,6 +988,7 @@ public class ProcessService { parentProcessInstance.getWarningType(), parentProcessInstance.getWarningGroupId(), parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), parentProcessInstance.getProcessInstancePriority() ); } @@ -1059,7 +1061,7 @@ public class ProcessService { taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 ); } taskInstance.setEndTime(null); - taskInstance.setStartTime(new Date()); + taskInstance.setStartTime(null); taskInstance.setFlag(Flag.YES); taskInstance.setHost(null); taskInstance.setId(0); -- GitLab