diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 1f85432bd262fbd3665018a34f4630d83eab23b6..31e457f105e00f7f6124ffa1ce2c02006eb5c4f3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -39,7 +39,7 @@ public enum TaskType { */ SHELL(0, "shell"), SQL(1, "sql"), - SUB_PROCESS(2, "sub process"), + SUB_PROCESS(2, "sub_process"), PROCEDURE(3, "procedure"), MR(4, "mr"), SPARK(5, "spark"), diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java index 43745c4e3c73d089a8e1abca1ff6d0e1fcf8494c..a3ee26e18bc7676465fe60e3c00bba18b2c6ba82 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java @@ -32,6 +32,7 @@ import java.util.List; public class DependentUtilsTest { private static final Logger logger = LoggerFactory.getLogger(ShellExecutorTest.class); + @Test public void getDependResultForRelation() { //failed diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 1d5a1dafd51bd173175a8c6e85b95d6bf602ac74..53b56e54b2da7f571cf0e4ff7c6ab3604bdb9dd8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -377,9 +377,6 @@ public class TaskInstance implements Serializable { } - public boolean isSubProcess(){ - return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); - } public String getDependency(){ @@ -458,6 +455,18 @@ public class TaskInstance implements Serializable { return resources; } + public boolean isSubProcess(){ + return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); + } + + public boolean isDependTask(){ + return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType)); + } + + public boolean isConditionsTask(){ + return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); + } + public void setResources(List resources) { this.resources = resources; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 7a4dc655f7b6f3c21a2e8815fa2552ddb9216c41..1133cadbe7752f82090e4997b69df848369656cc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -361,4 +361,48 @@ public class DagHelper { processDag.setNodes(taskNodeList); return processDag; } + + /** + * is there have conditions after the parent node + * @param parentNodeName + * @return + */ + public static boolean haveConditionsAfterNode(String parentNodeName, + DAG dag + ){ + boolean result = false; + Set subsequentNodes = dag.getSubsequentNodes(parentNodeName); + if(CollectionUtils.isEmpty(subsequentNodes)){ + return result; + } + for(String nodeName : subsequentNodes){ + TaskNode taskNode = dag.getNode(nodeName); + List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); + if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){ + return true; + } + } + return result; + } + + /** + * is there have conditions after the parent node + * @param parentNodeName + * @return + */ + public static boolean haveConditionsAfterNode(String parentNodeName, + List taskNodes + ){ + boolean result = false; + if(CollectionUtils.isEmpty(taskNodes)){ + return result; + } + for(TaskNode taskNode : taskNodes){ + List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); + if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){ + return true; + } + } + return result; + } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java index e165da1e88c8b6880c395a83c233806cf88786a0..9c596708725af134d27db12f84c4de91f9c5236c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java @@ -29,11 +29,21 @@ public class TaskInstanceTest { TaskInstance taskInstance = new TaskInstance(); //sub process - taskInstance.setTaskType("sub process"); + taskInstance.setTaskType("SUB_PROCESS"); Assert.assertTrue(taskInstance.isSubProcess()); //not sub process - taskInstance.setTaskType("http"); + taskInstance.setTaskType("HTTP"); Assert.assertFalse(taskInstance.isSubProcess()); + + //sub process + taskInstance.setTaskType("CONDITIONS"); + Assert.assertTrue(taskInstance.isConditionsTask()); + + //sub process + taskInstance.setTaskType("DEPENDENT"); + Assert.assertTrue(taskInstance.isDependTask()); + + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java similarity index 50% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java index 2f234cdc762425c8f5dc53eaaa9e1a7dc8d2d474..7e3950df1fcae33dfa3d40b43e40a5c30b1ccfe5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java @@ -14,32 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.conditions; +package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; -import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class ConditionsTask extends AbstractTask { +public class ConditionsTaskExecThread extends MasterBaseTaskExecThread { /** @@ -48,66 +44,51 @@ public class ConditionsTask extends AbstractTask { private DependentParameters dependentParameters; /** - * process dao - */ - private ProcessService processService; - - /** - * taskInstance - */ - private TaskInstance taskInstance; - - /** - * + * complete task map */ private Map completeTaskList = new ConcurrentHashMap<>(); - /** - * taskExecutionContext + * condition result */ - private TaskExecutionContext taskExecutionContext; + private DependResult conditionResult; /** - * constructor - * @param taskExecutionContext taskExecutionContext + * constructor of MasterBaseTaskExecThread * - * @param logger logger + * @param taskInstance task instance */ - public ConditionsTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; + public ConditionsTaskExecThread(TaskInstance taskInstance) { + super(taskInstance); } @Override - public void init() throws Exception { - logger.info("conditions task initialize"); - - this.processService = SpringApplicationContext.getBean(ProcessService.class); - - this.dependentParameters = JSONUtils.parseObject(taskExecutionContext. - getDependenceTaskExecutionContext() - .getDependence(), - DependentParameters.class); - - this.taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); - - if(taskInstance == null){ - throw new Exception("cannot find the task instance!"); - } - - List taskInstanceList = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId()); - for(TaskInstance task : taskInstanceList){ - this.completeTaskList.putIfAbsent(task.getName(), task.getState()); + public Boolean submitWaitComplete() { + try{ + this.taskInstance = submit(); + logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskInstance.getProcessDefinitionId(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); + Thread.currentThread().setName(threadLoggerInfoName); + initTaskParameters(); + logger.info("dependent task start"); + waitTaskQuit(); + updateTaskState(); + }catch (Exception e){ + logger.error("conditions task run exception" , e); } + return true; } - @Override - public void handle() throws Exception { - - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, - taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); + private void waitTaskQuit() { + List taskInstances = processService.findValidTaskListByProcessId( + taskInstance.getProcessInstanceId() + ); + for(TaskInstance task : taskInstances){ + completeTaskList.putIfAbsent(task.getName(), task.getState()); + } List modelResultList = new ArrayList<>(); for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ @@ -119,14 +100,43 @@ public class ConditionsTask extends AbstractTask { DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); modelResultList.add(modelResult); } - DependResult result = DependentUtils.getDependResultForRelation( + conditionResult = DependentUtils.getDependResultForRelation( dependentParameters.getRelation(), modelResultList ); - logger.info("the conditions task depend result : {}", result); - exitStatusCode = (result == DependResult.SUCCESS) ? - Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; + logger.info("the conditions task depend result : {}", conditionResult); + } + + /** + * + */ + private void updateTaskState() { + ExecutionStatus status; + if(this.cancel){ + status = ExecutionStatus.KILL; + }else{ + status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; + } + taskInstance.setState(status); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + } + + private void initTaskParameters() { + this.taskInstance.setLogPath(getTaskLogPath(taskInstance)); + this.taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); + taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); + taskInstance.setStartTime(new Date()); + this.processService.saveTaskInstance(taskInstance); + + this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class); } + + /** + * depend result for depend item + * @param item + * @return + */ private DependResult getDependResultForItem(DependentItem item){ DependResult dependResult = DependResult.SUCCESS; @@ -137,16 +147,13 @@ public class ConditionsTask extends AbstractTask { } ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); if(executionStatus != item.getStatus()){ - logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus().toString(), executionStatus.toString()); + logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus); dependResult = DependResult.FAILED; } - logger.info("depend item: {}, depend result: {}", - item.getDepTasks(), dependResult); + logger.info("dependent item complete {} {},{}", + Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult); return dependResult; } - @Override - public AbstractParameters getParameters() { - return null; - } -} \ No newline at end of file + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java new file mode 100644 index 0000000000000000000000000000000000000000..015c20024c06326992dd246adcbe9f417f94c2bc --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.DependentExecute; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; + +public class DependentTaskExecThread extends MasterBaseTaskExecThread { + + private DependentParameters dependentParameters; + + /** + * dependent task list + */ + private List dependentTaskList = new ArrayList<>(); + + /** + * depend item result map + * save the result to log file + */ + private Map dependResultMap = new HashMap<>(); + + + /** + * dependent date + */ + private Date dependentDate; + + /** + * constructor of MasterBaseTaskExecThread + * + * @param taskInstance task instance + */ + public DependentTaskExecThread(TaskInstance taskInstance) { + super(taskInstance); + } + + + @Override + public Boolean submitWaitComplete() { + try{ + logger.info("dependent task start"); + this.taskInstance = submit(); + logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskInstance.getProcessDefinitionId(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); + Thread.currentThread().setName(threadLoggerInfoName); + initTaskParameters(); + initDependParameters(); + waitTaskQuit(); + updateTaskState(); + }catch (Exception e){ + logger.error("dependent task run exception" , e); + } + return true; + } + + /** + * init dependent parameters + */ + private void initDependParameters() { + + this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), + DependentParameters.class); + + for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ + this.dependentTaskList.add(new DependentExecute( + taskModel.getDependItemList(), taskModel.getRelation())); + } + if(this.processInstance.getScheduleTime() != null){ + this.dependentDate = this.processInstance.getScheduleTime(); + }else{ + this.dependentDate = new Date(); + } + } + + /** + * + */ + private void updateTaskState() { + ExecutionStatus status; + if(this.cancel){ + status = ExecutionStatus.KILL; + }else{ + DependResult result = getTaskDependResult(); + status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; + } + taskInstance.setState(status); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + } + + /** + * wait dependent tasks quit + */ + private Boolean waitTaskQuit() { + logger.info("wait depend task : {} complete", this.taskInstance.getName()); + if (taskInstance.getState().typeIsFinished()) { + logger.info("task {} already complete. task state:{}", + this.taskInstance.getName(), + this.taskInstance.getState()); + return true; + } + while (Stopper.isRunning()) { + try{ + if(this.processInstance == null){ + logger.error("process instance not exists , master task exec thread exit"); + return true; + } + if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){ + cancelTaskInstance(); + break; + } + + if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){ + break; + } + // updateProcessInstance task instance + taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + processInstance = processService.findProcessInstanceById(processInstance.getId()); + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + } catch (Exception e) { + logger.error("exception",e); + if (processInstance != null) { + logger.error("wait task quit failed, instance id:{}, task id:{}", + processInstance.getId(), taskInstance.getId()); + } + } + } + return true; + } + + /** + * cancel dependent task + */ + private void cancelTaskInstance() { + this.cancel = true; + } + + private void initTaskParameters() { + taskInstance.setLogPath(getTaskLogPath(taskInstance)); + taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort()); + taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); + taskInstance.setStartTime(new Date()); + processService.updateTaskInstance(taskInstance); + } + + /** + * judge all dependent tasks finish + * @return whether all dependent tasks finish + */ + private boolean allDependentTaskFinish(){ + boolean finish = true; + for(DependentExecute dependentExecute : dependentTaskList){ + for(Map.Entry entry: dependentExecute.getDependResultMap().entrySet()) { + if(!dependResultMap.containsKey(entry.getKey())){ + dependResultMap.put(entry.getKey(), entry.getValue()); + //save depend result to log + logger.info("dependent item complete {} {},{}", + DEPENDENT_SPLIT, entry.getKey(), entry.getValue()); + } + } + if(!dependentExecute.finish(dependentDate)){ + finish = false; + } + } + return finish; + } + + /** + * get dependent result + * @return DependResult + */ + private DependResult getTaskDependResult(){ + List dependResultList = new ArrayList<>(); + for(DependentExecute dependentExecute : dependentTaskList){ + DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); + dependResultList.add(dependResult); + } + DependResult result = DependentUtils.getDependResultForRelation( + this.dependentParameters.getRelation(), dependResultList + ); + logger.info("dependent task completed, dependent result:{}", result); + return result; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index dd7c564cbeef39275ea72c003cf983347d17bd7a..3226d82304207a84fb61af23858fbcf23993c26a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,11 +16,15 @@ */ package org.apache.dolphinscheduler.server.master.runner; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -41,7 +45,8 @@ public class MasterBaseTaskExecThread implements Callable { /** * logger of MasterBaseTaskExecThread */ - private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class); + protected Logger logger = LoggerFactory.getLogger(getClass()); + /** * process service @@ -71,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable { /** * master config */ - private MasterConfig masterConfig; + protected MasterConfig masterConfig; /** * taskUpdateQueue @@ -80,12 +85,10 @@ public class MasterBaseTaskExecThread implements Callable { /** * constructor of MasterBaseTaskExecThread * @param taskInstance task instance - * @param processInstance process instance */ - public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ + public MasterBaseTaskExecThread(TaskInstance taskInstance){ this.processService = SpringApplicationContext.getBean(ProcessService.class); this.alertDao = SpringApplicationContext.getBean(AlertDao.class); - this.processInstance = processInstance; this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); @@ -123,7 +126,7 @@ public class MasterBaseTaskExecThread implements Callable { try { if(!submitDB){ // submit task to db - task = processService.submitTask(taskInstance, processInstance); + task = processService.submitTask(taskInstance); if(task != null && task.getId() != 0){ submitDB = true; } @@ -159,7 +162,9 @@ public class MasterBaseTaskExecThread implements Callable { public Boolean dispatchTask(TaskInstance taskInstance) { try{ - if(taskInstance.isSubProcess()){ + if(taskInstance.isConditionsTask() + || taskInstance.isDependTask() + || taskInstance.isSubProcess()){ return true; } if(taskInstance.getState().typeIsFinished()){ @@ -233,7 +238,39 @@ public class MasterBaseTaskExecThread implements Callable { */ @Override public Boolean call() throws Exception { + this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId()); return submitWaitComplete(); } + /** + * get task log path + * @return log path + */ + public String getTaskLogPath(TaskInstance task) { + String logPath; + try{ + String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) + .getLogger("ROOT") + .getAppender("TASKLOGFILE")) + .getDiscriminator()).getLogBase(); + if (baseLog.startsWith(Constants.SINGLE_SLASH)){ + logPath = baseLog + Constants.SINGLE_SLASH + + task.getProcessDefinitionId() + Constants.SINGLE_SLASH + + task.getProcessInstanceId() + Constants.SINGLE_SLASH + + task.getId() + ".log"; + }else{ + logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + + baseLog + Constants.SINGLE_SLASH + + task.getProcessDefinitionId() + Constants.SINGLE_SLASH + + task.getProcessInstanceId() + Constants.SINGLE_SLASH + + task.getId() + ".log"; + } + }catch (Exception e){ + logger.error("logger", e); + logPath = ""; + } + return logPath; + } + + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index e1e8c090fd958dba18afabd1ff54863c0f0098b7..e0110adfcd991ed875757ac27ab053198762ef49 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -365,7 +365,6 @@ public class MasterExecThread implements Runnable { } // generate process dag dag = DagHelper.buildDagGraph(processDag); - } /** @@ -418,9 +417,13 @@ public class MasterExecThread implements Runnable { private TaskInstance submitTaskExec(TaskInstance taskInstance) { MasterBaseTaskExecThread abstractExecThread = null; if(taskInstance.isSubProcess()){ - abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance); + abstractExecThread = new SubProcessTaskExecThread(taskInstance); + }else if(taskInstance.isDependTask()){ + abstractExecThread = new DependentTaskExecThread(taskInstance); + }else if(taskInstance.isConditionsTask()){ + abstractExecThread = new ConditionsTaskExecThread(taskInstance); }else { - abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance); + abstractExecThread = new MasterTaskExecThread(taskInstance); } Future future = taskExecService.submit(abstractExecThread); activeTaskNode.putIfAbsent(abstractExecThread, future); @@ -504,27 +507,7 @@ public class MasterExecThread implements Runnable { return taskInstance; } - /** - * is there have conditions after the parent node - * @param parentNodeName - * @return - */ - private boolean haveConditionsAfterNode(String parentNodeName){ - boolean result = false; - Collection startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList); - if(startVertex == null){ - return result; - } - for(String nodeName : startVertex){ - TaskNode taskNode = dag.getNode(nodeName); - if(taskNode.getType().equals(TaskType.CONDITIONS.toString())){ - result = true; - break; - } - } - return result; - } /** * if all of the task dependence are skip, skip it too. @@ -701,7 +684,7 @@ public class MasterExecThread implements Runnable { ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); // conditions task would not return failed. if(depTaskState.typeIsFailure() - && !haveConditionsAfterNode(depsNode) + && !DagHelper.haveConditionsAfterNode(depsNode, dag ) && !dag.getNode(depsNode).isConditionsTask()){ return DependResult.FAILED; } @@ -1017,8 +1000,8 @@ public class MasterExecThread implements Runnable { addTaskToStandByList(task); }else{ completeTaskList.put(task.getName(), task); - if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) || - haveConditionsAfterNode(task.getName())) { + if( task.isConditionsTask() + || DagHelper.haveConditionsAfterNode(task.getName(), dag)) { submitPostNode(task.getName()); }else{ errorTaskList.put(task.getName(), task); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index bfc8c445e60dc15124048f8de7440f6689a15476..9986b07319668293b4e735c1c3561d1b3f975a12 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -16,7 +16,6 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import org.slf4j.Logger; import com.alibaba.fastjson.JSON; @@ -28,7 +27,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; @@ -38,7 +36,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.slf4j.LoggerFactory; import java.util.Date; @@ -48,12 +45,6 @@ import java.util.Date; */ public class MasterTaskExecThread extends MasterBaseTaskExecThread { - /** - * logger of MasterTaskExecThread - */ - private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class); - - /** * taskInstance state manager */ @@ -65,10 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** * constructor of MasterTaskExecThread * @param taskInstance task instance - * @param processInstance process instance */ - public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ - super(taskInstance, processInstance); + public MasterTaskExecThread(TaskInstance taskInstance){ + super(taskInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java index 13a59505bcf68497dc7eadfd5bd5dee51ab80964..ee290487b76e1f0e0289ab3ff7c56e4d215b8d9f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java @@ -21,8 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Date; @@ -31,11 +29,6 @@ import java.util.Date; */ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { - /** - * logger of SubProcessTaskExecThread - */ - private static final Logger logger = LoggerFactory.getLogger(SubProcessTaskExecThread.class); - /** * sub process instance */ @@ -44,10 +37,9 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread { /** * sub process task exec thread * @param taskInstance task instance - * @param processInstance process instance */ - public SubProcessTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ - super(taskInstance, processInstance); + public SubProcessTaskExecThread(TaskInstance taskInstance){ + super(taskInstance); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java similarity index 87% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 087bb80ccb6feb8cd105436268f50706573bd5ab..235954632e07a820d3ec61cb3b8c930db7072d15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.dependent; +package org.apache.dolphinscheduler.server.utils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; @@ -23,9 +23,11 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -108,31 +110,7 @@ public class DependentExecute { } // need to check workflow for updates, so get all task and check the task state if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){ - List taskNodes = - processService.getTaskNodeListByDefinitionId(dependentItem.getDefinitionId()); - - if(taskNodes != null && taskNodes.size() > 0){ - List results = new ArrayList<>(); - DependResult tmpResult = DependResult.FAILED; - for(TaskNode taskNode:taskNodes){ - tmpResult = getDependTaskResult(taskNode.getName(),processInstance); - if(DependResult.FAILED == tmpResult){ - break; - }else{ - results.add(getDependTaskResult(taskNode.getName(),processInstance)); - } - } - - if(DependResult.FAILED == tmpResult){ - result = DependResult.FAILED; - }else if(results.contains(DependResult.WAITING)){ - result = DependResult.WAITING; - }else{ - result = DependResult.SUCCESS; - } - }else{ - result = DependResult.FAILED; - } + result = dependResultByProcessInstance(processInstance); }else{ result = getDependTaskResult(dependentItem.getDepTasks(),processInstance); } @@ -143,6 +121,32 @@ public class DependentExecute { return result; } + /** + * depend type = depend_all + * skip the condition tasks. + * judge all the task + * @return + */ + private DependResult dependResultByProcessInstance(ProcessInstance processInstance){ + DependResult result = DependResult.FAILED; + List taskNodes = + processService.getTaskNodeListByDefinitionId(processInstance.getProcessDefinitionId()); + if(CollectionUtils.isEmpty(taskNodes)) { + return result; + } + for(TaskNode taskNode:taskNodes){ + if(taskNode.isConditionsTask() + || DagHelper.haveConditionsAfterNode(taskNode.getName(), taskNodes)){ + continue; + } + DependResult tmpResult = getDependTaskResult(taskNode.getName(),processInstance); + if(DependResult.SUCCESS != tmpResult){ + return tmpResult; + } + } + return DependResult.SUCCESS; + } + /** * get depend task result * @param taskName @@ -150,7 +154,7 @@ public class DependentExecute { * @return */ private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) { - DependResult result = DependResult.FAILED; + DependResult result; TaskInstance taskInstance = null; List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); @@ -182,7 +186,7 @@ public class DependentExecute { */ private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { - ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval); + ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime()); if(runningProcess != null){ return runningProcess; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index d997064892a5653ecd77fc58681f9fdc65f4fa3e..19ba9c9a21b66f725cdec65585c49d007fa2647c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.conditions.ConditionsTask; import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; @@ -69,8 +68,6 @@ public class TaskManager { return new DataxTask(taskExecutionContext, logger); case SQOOP: return new SqoopTask(taskExecutionContext, logger); - case CONDITIONS: - return new ConditionsTask(taskExecutionContext, logger); default: logger.error("unsupport task type: {}", taskExecutionContext.getTaskType()); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java deleted file mode 100644 index 532a0863a5b704b64e16eb70a4da0d46112018aa..0000000000000000000000000000000000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.worker.task.dependent; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.DependResult; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.model.DependentTaskModel; -import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; - -import java.util.*; - -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; - -/** - * Dependent Task - */ -public class DependentTask extends AbstractTask { - - /** - * dependent task list - */ - private List dependentTaskList = new ArrayList<>(); - - /** - * depend item result map - * save the result to log file - */ - private Map dependResultMap = new HashMap<>(); - - /** - * dependent parameters - */ - private DependentParameters dependentParameters; - - /** - * dependent date - */ - private Date dependentDate; - - /** - * process service - */ - private ProcessService processService; - - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; - - /** - * constructor - * @param taskExecutionContext taskExecutionContext - * @param logger logger - */ - public DependentTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; - } - - @Override - public void init(){ - logger.info("dependent task initialize"); - - this.dependentParameters = JSONUtils.parseObject(null, - DependentParameters.class); - if(dependentParameters != null){ - for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ - this.dependentTaskList.add(new DependentExecute( - taskModel.getDependItemList(), taskModel.getRelation())); - } - } - - this.processService = SpringApplicationContext.getBean(ProcessService.class); - - if(taskExecutionContext.getScheduleTime() != null){ - this.dependentDate = taskExecutionContext.getScheduleTime(); - }else{ - this.dependentDate = taskExecutionContext.getStartTime(); - } - - } - - @Override - public void handle() throws Exception { - // set the name of the current thread - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); - - try{ - TaskInstance taskInstance = null; - while(Stopper.isRunning()){ - taskInstance = processService.findTaskInstanceById(this.taskExecutionContext.getTaskInstanceId()); - - if(taskInstance == null){ - exitStatusCode = -1; - break; - } - - if(taskInstance.getState() == ExecutionStatus.KILL){ - this.cancel = true; - } - - if(this.cancel || allDependentTaskFinish()){ - break; - } - - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - } - - if(cancel){ - exitStatusCode = Constants.EXIT_CODE_KILL; - }else{ - DependResult result = getTaskDependResult(); - exitStatusCode = (result == DependResult.SUCCESS) ? - Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; - } - }catch (Exception e){ - logger.error(e.getMessage(),e); - exitStatusCode = -1; - throw e; - } - } - - /** - * get dependent result - * @return DependResult - */ - private DependResult getTaskDependResult(){ - List dependResultList = new ArrayList<>(); - for(DependentExecute dependentExecute : dependentTaskList){ - DependResult dependResult = dependentExecute.getModelDependResult(dependentDate); - dependResultList.add(dependResult); - } - DependResult result = DependentUtils.getDependResultForRelation( - this.dependentParameters.getRelation(), dependResultList - ); - return result; - } - - /** - * judge all dependent tasks finish - * @return whether all dependent tasks finish - */ - private boolean allDependentTaskFinish(){ - boolean finish = true; - for(DependentExecute dependentExecute : dependentTaskList){ - for(Map.Entry entry: dependentExecute.getDependResultMap().entrySet()) { - if(!dependResultMap.containsKey(entry.getKey())){ - dependResultMap.put(entry.getKey(), entry.getValue()); - //save depend result to log - logger.info("dependent item complete {} {},{}", - DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString()); - } - } - if(!dependentExecute.finish(dependentDate)){ - finish = false; - } - } - return finish; - } - - - @Override - public void cancelApplication(boolean cancelApplication) throws Exception { - // cancel process - this.cancel = true; - } - - @Override - public AbstractParameters getParameters() { - return null; - } -} diff --git a/dolphinscheduler-server/src/main/resources/logback-master.xml b/dolphinscheduler-server/src/main/resources/logback-master.xml index 58193caf4e147fae78db5b281c92f86128ed236d..7410c01f05d1d668f98e61a6e925760525784e70 100644 --- a/dolphinscheduler-server/src/main/resources/logback-master.xml +++ b/dolphinscheduler-server/src/main/resources/logback-master.xml @@ -29,7 +29,30 @@ - + + + + INFO + + + + taskAppId + ${log.base} + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + + UTF-8 + + true + + + ${log.base}/dolphinscheduler-master.log @@ -52,6 +75,7 @@ + diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java new file mode 100644 index 0000000000000000000000000000000000000000..299d4ba800d05e5196d2da6149974f3428b2d27d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master; + + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.ConditionsTaskExecThread; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +import java.util.ArrayList; +import java.util.List; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class ConditionsTaskTest { + + + private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); + + private ProcessService processService; + private ApplicationContext applicationContext; + + + private MasterConfig config; + + @Before + public void before() { + config = new MasterConfig(); + config.setMasterTaskCommitRetryTimes(3); + config.setMasterTaskCommitInterval(1000); + processService = Mockito.mock(ProcessService.class); + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + + Mockito.when(processService + .findTaskInstanceById(252612)) + .thenReturn(getTaskInstance()); + + Mockito.when(processService.saveTaskInstance(getTaskInstance())) + .thenReturn(true); + + Mockito.when(processService.findProcessInstanceById(10112)) + .thenReturn(getProcessInstance()); + + Mockito.when(processService + .findValidTaskListByProcessId(10112)) + .thenReturn(getTaskInstances()); + } + + @Test + public void testCondition(){ + TaskInstance taskInstance = getTaskInstance(); + String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"depTasks\":\"1\",\"status\":\"SUCCESS\"}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; + String conditionResult = "{\"successNode\":[\"2\"],\"failedNode\":[\"3\"]}"; + + taskInstance.setDependency(dependString); + Mockito.when(processService.submitTask(taskInstance)) + .thenReturn(taskInstance); + ConditionsTaskExecThread conditions = + new ConditionsTaskExecThread(taskInstance); + + try { + conditions.call(); + } catch (Exception e) { + e.printStackTrace(); + } + + Assert.assertEquals(ExecutionStatus.SUCCESS, conditions.getTaskInstance().getState()); + } + + + private TaskInstance getTaskInstance(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(252612); + taskInstance.setName("C"); + taskInstance.setTaskType("CONDITIONS"); + taskInstance.setProcessInstanceId(10112); + taskInstance.setProcessDefinitionId(100001); + return taskInstance; + } + + + + private List getTaskInstances(){ + List list = new ArrayList<>(); + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(199999); + taskInstance.setName("1"); + taskInstance.setState(ExecutionStatus.SUCCESS); + list.add(taskInstance); + return list; + } + + private ProcessInstance getProcessInstance(){ + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(10112); + processInstance.setProcessDefinitionId(100001); + processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); + + return processInstance; + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java similarity index 66% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 3477f4ac67220428d458c71134c5a6999a4c826c..a65b0508d8f3c1d6cc8474a4c0f0288942752b6f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -14,17 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.dependent; +package org.apache.dolphinscheduler.server.master; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.junit.Assert; @@ -50,12 +50,23 @@ public class DependentTaskTest { private ApplicationContext applicationContext; + private MasterConfig config; + @Before public void before() throws Exception{ + + config = new MasterConfig(); + config.setMasterTaskCommitRetryTimes(3); + config.setMasterTaskCommitInterval(1000); processService = Mockito.mock(ProcessService.class); + DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); Mockito.when(processService - .findLastRunningProcess(4,DependentDateUtils.getTodayInterval(new Date()).get(0))) + .findLastRunningProcess(4, dateInterval.getStartTime(), + dateInterval.getEndTime())) .thenReturn(findLastProcessInterval()); + + + Mockito.when(processService .getTaskNodeListByDefinitionId(4)) .thenReturn(getTaskNodes()); @@ -66,32 +77,62 @@ public class DependentTaskTest { Mockito.when(processService .findTaskInstanceById(252612)) .thenReturn(getTaskInstance()); + + + Mockito.when(processService.findProcessInstanceById(10111)) + .thenReturn(getProcessInstance()); + Mockito.when(processService.findProcessDefineById(0)) + .thenReturn(getProcessDefinition()); + Mockito.when(processService.saveTaskInstance(getTaskInstance())) + .thenReturn(true); + applicationContext = Mockito.mock(ApplicationContext.class); SpringApplicationContext springApplicationContext = new SpringApplicationContext(); springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); } @Test public void test() throws Exception{ - TaskProps taskProps = new TaskProps(); + TaskInstance taskInstance = getTaskInstance(); String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskProps.setDependence(dependString); - taskProps.setTaskStartTime(new Date()); - DependentTask dependentTask = new DependentTask(new TaskExecutionContext(), logger); - dependentTask.init(); - dependentTask.handle(); - Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS ); + taskInstance.setDependency(dependString); + Mockito.when(processService.submitTask(taskInstance)) + .thenReturn(taskInstance); + DependentTaskExecThread dependentTask = + new DependentTaskExecThread(taskInstance); + + dependentTask.call(); + + Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); } private ProcessInstance findLastProcessInterval(){ ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(11); + processInstance.setProcessDefinitionId(4); processInstance.setState(ExecutionStatus.SUCCESS); return processInstance; } + private ProcessDefinition getProcessDefinition(){ + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(0); + return processDefinition; + } + + private ProcessInstance getProcessInstance(){ + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(10111); + processInstance.setProcessDefinitionId(0); + processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); + + return processInstance; + } + + private List getTaskNodes(){ List list = new ArrayList<>(); TaskNode taskNode = new TaskNode(); @@ -113,9 +154,10 @@ public class DependentTaskTest { private TaskInstance getTaskInstance(){ TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskType("DEPENDENT"); taskInstance.setId(252612); taskInstance.setName("C"); - taskInstance.setState(ExecutionStatus.SUCCESS); + taskInstance.setProcessInstanceId(10111); return taskInstance; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d2a13aebab1cf56cd69f3bfb1136b74a8495a33d..c69ea34c5cfe94603082d06b01e60cc64782ee0a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -782,14 +782,13 @@ public class ProcessService { * submit task to db * submit sub process to command * @param taskInstance taskInstance - * @param processInstance processInstance * @return task instance */ @Transactional(rollbackFor = Exception.class) - public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){ - logger.info("start submit task : {}, instance id:{}, state: {}, ", - taskInstance.getName(), processInstance.getId(), processInstance.getState() ); - processInstance = this.findProcessInstanceDetailById(processInstance.getId()); + public TaskInstance submitTask(TaskInstance taskInstance){ + ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + logger.info("start submit task : {}, instance id:{}, state: {}", + taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if(task == null){ @@ -1637,13 +1636,14 @@ public class ProcessService { /** * find last running process instance * @param definitionId process definition id - * @param dateInterval dateInterval + * @param startTime start time + * @param endTime end time * @return process instance */ - public ProcessInstance findLastRunningProcess(int definitionId, DateInterval dateInterval) { + public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { return processInstanceMapper.queryLastRunningProcess(definitionId, - dateInterval.getStartTime(), - dateInterval.getEndTime(), + startTime, + endTime, stateArray); } @@ -1799,5 +1799,22 @@ public class ProcessService { return resourceMapper.listResourceByIds(resIds); } + /** + * format task app id in task instance + * @param taskInstance + * @return + */ + public String formatTaskAppId(TaskInstance taskInstance){ + ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId()); + ProcessInstance processInstanceById = this.findProcessInstanceById(taskInstance.getProcessInstanceId()); + + if(definition == null || processInstanceById == null){ + return ""; + } + return String.format("%s_%s_%s", + definition.getId(), + processInstanceById.getId(), + taskInstance.getId()); + } } diff --git a/pom.xml b/pom.xml index f90700e31819c8b17f4ad91faa934f75bbf0e8f1..9e9a43dac3c558378e50ade8099f16249dac20d2 100644 --- a/pom.xml +++ b/pom.xml @@ -782,6 +782,8 @@ **/server/master/register/MasterRegistryTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java + **/server/master/DependentTaskTest.java + **/server/master/ConditionsTaskTest.java **/server/master/MasterExecThreadTest.java **/server/master/ParamsTest.java **/server/register/ZookeeperNodeManagerTest.java