提交 b300ee68 编写于 作者: W WangJPLeo

Avoid the queue is empty

上级 3dbfc146
......@@ -93,9 +93,7 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
public void run() {
while (Stopper.isRunning()) {
try {
if (taskDispatchFailedQueueImpl.size() > 0) {
dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber());
}
dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber());
} catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("failed task retry error", e);
......@@ -113,7 +111,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
if (Objects.isNull(dispatchFailedTaskPriority)) {
continue;
}
retryConsumerThreadPoolExecutor.submit(() -> dispatchFailedTask(dispatchFailedTaskPriority));
retryConsumerThreadPoolExecutor.submit(() -> {
if (canRetry(dispatchFailedTaskPriority)) {
dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
taskPriorityQueueImpl.put(dispatchFailedTaskPriority);
} else {
taskDispatchFailedQueueImpl.put(dispatchFailedTaskPriority);
}
});
} catch (InterruptedException exception) {
logger.error("dispatch failed queue poll error", exception);
Thread.currentThread().interrupt();
......@@ -125,18 +130,6 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
}
}
/**
* determine whether to retry and put into the dispatch queue when the conditions are met.
*/
private void dispatchFailedTask(TaskPriority dispatchFailedTaskPriority) {
if (canRetry(dispatchFailedTaskPriority)) {
dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
taskPriorityQueueImpl.put(dispatchFailedTaskPriority);
} else {
taskDispatchFailedQueueImpl.put(dispatchFailedTaskPriority);
}
}
/**
* the time interval is adjusted according to the number of retries
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册