diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 9bb5c555fdda5e023b7a5f92713fb17c81add8a6..6b4b799ef961f3c39ce66c9cc091e56c472ef857 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -114,30 +114,29 @@ public class MasterBaseTaskExecThread implements Callable { Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); int retryTimes = 1; - boolean taskDBFlag = false; - boolean taskQueueFlag = false; + boolean submitDB = false; + boolean submitQueue = false; TaskInstance task = null; - while (true){ + while (retryTimes <= commitRetryTimes){ try { - if(!taskDBFlag){ + if(!submitDB){ // submit task to db task = processDao.submitTask(taskInstance, processInstance); if(task != null && task.getId() != 0){ - taskDBFlag = true; + submitDB = true; } } - if(taskDBFlag && !taskQueueFlag){ + if(submitDB && !submitQueue){ // submit task to queue - taskQueueFlag = processDao.submitTaskToQueue(task); + submitQueue = processDao.submitTaskToQueue(task); } - if(taskDBFlag && taskQueueFlag){ + if(submitDB && submitQueue){ return task; } - if(!taskDBFlag){ - logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); - }else if(!taskQueueFlag){ - logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); - + if(!submitDB){ + logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); + }else if(!submitQueue){ + logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes); } Thread.sleep(commitRetryInterval); } catch (Exception e) { @@ -145,6 +144,7 @@ public class MasterBaseTaskExecThread implements Callable { } retryTimes += 1; } + return task; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index e91deca511a046664646705b9d99709bb2284526..7d10591e0d8f50acb8f1e2add06f60a970164041 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -74,6 +74,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public Boolean submitWaitComplete() { Boolean result = false; this.taskInstance = submit(); + if(this.taskInstance == null){ + logger.error("submit task instance to mysql and queue failed , please check and fix it"); + return result; + } if(!this.taskInstance.getState().typeIsFinished()) { result = waitTaskQuit(); }