提交 9c70eec4 编写于 作者: W Wenjun Ruan

Add projectName, workflowName, taskName

上级 fc9fe3ad
......@@ -19,6 +19,12 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
......@@ -28,12 +34,17 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameter
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
......@@ -45,6 +56,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected DependentParameters dependentParameters;
private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
/**
* dependent task list
*/
......@@ -130,10 +147,42 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
} else {
this.dependentDate = new Date();
}
// check dependent project is exist
List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
Set<Long> projectCodes = new HashSet<>();
Set<Long> processDefinitionCodes = new HashSet<>();
Set<Long> taskDefinitionCodes = new HashSet<>();
dependTaskList.forEach(dependentTaskModel -> {
dependentTaskModel.getDependItemList().forEach(dependentItem -> {
projectCodes.add(dependentItem.getProjectCode());
processDefinitionCodes.add(dependentItem.getDefinitionCode());
taskDefinitionCodes.add(dependentItem.getDepTaskCode());
});
});
Map<Long, Project> projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
Map<Long, ProcessDefinition> processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
for (DependentItem dependentItem : taskModel.getDependItemList()) {
logger.info("dependent task: {}", dependentItem.getKey());
Project project = projectCodeMap.get(dependentItem.getProjectCode());
if (project == null) {
logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
}
ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
if (processDefinition == null) {
logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
}
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
}
this.dependentTaskList.add(new DependentExecute(taskModel));
}
......@@ -167,7 +216,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, task: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
......
......@@ -40,7 +40,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
......@@ -63,7 +63,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
......@@ -41,7 +41,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册