提交 af8288d7 编写于 作者: W WangJPLeo

task dispatch failed queue fix

上级 307151cf
......@@ -49,14 +49,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
*/
@Autowired
@Qualifier(Constants.TASK_PRIORITY_QUEUE)
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
private TaskPriorityQueue<TaskPriority> taskPriorityQueueImpl;
/**
* taskDispatchFailedQueue
*/
@Autowired
@Qualifier(Constants.TASK_DISPATCH_FAILED_QUEUE)
private TaskPriorityQueue<TaskPriority> taskDispatchFailedQueue;
private TaskPriorityQueue<TaskPriority> taskDispatchFailedQueueImpl;
@Autowired
private MasterConfig masterConfig;
......@@ -74,8 +74,8 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
static {
TIME_DELAY = new Long[Constants.DEFAULT_MAX_RETRY_COUNT];
for (int i = 0; i < Constants.DEFAULT_MAX_RETRY_COUNT; i++) {
int delayTime = (i + 1) * 1000;
TIME_DELAY[i] = (long) delayTime;
long delayTime = (i + 1) * Constants.SLEEP_TIME_MILLIS;
TIME_DELAY[i] = delayTime;
}
}
......@@ -98,12 +98,14 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
} catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("failed task retry error", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
public void failedRetry() throws TaskPriorityQueueException {
if (taskDispatchFailedQueue.size() > 0) {
if (taskDispatchFailedQueueImpl.size() > 0) {
retryConsumerThreadPoolExecutor.submit(() -> dispatchFailedBackToTaskPriorityQueue(masterConfig.getDispatchTaskNumber()));
}
}
......@@ -114,15 +116,15 @@ public class TaskDispatchFailedQueueConsumer extends BaseDaemonThread {
private void dispatchFailedBackToTaskPriorityQueue(int fetchTaskNum) {
for (int i = 0; i < fetchTaskNum; i++) {
try {
TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
TaskPriority dispatchFailedTaskPriority = taskDispatchFailedQueueImpl.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(dispatchFailedTaskPriority)) {
continue;
}
if (canRetry(dispatchFailedTaskPriority)) {
dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
taskPriorityQueue.put(dispatchFailedTaskPriority);
taskPriorityQueueImpl.put(dispatchFailedTaskPriority);
} else {
taskDispatchFailedQueue.put(dispatchFailedTaskPriority);
taskDispatchFailedQueueImpl.put(dispatchFailedTaskPriority);
}
} catch (InterruptedException exception) {
logger.error("dispatch failed queue poll error", exception);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册