未验证 提交 0be6ad3a 编写于 作者: W Wenjun Ruan 提交者: GitHub

Fix submit dependent task failed doesn't catch exception (#62)

* Set process instance and task priority default value as 2 (#11539)

(cherry picked from commit 2862f5b6)

* Fix submit dependent task failed doesn't catch exception
上级 cdf98acc
......@@ -46,6 +46,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_ALL_TASK_CODE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/**
......@@ -84,26 +85,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submitTask() {
this.taskInstance =
processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
try {
this.taskInstance =
processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
if (this.taskInstance == null) {
return false;
}
this.setTaskExecutionLogger();
logger.info("Dependent task submit success");
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
} catch (Exception ex) {
logger.info("Submit/initialize dependent task failed", ex);
return false;
}
this.setTaskExecutionLogger();
logger.info("Dependent task submit success");
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
}
@Override
......@@ -176,13 +182,18 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
}
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
if (dependentItem.getDepTaskCode() == DEPENDENT_ALL_TASK_CODE) {
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
project.getName(), processDefinition.getName(), dependentItem.getKey());
} else {
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (dependentItem.getDepTaskCode() != DEPENDENT_ALL_TASK_CODE && taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
}
this.dependentTaskList.add(new DependentExecute(taskModel));
}
......
......@@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.api.model;
import lombok.Data;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
/**
* dependent item
*/
@Data
public class DependentItem {
private long projectCode;
private long definitionCode;
......@@ -40,59 +39,4 @@ public class DependentItem {
getDateValue());
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public long getDefinitionCode() {
return definitionCode;
}
public void setDefinitionCode(long definitionCode) {
this.definitionCode = definitionCode;
}
public long getDepTaskCode() {
return depTaskCode;
}
public void setDepTaskCode(long depTaskCode) {
this.depTaskCode = depTaskCode;
}
public String getCycle() {
return cycle;
}
public void setCycle(String cycle) {
this.cycle = cycle;
}
public String getDateValue() {
return dateValue;
}
public void setDateValue(String dateValue) {
this.dateValue = dateValue;
}
public DependResult getDependResult() {
return dependResult;
}
public void setDependResult(DependResult dependResult) {
this.dependResult = dependResult;
}
public ExecutionStatus getStatus() {
return status;
}
public void setStatus(ExecutionStatus status) {
this.status = status;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册