未验证 提交 6e453de2 编写于 作者: S Shukun Zhang 提交者: GitHub

[Feature] workflow level task dry run (#6104)

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run

* feat workflow level task dry run
上级 1843e986
......@@ -123,7 +123,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber) {
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
......@@ -133,7 +134,7 @@ public class ExecutorController extends BaseController {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber);
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber, dryRun);
return returnDataList(result);
}
......
......@@ -62,7 +62,8 @@ public interface ExecutorService {
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber);
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun);
/**
* check whether the process definition can be executed
......
......@@ -133,7 +133,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber) {
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
......@@ -170,7 +171,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/
int create = this.createCommand(commandType, processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber);
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
......@@ -507,7 +508,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber) {
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun) {
/**
* instantiate command schedule instance
......@@ -543,6 +544,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroup(workerGroup);
command.setEnvironmentCode(environmentCode);
command.setDryRun(dryRun);
Date start = null;
Date end = null;
......
......@@ -158,7 +158,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
......@@ -176,7 +176,7 @@ public class ExecutorServiceTest {
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
......@@ -194,7 +194,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
......@@ -211,7 +211,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
......@@ -228,7 +228,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
......@@ -246,7 +246,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
......@@ -261,7 +261,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
}
......
......@@ -1093,4 +1093,11 @@ public final class Constants {
public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName";
/**
* dry run flag
*/
public static final int DRY_RUN_FLAG_NO = 0;
public static final int DRY_RUN_FLAG_YES = 1;
}
......@@ -126,6 +126,12 @@ public class Command {
@TableField("environment_code")
private Long environmentCode;
/**
* dry run flag
*/
@TableField("dry_run")
private int dryRun;
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE;
......@@ -145,7 +151,8 @@ public class Command {
Date scheduleTime,
String workerGroup,
Long environmentCode,
Priority processInstancePriority) {
Priority processInstancePriority,
int dryRun) {
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionCode = processDefinitionCode;
......@@ -160,6 +167,7 @@ public class Command {
this.workerGroup = workerGroup;
this.environmentCode = environmentCode;
this.processInstancePriority = processInstancePriority;
this.dryRun = dryRun;
}
public TaskDependType getTaskDependType() {
......@@ -282,6 +290,14 @@ public class Command {
this.environmentCode = environmentCode;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
@Override
public boolean equals(Object o) {
if (this == o) {
......@@ -358,6 +374,7 @@ public class Command {
result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0);
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0);
result = 31 * result + dryRun;
return result;
}
......@@ -379,6 +396,7 @@ public class Command {
+ ", updateTime=" + updateTime
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\''
+ ", dryRun='" + dryRun + '\''
+ '}';
}
}
......
......@@ -120,7 +120,12 @@ public class ErrorCommand {
*/
private Long environmentCode;
public ErrorCommand(){}
/**
* dry run flag
*/
private int dryRun;
public ErrorCommand() {}
public ErrorCommand(Command command, String message) {
this.id = command.getId();
......@@ -138,6 +143,7 @@ public class ErrorCommand {
this.environmentCode = command.getEnvironmentCode();
this.processInstancePriority = command.getProcessInstancePriority();
this.message = message;
this.dryRun = command.getDryRun();
}
public TaskDependType getTaskDependType() {
......@@ -268,6 +274,14 @@ public class ErrorCommand {
this.environmentCode = environmentCode;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
@Override
public String toString() {
return "ErrorCommand{"
......@@ -287,6 +301,7 @@ public class ErrorCommand {
+ ", message='" + message + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentCode='" + environmentCode + '\''
+ ", dryRun='" + dryRun + '\''
+ '}';
}
}
......@@ -239,6 +239,11 @@ public class ProcessInstance {
*/
private String varPool;
/**
* dry run flag
*/
private int dryRun;
public ProcessInstance() {
}
......@@ -503,6 +508,14 @@ public class ProcessInstance {
this.environmentCode = environmentCode;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
/**
* add command to history
*
......@@ -668,6 +681,9 @@ public class ProcessInstance {
+ ", processDefinitionVersion='"
+ processDefinitionVersion
+ '\''
+ ", dryRun='"
+ dryRun
+ '\''
+ '}';
}
......
......@@ -261,6 +261,11 @@ public class TaskInstance implements Serializable {
*/
private String taskParams;
/**
* dry run flag
*/
private int dryRun;
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
......@@ -529,6 +534,14 @@ public class TaskInstance implements Serializable {
this.executorName = executorName;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
public boolean isTaskComplete() {
return this.getState().typeIsPause()
......@@ -654,6 +667,7 @@ public class TaskInstance implements Serializable {
+ ", executorId=" + executorId
+ ", executorName='" + executorName + '\''
+ ", delayTime=" + delayTime
+ ", dryRun=" + dryRun
+ '}';
}
......
......@@ -21,7 +21,7 @@
<select id="getOneToRun" resultType="org.apache.dolphinscheduler.dao.entity.Command">
select cmd.id, cmd.command_type, cmd.process_definition_code, cmd.command_param, cmd.task_depend_type, cmd.failure_strategy,
cmd.warning_type, cmd.warning_group_id, cmd.schedule_time, cmd.start_time, cmd.executor_id, cmd.update_time,
cmd.process_instance_priority, cmd.worker_group, cmd.environment_code
cmd.process_instance_priority, cmd.worker_group, cmd.environment_code, cmd.dry_run
from t_ds_command cmd
join t_ds_process_definition definition on cmd.process_definition_code = definition.code
where definition.release_state = 1 AND definition.flag = 1
......
......@@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
......@@ -90,7 +90,7 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
......
......@@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool
first_submit_time, delay_time, task_params, var_pool, dry_run
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
......
......@@ -702,6 +702,9 @@ public class WorkflowExecuteThread implements Runnable {
// task instance flag
taskInstance.setFlag(Flag.YES);
// task dry run flag
taskInstance.setDryRun(processInstance.getDryRun());
// task instance retry times
taskInstance.setRetryTimes(0);
......
......@@ -167,10 +167,14 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
int dryRun = taskInstance.getDryRun();
// copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
logger);
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
logger);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
......@@ -198,14 +202,20 @@ public class TaskExecuteThread implements Runnable, Delayed {
this.task.init();
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle();
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
// task handle
this.task.handle();
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
} else {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
......
......@@ -552,7 +552,8 @@ public class ProcessService {
processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(),
processInstance.getProcessInstancePriority()
processInstance.getProcessInstancePriority(),
processInstance.getDryRun()
);
saveCommand(command);
return;
......@@ -632,6 +633,7 @@ public class ProcessService {
processInstance.setWarningType(warningType);
Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
processInstance.setWarningGroupId(warningGroupId);
processInstance.setDryRun(command.getDryRun());
// schedule time
Date scheduleTime = getScheduleTime(command, cmdParam);
......@@ -1292,7 +1294,8 @@ public class ProcessService {
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority()
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun()
);
}
......
......@@ -296,6 +296,7 @@ public class ProcessServiceTest {
commandParams.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, validThreadNum, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
}
......
......@@ -332,6 +332,7 @@ CREATE TABLE `t_ds_command` (
`process_instance_priority` int(11) DEFAULT NULL COMMENT 'process instance priority: 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag:0 normal, 1 dry run',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
......@@ -381,6 +382,7 @@ CREATE TABLE `t_ds_error_command` (
`worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`message` text COMMENT 'message',
`dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
......@@ -586,6 +588,7 @@ CREATE TABLE `t_ds_process_instance` (
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
`dry_run` int NULL DEFAULT 0 COMMENT 'dry run flag: 0 normal, 1 dry run ',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
......@@ -822,6 +825,7 @@ CREATE TABLE `t_ds_task_instance` (
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
`dry_run` int NULL DEFAULT NULL COMMENT 'dry run flag: 0 normal, 1 dry run',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
CONSTRAINT `foreign_key_instance_id` FOREIGN KEY (`process_instance_id`) REFERENCES `t_ds_process_instance` (`id`) ON DELETE CASCADE
......
......@@ -254,6 +254,7 @@ CREATE TABLE t_ds_command (
process_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
environment_code bigint DEFAULT '-1',
dry_run int DEFAULT '0' ,
PRIMARY KEY (id)
) ;
......@@ -297,6 +298,7 @@ CREATE TABLE t_ds_error_command (
worker_group varchar(64),
environment_code bigint DEFAULT '-1',
message text ,
dry_ru int DEFAULT '0' ,
PRIMARY KEY (id)
);
--
......@@ -490,6 +492,7 @@ CREATE TABLE t_ds_process_instance (
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
dry_run int DEFAULT '0' ,
PRIMARY KEY (id)
) ;
......@@ -703,6 +706,7 @@ CREATE TABLE t_ds_task_instance (
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
var_pool text ,
dry_run int DEFAULT '0' ,
PRIMARY KEY (id),
CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE
) ;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册