From 6e453de241a6abb34c77462c9e778bf6f4013758 Mon Sep 17 00:00:00 2001 From: Shukun Zhang <60541766+andream7@users.noreply.github.com> Date: Thu, 30 Sep 2021 11:18:06 +0800 Subject: [PATCH] [Feature] workflow level task dry run (#6104) * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run * feat workflow level task dry run --- .../api/controller/ExecutorController.java | 5 ++-- .../api/service/ExecutorService.java | 3 +- .../api/service/impl/ExecutorServiceImpl.java | 8 ++++-- .../api/service/ExecutorServiceTest.java | 14 +++++----- .../dolphinscheduler/common/Constants.java | 7 +++++ .../dolphinscheduler/dao/entity/Command.java | 20 ++++++++++++- .../dao/entity/ErrorCommand.java | 17 ++++++++++- .../dao/entity/ProcessInstance.java | 16 +++++++++++ .../dao/entity/TaskInstance.java | 14 ++++++++++ .../dao/mapper/CommandMapper.xml | 2 +- .../dao/mapper/ProcessInstanceMapper.xml | 4 +-- .../dao/mapper/TaskInstanceMapper.xml | 4 +-- .../master/runner/WorkflowExecuteThread.java | 3 ++ .../worker/runner/TaskExecuteThread.java | 28 +++++++++++++------ .../service/process/ProcessService.java | 7 +++-- .../service/process/ProcessServiceTest.java | 1 + sql/dolphinscheduler_mysql.sql | 4 +++ sql/dolphinscheduler_postgre.sql | 4 +++ 18 files changed, 130 insertions(+), 31 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 93a31f2e8..db28f0b7d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -123,7 +123,8 @@ public class ExecutorController extends BaseController { @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode, @RequestParam(value = "timeout", required = false) Integer timeout, @RequestParam(value = "startParams", required = false) String startParams, - @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber) { + @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, + @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) { if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -133,7 +134,7 @@ public class ExecutorController extends BaseController { startParamMap = JSONUtils.toMap(startParams); } Map result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, - startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber); + startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber, dryRun); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index e86881599..72d189237 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -62,7 +62,8 @@ public interface ExecutorService { TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, - Map startParams, Integer expectedParallelismNumber); + Map startParams, Integer expectedParallelismNumber, + int dryRun); /** * check whether the process definition can be executed diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index a213573f8..b910c4ba0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -133,7 +133,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ TaskDependType taskDependType, WarningType warningType, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout, - Map startParams, Integer expectedParallelismNumber) { + Map startParams, Integer expectedParallelismNumber, + int dryRun) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -170,7 +171,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ int create = this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber); + warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -507,7 +508,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, - Map startParams, Integer expectedParallelismNumber) { + Map startParams, Integer expectedParallelismNumber, int dryRun) { /** * instantiate command schedule instance @@ -543,6 +544,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessInstancePriority(processInstancePriority); command.setWorkerGroup(workerGroup); command.setEnvironmentCode(environmentCode); + command.setDryRun(dryRun); Date start = null; Date end = null; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 01e81adbb..e308f5844 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -158,7 +158,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -176,7 +176,7 @@ public class ExecutorServiceTest { null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -194,7 +194,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); } @@ -211,7 +211,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } @@ -228,7 +228,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); @@ -246,7 +246,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); @@ -261,7 +261,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 417ce59de..2d695046b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -1093,4 +1093,11 @@ public final class Constants { public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName"; public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId"; public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName"; + + /** + * dry run flag + */ + public static final int DRY_RUN_FLAG_NO = 0; + public static final int DRY_RUN_FLAG_YES = 1; + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index e3088d022..b1ed21753 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -126,6 +126,12 @@ public class Command { @TableField("environment_code") private Long environmentCode; + /** + * dry run flag + */ + @TableField("dry_run") + private int dryRun; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -145,7 +151,8 @@ public class Command { Date scheduleTime, String workerGroup, Long environmentCode, - Priority processInstancePriority) { + Priority processInstancePriority, + int dryRun) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionCode = processDefinitionCode; @@ -160,6 +167,7 @@ public class Command { this.workerGroup = workerGroup; this.environmentCode = environmentCode; this.processInstancePriority = processInstancePriority; + this.dryRun = dryRun; } public TaskDependType getTaskDependType() { @@ -282,6 +290,14 @@ public class Command { this.environmentCode = environmentCode; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -358,6 +374,7 @@ public class Command { result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0); result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); + result = 31 * result + dryRun; return result; } @@ -379,6 +396,7 @@ public class Command { + ", updateTime=" + updateTime + ", workerGroup='" + workerGroup + '\'' + ", environmentCode='" + environmentCode + '\'' + + ", dryRun='" + dryRun + '\'' + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index d03570d9e..20df39ba2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -120,7 +120,12 @@ public class ErrorCommand { */ private Long environmentCode; - public ErrorCommand(){} + /** + * dry run flag + */ + private int dryRun; + + public ErrorCommand() {} public ErrorCommand(Command command, String message) { this.id = command.getId(); @@ -138,6 +143,7 @@ public class ErrorCommand { this.environmentCode = command.getEnvironmentCode(); this.processInstancePriority = command.getProcessInstancePriority(); this.message = message; + this.dryRun = command.getDryRun(); } public TaskDependType getTaskDependType() { @@ -268,6 +274,14 @@ public class ErrorCommand { this.environmentCode = environmentCode; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + @Override public String toString() { return "ErrorCommand{" @@ -287,6 +301,7 @@ public class ErrorCommand { + ", message='" + message + '\'' + ", workerGroup='" + workerGroup + '\'' + ", environmentCode='" + environmentCode + '\'' + + ", dryRun='" + dryRun + '\'' + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 693f019e4..18c386b85 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -239,6 +239,11 @@ public class ProcessInstance { */ private String varPool; + /** + * dry run flag + */ + private int dryRun; + public ProcessInstance() { } @@ -503,6 +508,14 @@ public class ProcessInstance { this.environmentCode = environmentCode; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + /** * add command to history * @@ -668,6 +681,9 @@ public class ProcessInstance { + ", processDefinitionVersion='" + processDefinitionVersion + '\'' + + ", dryRun='" + + dryRun + + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 7948df45a..ac18975d9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -261,6 +261,11 @@ public class TaskInstance implements Serializable { */ private String taskParams; + /** + * dry run flag + */ + private int dryRun; + public void init(String host, Date startTime, String executePath) { this.host = host; this.startTime = startTime; @@ -529,6 +534,14 @@ public class TaskInstance implements Serializable { this.executorName = executorName; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + public boolean isTaskComplete() { return this.getState().typeIsPause() @@ -654,6 +667,7 @@ public class TaskInstance implements Serializable { + ", executorId=" + executorId + ", executorName='" + executorName + '\'' + ", delayTime=" + delayTime + + ", dryRun=" + dryRun + '}'; } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index d0e9141bc..ab7033680 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -21,7 +21,7 @@ select @@ -90,7 +90,7 @@