未验证 提交 8fefb9ec 编写于 作者: W Wenjun Ruan 提交者: GitHub

Add dependent task instance log (#11541) (#12014)

* Add dependent task instance log

* Optimize log

* Fix dependent task initialize failed will throw exception

(cherry picked from commit a41c6824)
上级 84876b9f
......@@ -17,25 +17,37 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.auto.service.AutoService;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/**
* dependent task processor
......@@ -45,6 +57,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
private DependentParameters dependentParameters;
private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
/**
* dependent task list
*/
......@@ -56,6 +74,10 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
*/
private Map<String, DependResult> dependResultMap = new HashMap<>();
private Map<Long, Project> projectCodeMap = new HashMap<>();
private Map<Long, ProcessDefinition> processDefinitionMap = new HashMap<>();
private Map<Long, TaskDefinition> taskDefinitionMap = new HashMap<>();
/**
* dependent date
*/
......@@ -67,23 +89,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
try {
this.taskInstance =
processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
if (this.taskInstance == null) {
return false;
}
this.setTaskExecutionLogger();
logger.info("Dependent task submit success");
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
} catch (Exception ex) {
logger.error("Submit/Initialize dependent task error", ex);
return false;
}
this.setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
return true;
}
@Override
......@@ -122,14 +152,56 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
*/
private void initDependParameters() {
this.dependentParameters = taskInstance.getDependency();
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
}
if (processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime();
} else {
this.dependentDate = new Date();
}
// check dependent project is exist
List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
Set<Long> projectCodes = new HashSet<>();
Set<Long> processDefinitionCodes = new HashSet<>();
Set<Long> taskDefinitionCodes = new HashSet<>();
dependTaskList.forEach(dependentTaskModel -> {
dependentTaskModel.getDependItemList().forEach(dependentItem -> {
projectCodes.add(dependentItem.getProjectCode());
processDefinitionCodes.add(dependentItem.getDefinitionCode());
taskDefinitionCodes.add(dependentItem.getDepTaskCode());
});
});
projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
for (DependentItem dependentItem : taskModel.getDependItemList()) {
Project project = projectCodeMap.get(dependentItem.getProjectCode());
if (project == null) {
logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
}
ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
if (processDefinition == null) {
logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
}
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}",
project.getName(), processDefinition.getName(), dependentItem.getKey());
} else {
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
}
}
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));
}
}
@Override
......@@ -159,8 +231,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
......@@ -182,7 +254,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
dependResultList.add(dependResult);
}
result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);
logger.info("dependent task completed, dependent result: {}", result);
logger.info("Dependent task completed, dependent result: {}", result);
return result;
}
......
......@@ -17,29 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api.model;
import lombok.Data;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import java.util.List;
@Data
public class DependentTaskModel {
private List<DependentItem> dependItemList;
private DependentRelation relation;
public List<DependentItem> getDependItemList() {
return dependItemList;
}
public void setDependItemList(List<DependentItem> dependItemList) {
this.dependItemList = dependItemList;
}
public DependentRelation getRelation() {
return relation;
}
public void setRelation(DependentRelation relation) {
this.relation = relation;
}
}
......@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import java.util.List;
@Data
@EqualsAndHashCode(callSuper = true)
public class DependentParameters extends AbstractParameters {
private List<DependentTaskModel> dependTaskList;
......@@ -32,19 +36,4 @@ public class DependentParameters extends AbstractParameters {
return true;
}
public List<DependentTaskModel> getDependTaskList() {
return dependTaskList;
}
public void setDependTaskList(List<DependentTaskModel> dependTaskList) {
this.dependTaskList = dependTaskList;
}
public DependentRelation getRelation() {
return relation;
}
public void setRelation(DependentRelation relation) {
this.relation = relation;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册