未验证 提交 00bb86f7 编写于 作者: Wenjun Ruan 提交者: GitHub

Merge pull request #67 from WhaleOps/dispatch-failed-queue

[Bug-Fix] Task dispatch failed queue fix.
...@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.thread.Stopper; ...@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
...@@ -49,14 +48,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { ...@@ -49,14 +48,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
*/ */
@Autowired @Autowired
@Qualifier(Constants.TASK_PRIORITY_QUEUE) @Qualifier(Constants.TASK_PRIORITY_QUEUE)
private TaskPriorityQueue<TaskPriority> taskPriorityQueue; private TaskPriorityQueue<TaskPriority> taskPriorityQueueImpl;
/** /**
* taskDispatchFailedQueue * taskDispatchFailedQueue
*/ */
@Autowired @Autowired
@Qualifier(Constants.TASK_DISPATCH_FAILED_QUEUE) @Qualifier(Constants.TASK_DISPATCH_FAILED_QUEUE)
private TaskPriorityQueue<TaskPriority> taskDispatchFailedQueue; private TaskPriorityQueue<TaskPriority> taskDispatchFailedQueueImpl;
@Autowired @Autowired
private MasterConfig masterConfig; private MasterConfig masterConfig;
...@@ -74,8 +73,8 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { ...@@ -74,8 +73,8 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
static { static {
TIME_DELAY = new Long[Constants.DEFAULT_MAX_RETRY_COUNT]; TIME_DELAY = new Long[Constants.DEFAULT_MAX_RETRY_COUNT];
for (int i = 0; i < Constants.DEFAULT_MAX_RETRY_COUNT; i++) { for (int i = 0; i < Constants.DEFAULT_MAX_RETRY_COUNT; i++) {
int delayTime = (i + 1) * 1000; long delayTime = (i + 1) * Constants.SLEEP_TIME_MILLIS;
TIME_DELAY[i] = (long) delayTime; TIME_DELAY[i] = delayTime;
} }
} }
...@@ -94,7 +93,7 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { ...@@ -94,7 +93,7 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
public void run() { public void run() {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
failedRetry(); dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber());
} catch (Exception e) { } catch (Exception e) {
TaskMetrics.incTaskDispatchError(); TaskMetrics.incTaskDispatchError();
logger.error("failed task retry error", e); logger.error("failed task retry error", e);
...@@ -102,33 +101,31 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread { ...@@ -102,33 +101,31 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
} }
} }
public void failedRetry() throws TaskPriorityQueueException {
if (taskDispatchFailedQueue.size() > 0) {
retryConsumerThreadPoolExecutor.submit(() -> dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber()));
}
}
/** /**
* put the failed dispatch task into the dispatch queue again * put the failed dispatch task into the dispatch queue again
*/ */
private void dispatchFailedBackToTaskPriorityQueue(int fetchTaskNum) { private void dispatchFailedBackToTaskPriorityQueue(int fetchTaskNum) {
for (int i = 0; i < fetchTaskNum; i++) { for (int i = 0; i < fetchTaskNum; i++) {
try { try {
TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueueImpl.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(dispatchFailedTaskPriority)) { if (Objects.isNull(dispatchFailedTaskPriority)) {
continue; continue;
} }
if (canRetry(dispatchFailedTaskPriority)) { retryConsumerThreadPoolExecutor.submit(() -> {
dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1); if (canRetry(dispatchFailedTaskPriority)) {
taskPriorityQueue.put(dispatchFailedTaskPriority); dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
} else { taskPriorityQueueImpl.put(dispatchFailedTaskPriority);
taskDispatchFailedQueue.put(dispatchFailedTaskPriority); } else {
} taskDispatchFailedQueueImpl.put(dispatchFailedTaskPriority);
}
});
} catch (InterruptedException exception) { } catch (InterruptedException exception) {
logger.error("dispatch failed queue poll error", exception); logger.error("dispatch failed queue poll error", exception);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
logger.error("dispatch failed back to task priority queue error", e); logger.error("dispatch failed back to task priority queue error", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} }
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册