diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 6e19ad18472426f93f63ad20da9a9457438b93bc..904914a55a6795474698b6ae8e06b7f438e75fd3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -107,6 +108,10 @@ public class TaskPriorityQueueConsumer extends Thread{ int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for(int i = 0; i < fetchTaskNum; i++){ + if(taskPriorityQueue.size() <= 0){ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } // if not task , blocking here String taskPriorityInfo = taskPriorityQueue.take(); TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); @@ -115,8 +120,8 @@ public class TaskPriorityQueueConsumer extends Thread{ failedDispatchTasks.add(taskPriorityInfo); } } - for(String taskPriorityInfo: failedDispatchTasks){ - taskPriorityQueue.put(taskPriorityInfo); + for(String dispatchFailedTask : failedDispatchTasks){ + taskPriorityQueue.put(dispatchFailedTask); } }catch (Exception e){ logger.error("dispatcher task error",e);