From 624f0aeab9621abbbc9584c4a3167cb1bd2dcf00 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Tue, 28 Jul 2020 11:12:51 +0800 Subject: [PATCH] fix bug#1336 serial complement data can have multiple process instances (#3317) Co-authored-by: lenboo --- .../server/master/runner/MasterExecThread.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 177fb8a8c..8b2743d1c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -252,6 +252,9 @@ public class MasterExecThread implements Runnable { } while(Stopper.isRunning()){ + + logger.info("process {} start to complement {} data", + processInstance.getId(), DateUtils.dateToString(scheduleDate)); // prepare dag and other info prepareProcess(); @@ -266,13 +269,13 @@ public class MasterExecThread implements Runnable { // execute process ,waiting for end runProcess(); + endProcess(); // process instance failure ,no more complements if(!processInstance.getState().typeIsSuccess()){ logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); break; } - // current process instance success ,next execute if(null == iterator){ // loop by day @@ -291,9 +294,7 @@ public class MasterExecThread implements Runnable { } scheduleDate = iterator.next(); } - - logger.info("process {} start to complement {} data", - processInstance.getId(), DateUtils.dateToString(scheduleDate)); + // flow end // execute next process instance complement data processInstance.setScheduleTime(scheduleDate); if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){ @@ -301,22 +302,15 @@ public class MasterExecThread implements Runnable { processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); } - List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for(TaskInstance taskInstance : taskInstanceList){ - taskInstance.setFlag(Flag.NO); - processService.updateTaskInstance(taskInstance); - } processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processInstance.getProcessDefinition().getGlobalParamMap(), processInstance.getProcessDefinition().getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); - + processInstance.setId(0); processService.saveProcessInstance(processInstance); } - // flow end - endProcess(); } -- GitLab