未验证 提交 19a67fa4 编写于 作者: W Wenjun Ruan 提交者: GitHub

Merge pull request #56 from ruanwenjun/dev_wenjun_supportTimeoutInAsyncTask

Optimize dependent log
......@@ -19,20 +19,32 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
......@@ -44,6 +56,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected DependentParameters dependentParameters;
private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class);
private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class);
private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class);
/**
* dependent task list
*/
......@@ -84,7 +102,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
initDependParameters();
logger.info("Success initialize dependent task parameters, the dependent data is: {} parameter is {}", dependentDate, dependentParameters);
logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate);
return true;
}
......@@ -124,14 +142,50 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
*/
protected void initDependParameters() {
this.dependentParameters = taskInstance.getDependency();
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
this.dependentTaskList.add(new DependentExecute(taskModel));
}
if (processInstance.getScheduleTime() != null) {
this.dependentDate = this.processInstance.getScheduleTime();
} else {
this.dependentDate = new Date();
}
// check dependent project is exist
List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
Set<Long> projectCodes = new HashSet<>();
Set<Long> processDefinitionCodes = new HashSet<>();
Set<Long> taskDefinitionCodes = new HashSet<>();
dependTaskList.forEach(dependentTaskModel -> {
dependentTaskModel.getDependItemList().forEach(dependentItem -> {
projectCodes.add(dependentItem.getProjectCode());
processDefinitionCodes.add(dependentItem.getDefinitionCode());
taskDefinitionCodes.add(dependentItem.getDepTaskCode());
});
});
Map<Long, Project> projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity()));
Map<Long, ProcessDefinition> processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));
Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {
logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation());
for (DependentItem dependentItem : taskModel.getDependItemList()) {
Project project = projectCodeMap.get(dependentItem.getProjectCode());
if (project == null) {
logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem);
}
ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode());
if (processDefinition == null) {
logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem);
}
TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode());
if (taskDefinition == null) {
logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem);
throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem);
}
logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}",
project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey());
}
this.dependentTaskList.add(new DependentExecute(taskModel));
}
}
@Override
......@@ -162,17 +216,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue());
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
finish = false;
}
}
if (!finish) {
// todo: add information, which dependent doesn't finished
logger.info("The dependent condition doesn't complete at date: {}", dependentDate);
}
return finish;
}
......
......@@ -40,7 +40,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
......@@ -63,7 +63,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
......@@ -36,7 +36,7 @@ public class AsyncTaskExecutionContext implements Delayed {
private long currentStartTime;
private int executeTimes;
private final long executeInterval;
private final long timeout;
private long timeout;
public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext,
......@@ -62,9 +62,16 @@ public class AsyncTaskExecutionContext implements Delayed {
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
Math.min(currentStartTime + executeInterval, timeout) - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
long nextExecuteTime = currentStartTime + executeInterval;
long delayTime;
if (nextExecuteTime >= timeout) {
// has been timeout, clear the timeoutParams
delayTime = timeout - System.currentTimeMillis();
timeout = TimeUnit.SECONDS.toMillis(Integer.MAX_VALUE);
} else {
delayTime = nextExecuteTime - System.currentTimeMillis();
}
return unit.convert(delayTime, TimeUnit.MILLISECONDS);
}
......
......@@ -41,7 +41,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册