From 26d98931cf90daf5620eb88b3ef6167fc38922ba Mon Sep 17 00:00:00 2001 From: Tboy Date: Tue, 24 Dec 2019 22:10:29 +0800 Subject: [PATCH] fix MasterBaseTaskExecThread submit method bug (#1532) * fix #1515 * sleep when resource in not satisfy. fix #1522 * add sleep 1s for no command * fix MasterBaseTaskExecThread submit method bug * updates * add log --- .../runner/MasterBaseTaskExecThread.java | 26 +++++++++---------- .../master/runner/MasterTaskExecThread.java | 4 +++ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 9bb5c555f..6b4b799ef 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -114,30 +114,29 @@ public class MasterBaseTaskExecThread implements Callable { Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); int retryTimes = 1; - boolean taskDBFlag = false; - boolean taskQueueFlag = false; + boolean submitDB = false; + boolean submitQueue = false; TaskInstance task = null; - while (true){ + while (retryTimes <= commitRetryTimes){ try { - if(!taskDBFlag){ + if(!submitDB){ // submit task to db task = processDao.submitTask(taskInstance, processInstance); if(task != null && task.getId() != 0){ - taskDBFlag = true; + submitDB = true; } } - if(taskDBFlag && !taskQueueFlag){ + if(submitDB && !submitQueue){ // submit task to queue - taskQueueFlag = processDao.submitTaskToQueue(task); + submitQueue = processDao.submitTaskToQueue(task); } - if(taskDBFlag && taskQueueFlag){ + if(submitDB && submitQueue){ return task; } - if(!taskDBFlag){ - logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); - }else if(!taskQueueFlag){ - logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); - + if(!submitDB){ + logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); + }else if(!submitQueue){ + logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes); } Thread.sleep(commitRetryInterval); } catch (Exception e) { @@ -145,6 +144,7 @@ public class MasterBaseTaskExecThread implements Callable { } retryTimes += 1; } + return task; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index e91deca51..7d10591e0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -74,6 +74,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public Boolean submitWaitComplete() { Boolean result = false; this.taskInstance = submit(); + if(this.taskInstance == null){ + logger.error("submit task instance to mysql and queue failed , please check and fix it"); + return result; + } if(!this.taskInstance.getState().typeIsFinished()) { result = waitTaskQuit(); } -- GitLab