diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index ad444070be7c0986afcece7363494b84b33ed7b4..3525ae4d77853cd1ae5105fec0ea2a6c2d221d70 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -575,6 +575,15 @@ public class WorkflowExecuteThread implements Runnable { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); logger.info(" process definition code:{} complement data: {}", processInstance.getProcessDefinitionCode(), complementListDate.toString()); + + if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { + processInstance.setScheduleTime(complementListDate.get(0)); + processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processService.updateProcessInstance(processInstance); + } } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index e78112a33b4eab30a477007a8f77177f7ac6b353..4ca47c8fcaef3345b12390037d54a50030613f8a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -125,7 +125,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.facebook.presto.jdbc.internal.guava.collect.Lists; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -630,10 +629,8 @@ public class ProcessService { processInstance.setWarningGroupId(warningGroupId); processInstance.setDryRun(command.getDryRun()); - // schedule time - Date scheduleTime = getScheduleTime(command, cmdParam); - if (scheduleTime != null) { - processInstance.setScheduleTime(scheduleTime); + if (command.getScheduleTime() != null) { + processInstance.setScheduleTime(command.getScheduleTime()); } processInstance.setCommandStartTime(command.getStartTime()); processInstance.setLocations(processDefinition.getLocations()); @@ -878,13 +875,14 @@ public class ProcessService { runStatus = processInstance.getState(); break; case COMPLEMENT_DATA: - // delete all the valid tasks when complement data - List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance taskInstance : taskInstanceList) { - taskInstance.setFlag(Flag.NO); - this.updateTaskInstance(taskInstance); + // delete all the valid tasks when complement data if id is not null + if (processInstance.getId() != 0) { + List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance taskInstance : taskInstanceList) { + taskInstance.setFlag(Flag.NO); + this.updateTaskInstance(taskInstance); + } } - initComplementDataParam(processDefinition, processInstance, cmdParam); break; case REPEAT_RUNNING: // delete the recover task names from command parameter