diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5042a037140ab087f55a2036e9711c6cc5d037ed..fe5f9101b14c058added24dd9c54aa5dfadb7136 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -283,13 +283,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.REPEAT_RUNNING, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); break; case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); break; case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); break; case STOP: if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -409,10 +409,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param loginUser login user * @param instanceId instance id * @param processDefinitionCode process definition code + * @param version * @param commandType command type * @return insert result code */ - private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, CommandType commandType, String startParams) { + private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, int processVersion, CommandType commandType, String startParams) { Map result = new HashMap<>(); //To add startParams only when repeat running is needed @@ -427,6 +428,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessDefinitionCode(processDefinitionCode); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(loginUser.getId()); + command.setProcessDefinitionVersion(processVersion); + command.setProcessInstanceId(instanceId); if (!processService.verifyIsNeedCreateCommand(command)) { putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionCode); @@ -545,6 +548,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setWorkerGroup(workerGroup); command.setEnvironmentCode(environmentCode); command.setDryRun(dryRun); + ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode); + if (processDefinition != null) { + command.setProcessDefinitionVersion(processDefinition.getVersion()); + } + command.setProcessInstanceId(0); Date start = null; Date end = null; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index b1ed217537c47062a41ffee594c28e8a96f9d14b..ae2ff6258a18f3de588de3cb5a629dec6c48d8db 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -132,6 +132,12 @@ public class Command { @TableField("dry_run") private int dryRun; + @TableField("process_instance_id") + private int processInstanceId; + + @TableField("process_definition_version") + private int processDefinitionVersion; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -152,7 +158,10 @@ public class Command { String workerGroup, Long environmentCode, Priority processInstancePriority, - int dryRun) { + int dryRun, + int processInstanceId, + int processDefinitionVersion + ) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionCode = processDefinitionCode; @@ -168,6 +177,8 @@ public class Command { this.environmentCode = environmentCode; this.processInstancePriority = processInstancePriority; this.dryRun = dryRun; + this.processInstanceId = processInstanceId; + this.processDefinitionVersion = processDefinitionVersion; } public TaskDependType getTaskDependType() { @@ -298,6 +309,22 @@ public class Command { this.dryRun = dryRun; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + public int getProcessDefinitionVersion() { + return processDefinitionVersion; + } + + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -353,8 +380,13 @@ public class Command { if (processInstancePriority != command.processInstancePriority) { return false; } + if (processInstanceId != command.processInstanceId) { + return false; + } + if (processDefinitionVersion != command.getProcessDefinitionVersion()) { + return false; + } return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null); - } @Override @@ -375,6 +407,8 @@ public class Command { result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); result = 31 * result + dryRun; + result = 31 * result + processInstanceId; + result = 31 * result + processDefinitionVersion; return result; } @@ -397,7 +431,10 @@ public class Command { + ", workerGroup='" + workerGroup + '\'' + ", environmentCode='" + environmentCode + '\'' + ", dryRun='" + dryRun + '\'' + + ", processInstanceId='" + processInstanceId + '\'' + + ", processDefinitionVersion='" + processDefinitionVersion + '\'' + '}'; } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java index 2bbfb4b7b1ce5e2475dd32210b2325fc523ba045..22913845c3a82ca313259054a6983a97dac7d597 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java @@ -31,13 +31,6 @@ import java.util.List; */ public interface CommandMapper extends BaseMapper { - - /** - * get one command - * @return command - */ - Command getOneToRun(); - /** * count command state * @param userId userId diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml index 5b2d6b4d8c13ef779faf9ac076483f54b7d34343..b0ea4774319538c81789fb78b4cf8a03bacf295d 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml @@ -18,16 +18,6 @@ -