From 3e297bfd2ec458bc8b095f97b973448e70940fa2 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Fri, 22 May 2020 19:08:01 +0800 Subject: [PATCH] [BUG FIX]fix bug: Restart the worker service again, the previously submitted successful tasks are not executed bug (#2800) * feature: add number configuration for master dispatch tasks * fix bug(#2762) the master would be blocked when worker group not exists * fix bug(#2762) the master would be blocked when worker group not exists * fix ut * fix ut * fix bug(2781): cannot pause work flow when task state is "submit success" * fix code smell * add mysql other param blank judge * test * update comments * update comments * add ut * fix bug: Restart the worker service again, the previously submitted successful tasks are not executed * update comments * add sleep Co-authored-by: baoliang --- .../master/consumer/TaskPriorityQueueConsumer.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 6e19ad184..904914a55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -107,6 +108,10 @@ public class TaskPriorityQueueConsumer extends Thread{ int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for(int i = 0; i < fetchTaskNum; i++){ + if(taskPriorityQueue.size() <= 0){ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } // if not task , blocking here String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); @@ -115,8 +120,8 @@ public class TaskPriorityQueueConsumer extends Thread{ failedDispatchTasks.add(taskPriorityInfo); } } - for(String taskPriorityInfo: failedDispatchTasks){ - taskPriorityQueue.put(taskPriorityInfo); + for(String dispatchFailedTask : failedDispatchTasks){ + taskPriorityQueue.put(dispatchFailedTask); } }catch (Exception e){ logger.error("dispatcher task error",e); -- GitLab