未验证 提交 3dc58453 编写于 作者: B bao liang 提交者: GitHub

[FIX-3966] The timeout warning does not take effect in sub_process (#3982)

* fix #3966: sub process does not send an alert mail after the process instance ends.

* fix bug 3964: the timeout warning does not take effect in sub_process;
add a timeout warning for sub_process/dependent tasks.

* fix code smell

* fix code smell

* fix code smell

* update worker group inherit from parent
Co-authored-by: Nbaoliang <baoliang@analysys.com.cn>
上级 292b0fce
...@@ -131,6 +131,7 @@ public class Command { ...@@ -131,6 +131,7 @@ public class Command {
WarningType warningType, WarningType warningType,
int warningGroupId, int warningGroupId,
Date scheduleTime, Date scheduleTime,
String workerGroup,
Priority processInstancePriority) { Priority processInstancePriority) {
this.commandType = commandType; this.commandType = commandType;
this.executorId = executorId; this.executorId = executorId;
...@@ -143,6 +144,7 @@ public class Command { ...@@ -143,6 +144,7 @@ public class Command {
this.failureStrategy = failureStrategy; this.failureStrategy = failureStrategy;
this.startTime = new Date(); this.startTime = new Date();
this.updateTime = new Date(); this.updateTime = new Date();
this.workerGroup = workerGroup;
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
......
...@@ -138,6 +138,10 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread { ...@@ -138,6 +138,10 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
logger.error("process instance not exists , master task exec thread exit"); logger.error("process instance not exists , master task exec thread exit");
return true; return true;
} }
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
cancelTaskInstance(); cancelTaskInstance();
break; break;
......
...@@ -20,8 +20,12 @@ import ch.qos.logback.classic.LoggerContext; ...@@ -20,8 +20,12 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender; import ch.qos.logback.classic.sift.SiftingAppender;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
...@@ -34,8 +38,11 @@ import org.slf4j.Logger; ...@@ -34,8 +38,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.Date;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import com.alibaba.fastjson.JSON;
/** /**
* master task exec base class * master task exec base class
...@@ -82,6 +89,17 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -82,6 +89,17 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* taskUpdateQueue * taskUpdateQueue
*/ */
private TaskPriorityQueue taskUpdateQueue; private TaskPriorityQueue taskUpdateQueue;
/**
* whether need check task time out.
*/
protected boolean checkTimeoutFlag = false;
/**
* task timeout parameters
*/
protected TaskTimeoutParameter taskTimeoutParameter;
/** /**
* constructor of MasterBaseTaskExecThread * constructor of MasterBaseTaskExecThread
* @param taskInstance task instance * @param taskInstance task instance
...@@ -93,6 +111,27 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -93,6 +111,27 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class); this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
initTaskParams();
}
/**
 * Initialise the ordinary (non-queue) task parameters of this thread.
 * At present this only covers the timeout settings.
 */
private void initTaskParams()
{
    this.initTimeoutParams();
}
/**
* Initialise the task timeout parameters.
*
* Parses the task JSON stored on the task instance into a TaskNode, keeps its
* timeout parameter in {@code taskTimeoutParameter}, and switches
* {@code checkTimeoutFlag} on when that parameter is enabled.
*/
private void initTimeoutParams() {
String taskJson = taskInstance.getTaskJson();
// NOTE(review): assumes the task JSON is present and parses to a non-null TaskNode - confirm against callers.
TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
// the flag is only ever raised here; it keeps its initial value when timeout is not enabled
if(taskTimeoutParameter.getEnable()){
checkTimeoutFlag = true;
}
} }
/** /**
...@@ -152,8 +191,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -152,8 +191,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return task; return task;
} }
/** /**
* dispatcht task * dispatcht task
* @param taskInstance taskInstance * @param taskInstance taskInstance
...@@ -196,7 +233,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -196,7 +233,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
} }
/** /**
* buildTaskPriorityInfo * buildTaskPriorityInfo
* *
...@@ -272,5 +308,57 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -272,5 +308,57 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return logPath; return logPath;
} }
/**
 * Raise the timeout alert for the current task instance.
 *
 * <p>When the timeout strategy is {@code FAILED} nothing is logged or sent.
 * For every other strategy a warning is logged and a task-timeout alert is
 * sent through {@code alertDao}, addressed to the receivers configured on the
 * process definition.
 *
 * @return always {@code true}
 */
protected boolean alertTimeout(){
    if (this.taskTimeoutParameter.getStrategy() != TaskTimeoutStrategy.FAILED) {
        logger.warn("process id:{} process name:{} task id: {},name:{} execution time out",
                processInstance.getId(),
                processInstance.getName(),
                taskInstance.getId(),
                taskInstance.getName());

        // the receivers of the warn mail live on the process definition
        ProcessDefinition processDefine =
                processService.findProcessDefineById(processInstance.getProcessDefinitionId());
        alertDao.sendTaskTimeoutAlert(
                processInstance.getWarningGroupId(),
                processDefine.getReceivers(),
                processDefine.getReceiversCc(),
                processInstance.getId(),
                processInstance.getName(),
                taskInstance.getId(),
                taskInstance.getName());
    }
    return true;
}
/**
 * Apply the "fail on timeout" half of the timeout strategy.
 *
 * <p>A {@code WARN} strategy is left untouched; any other strategy logs the
 * cancellation and sets the {@code cancel} flag of this thread.
 */
protected void handleTimeoutFailed(){
    boolean warnOnly = TaskTimeoutStrategy.WARN == this.taskTimeoutParameter.getStrategy();
    if (!warnOnly) {
        logger.info("process id:{} name:{} task id:{} name:{} cancel because of timeout.",
                processInstance.getId(),
                processInstance.getName(),
                taskInstance.getId(),
                taskInstance.getName());
        this.cancel = true;
    }
}
/**
 * Tell whether the task instance has used up its timeout interval.
 *
 * @return {@code false} when timeout checking is switched off or the task has
 *         no start time yet; otherwise {@code true} once the remaining time
 *         is zero or negative
 */
protected boolean checkTaskTimeout(){
    // the flag is tested first so the start time is only read when checking is on
    boolean canCheck = checkTimeoutFlag && taskInstance.getStartTime() != null;
    if (!canCheck) {
        return false;
    }
    // the configured interval is multiplied by 60 before being compared in seconds
    long timeoutSeconds = taskTimeoutParameter.getInterval() * 60L;
    long secondsLeft = getRemainTime(timeoutSeconds);
    return secondsLeft <= 0;
}
/**
 * Compute how many seconds are left before the given timeout elapses,
 * measured from the start time of the task instance.
 *
 * @param timeoutSeconds the allowed running time in seconds
 * @return remaining seconds; zero or negative once the timeout is reached
 */
protected long getRemainTime(long timeoutSeconds) {
    long startMillis = taskInstance.getStartTime().getTime();
    long elapsedSeconds = (System.currentTimeMillis() - startMillis) / 1000;
    return timeoutSeconds - elapsedSeconds;
}
} }
...@@ -163,9 +163,6 @@ public class MasterExecThread implements Runnable { ...@@ -163,9 +163,6 @@ public class MasterExecThread implements Runnable {
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
} }
@Override @Override
public void run() { public void run() {
...@@ -174,13 +171,11 @@ public class MasterExecThread implements Runnable { ...@@ -174,13 +171,11 @@ public class MasterExecThread implements Runnable {
logger.info("process instance is not exists"); logger.info("process instance is not exists");
return; return;
} }
// check to see if it's done // check to see if it's done
if (processInstance.getState().typeIsFinished()){ if (processInstance.getState().typeIsFinished()){
logger.info("process instance is done : {}",processInstance.getId()); logger.info("process instance is done : {}",processInstance.getId());
return; return;
} }
try { try {
if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){ if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){
// sub process complement data // sub process complement data
...@@ -468,7 +463,7 @@ public class MasterExecThread implements Runnable { ...@@ -468,7 +463,7 @@ public class MasterExecThread implements Runnable {
taskInstance.setAlertFlag(Flag.NO); taskInstance.setAlertFlag(Flag.NO);
// task instance start time // task instance start time
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(null);
// task instance flag // task instance flag
taskInstance.setFlag(Flag.YES); taskInstance.setFlag(Flag.YES);
......
...@@ -17,18 +17,11 @@ ...@@ -17,18 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
...@@ -122,15 +115,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -122,15 +115,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete", logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
boolean checkTimeout = false;
TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter();
if(taskTimeoutParameter.getEnable()){
TaskTimeoutStrategy strategy = taskTimeoutParameter.getStrategy();
if(strategy == TaskTimeoutStrategy.WARN || strategy == TaskTimeoutStrategy.WARNFAILED){
checkTimeout = true;
}
}
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
...@@ -151,18 +135,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -151,18 +135,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break; break;
} }
if(checkTimeout){ if (checkTaskTimeout()) {
long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L); this.checkTimeoutFlag = !alertTimeout();
if (remainTime < 0) {
logger.warn("task id: {} execution time out",taskInstance.getId());
// process define
ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
// send warn mail
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(),
processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(),taskInstance.getName());
checkTimeout = false;
}
} }
// updateProcessInstance task instance // updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = processService.findTaskInstanceById(taskInstance.getId());
...@@ -249,25 +223,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -249,25 +223,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
return true; return true;
} }
/**
* Get the task timeout parameter.
*
* Parses the task JSON stored on the task instance into a TaskNode and
* returns the timeout parameter of that node.
*
* @return TaskTimeoutParameter
*/
private TaskTimeoutParameter getTaskTimeoutParameter(){
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
return taskNode.getTaskTimeoutParameter();
}
/**
* Get the remaining time (s).
*
* @param timeoutSeconds the allowed running time in seconds
* @return remain time in seconds, i.e. timeoutSeconds minus the seconds elapsed since the task start time
*/
private long getRemaintime(long timeoutSeconds) {
Date startTime = taskInstance.getStartTime();
// elapsed time is truncated to whole seconds by the integer division
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
} }
...@@ -130,19 +130,20 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { ...@@ -130,19 +130,20 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
// waiting for subflow process instance establishment // waiting for subflow process instance establishment
if (subProcessInstance == null) { if (subProcessInstance == null) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if(!setTaskInstanceState()){ if(!setTaskInstanceState()){
continue; continue;
} }
} }
subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId()); subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
if (checkTaskTimeout()) {
this.checkTimeoutFlag = !alertTimeout();
handleTimeoutFailed();
}
updateParentProcessState(); updateParentProcessState();
if (subProcessInstance.getState().typeIsFinished()){ if (subProcessInstance.getState().typeIsFinished()){
break; break;
} }
if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){ if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){
// parent process "ready to pause" , child process "pause" // parent process "ready to pause" , child process "pause"
pauseSubProcess(); pauseSubProcess();
......
...@@ -104,9 +104,7 @@ public class DependentExecute { ...@@ -104,9 +104,7 @@ public class DependentExecute {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
dateInterval); dateInterval);
if(processInstance == null){ if(processInstance == null){
logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}", return DependResult.WAITING;
dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
return DependResult.FAILED;
} }
// need to check workflow for updates, so get all task and check the task state // need to check workflow for updates, so get all task and check the task state
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
......
...@@ -90,6 +90,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -90,6 +90,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(getTaskLogPath(taskExecutionContext));
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext); String execLocalPath = getExecLocalPath(taskExecutionContext);
...@@ -147,15 +149,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -147,15 +149,14 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); ackCommand.setLogPath(taskExecutionContext.getLogPath() );
ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null); ackCommand.setExecutePath(null);
}else{ }else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
} }
taskExecutionContext.setLogPath(ackCommand.getLogPath());
return ackCommand; return ackCommand;
} }
......
...@@ -19,13 +19,11 @@ package org.apache.dolphinscheduler.server.worker.runner; ...@@ -19,13 +19,11 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
...@@ -121,8 +119,6 @@ public class TaskExecuteThread implements Runnable { ...@@ -121,8 +119,6 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())); taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext, task = TaskManager.newTask(taskExecutionContext,
taskLogger); taskLogger);
......
...@@ -412,6 +412,7 @@ public class ProcessService { ...@@ -412,6 +412,7 @@ public class ProcessService {
processInstance.getWarningType(), processInstance.getWarningType(),
processInstance.getWarningGroupId(), processInstance.getWarningGroupId(),
processInstance.getScheduleTime(), processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getProcessInstancePriority() processInstance.getProcessInstancePriority()
); );
saveCommand(command); saveCommand(command);
...@@ -987,6 +988,7 @@ public class ProcessService { ...@@ -987,6 +988,7 @@ public class ProcessService {
parentProcessInstance.getWarningType(), parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(), parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(), parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
parentProcessInstance.getProcessInstancePriority() parentProcessInstance.getProcessInstancePriority()
); );
} }
...@@ -1059,7 +1061,7 @@ public class ProcessService { ...@@ -1059,7 +1061,7 @@ public class ProcessService {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 ); taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
} }
taskInstance.setEndTime(null); taskInstance.setEndTime(null);
taskInstance.setStartTime(new Date()); taskInstance.setStartTime(null);
taskInstance.setFlag(Flag.YES); taskInstance.setFlag(Flag.YES);
taskInstance.setHost(null); taskInstance.setHost(null);
taskInstance.setId(0); taskInstance.setId(0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册