diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskDispatchFailedQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskDispatchFailedQueueConsumer.java index 499fd8e07bcf9345f1de24272c12e8d92e030a66..b6d3a2984b39186914b0f757f305fc24fe186b4c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskDispatchFailedQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskDispatchFailedQueueConsumer.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; @@ -49,14 +48,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { */ @Autowired @Qualifier(Constants.TASK_PRIORITY_QUEUE) - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueueImpl; /** * taskDispatchFailedQueue */ @Autowired @Qualifier(Constants.TASK_DISPATCH_FAILED_QUEUE) - private TaskPriorityQueue taskDispatchFailedQueue; + private TaskPriorityQueue taskDispatchFailedQueueImpl; @Autowired private MasterConfig masterConfig; @@ -74,8 +73,8 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { static { TIME_DELAY = new Long[Constants.DEFAULT_MAX_RETRY_COUNT]; for (int i = 0; i < Constants.DEFAULT_MAX_RETRY_COUNT; i++) { - int delayTime = (i + 1) * 1000; - TIME_DELAY[i] = (long) delayTime; + long delayTime = (i + 1) * Constants.SLEEP_TIME_MILLIS; + TIME_DELAY[i] = delayTime; } } @@ -94,7 +93,7 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { public void run() { while (Stopper.isRunning()) { try { - failedRetry(); + dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber()); } catch (Exception e) { TaskMetrics.incTaskDispatchError(); logger.error("failed task retry error", e); @@ -102,33 +101,31 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { } } - public void failedRetry() throws TaskPriorityQueueException { - if (taskDispatchFailedQueue.size() > 0) { - retryConsumerThreadPoolExecutor.submit(() -> dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber())); - } - } - /** * put the failed dispatch task into the dispatch queue again */ private void dispatchFailedBackToTaskPriorityQueue(int fetchTaskNum) { for (int i = 0; i < fetchTaskNum; i++) { try { - TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueueImpl.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); if (Objects.isNull(dispatchFailedTaskPriority)) { continue; } - if (canRetry(dispatchFailedTaskPriority)) { - dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1); - taskPriorityQueue.put(dispatchFailedTaskPriority); - } else { - taskDispatchFailedQueue.put(dispatchFailedTaskPriority); - } + retryConsumerThreadPoolExecutor.submit(() -> { + if (canRetry(dispatchFailedTaskPriority)) { + dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1); + taskPriorityQueueImpl.put(dispatchFailedTaskPriority); + } else { + taskDispatchFailedQueueImpl.put(dispatchFailedTaskPriority); + } + }); } catch (InterruptedException exception) { logger.error("dispatch failed queue poll error", exception); Thread.currentThread().interrupt(); } catch (Exception e) { logger.error("dispatch failed back to task priority queue error", e); + } finally { + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } } }