diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index d2a5bc017677b48f4fd5297b05bd6a6c257e1069..06d7d6396f78ea85297997ed8650cb80abd21eb6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -19,20 +19,32 @@ package org.apache.dolphinscheduler.server.master.runner.task; import com.google.auto.service.AutoService; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; @@ -44,6 +56,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor { protected DependentParameters dependentParameters; + private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class); + + private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class); + + private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class); + /** * dependent task list */ @@ -84,7 +102,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { taskInstance.setStartTime(new Date()); processService.updateTaskInstance(taskInstance); initDependParameters(); - logger.info("Success initialize dependent task parameters, the dependent data is: {} parameter is {}", dependentDate, dependentParameters); + logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate); return true; } @@ -124,14 +142,50 @@ public class DependentTaskProcessor extends BaseTaskProcessor { */ protected void initDependParameters() { this.dependentParameters = taskInstance.getDependency(); - for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { - this.dependentTaskList.add(new DependentExecute(taskModel)); - } if (processInstance.getScheduleTime() != null) { this.dependentDate = this.processInstance.getScheduleTime(); } else { this.dependentDate = new Date(); } + // check dependent project is exist + List dependTaskList = dependentParameters.getDependTaskList(); + Set projectCodes = new HashSet<>(); + Set processDefinitionCodes = new HashSet<>(); + Set taskDefinitionCodes = new HashSet<>(); + dependTaskList.forEach(dependentTaskModel -> { + dependentTaskModel.getDependItemList().forEach(dependentItem -> { + projectCodes.add(dependentItem.getProjectCode()); + processDefinitionCodes.add(dependentItem.getDefinitionCode()); + taskDefinitionCodes.add(dependentItem.getDepTaskCode()); + }); + }); + Map projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity())); + Map processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity())); + Map taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); + + for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { + logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation()); + for (DependentItem dependentItem : taskModel.getDependItemList()) { + Project project = projectCodeMap.get(dependentItem.getProjectCode()); + if (project == null) { + logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem); + } + ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode()); + if (processDefinition == null) { + logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem); + } + TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode()); + if (taskDefinition == null) { + logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem); + } + logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}", + project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey()); + } + this.dependentTaskList.add(new DependentExecute(taskModel)); + } } @Override @@ -162,17 +216,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor { if (!dependResultMap.containsKey(entry.getKey())) { dependResultMap.put(entry.getKey(), entry.getValue()); // save depend result to log - logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue()); + logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate); } } if (!dependentExecute.finish(dependentDate)) { finish = false; } } - if (!finish) { - // todo: add information, which dependent doesn't finished - logger.info("The dependent condition doesn't complete at date: {}", dependentDate); - } return finish; } diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index 95c0de103928de866daa8e0117d3aa6f9a656da0..15c0b80a43cc8656a154a53ab489b63b02f17f9d 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -40,7 +40,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n UTF-8 diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 738b0f647953b2a99aab1c7fc32a5400d7409e7f..8452948d77e322df5c7b9dbb5c2da42d37bb3055 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -63,7 +63,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %messsage%n UTF-8 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java index 809c13f0138c53cb9016860c7d9c77812f035c22..de415eb007453867955ebfd9b3a27facab2f4e6a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/async/AsyncTaskExecutionContext.java @@ -36,7 +36,7 @@ public class AsyncTaskExecutionContext implements Delayed { private long currentStartTime; private int executeTimes; private final long executeInterval; - private final long timeout; + private long timeout; public AsyncTaskExecutionContext(@NonNull TaskExecutionContext taskExecutionContext, @@ -62,9 +62,16 @@ public class AsyncTaskExecutionContext implements Delayed { @Override public long getDelay(TimeUnit unit) { - return unit.convert( - Math.min(currentStartTime + executeInterval, timeout) - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); + long nextExecuteTime = currentStartTime + executeInterval; + long delayTime; + if (nextExecuteTime >= timeout) { + // has been timeout, clear the timeoutParams + delayTime = timeout - System.currentTimeMillis(); + timeout = TimeUnit.SECONDS.toMillis(Integer.MAX_VALUE); + } else { + delayTime = nextExecuteTime - System.currentTimeMillis(); + } + return unit.convert(delayTime, TimeUnit.MILLISECONDS); } diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 09402aeddfb8d193acb0f15a6b6049db434b8ab8..4a233e21b3077df91e4aac6ded5c690265102966 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -41,7 +41,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} - %messsage%n UTF-8