提交 bb83ddb5 编写于 作者: F Fangjin Yang

fix bug with worker disappearance

上级 7f674719
......@@ -478,7 +478,7 @@ public class RemoteTaskRunner implements TaskRunner
log.info("Task %s just disappeared!", taskId);
retryTask(runningTasks.get(taskId), worker.getHost());
} else {
log.info("Lost a task I didn't know about: %s", taskId);
log.info("A task disappeared I didn't know about: %s", taskId);
}
}
}
......@@ -513,19 +513,23 @@ public class RemoteTaskRunner implements TaskRunner
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
Set<String> tasksPending = Sets.newHashSet(
Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren()
.forPath(JOINER.join(config.getTaskPath(), worker.getHost()))
);
log.info("%s had %d tasks pending", worker.getHost(), tasksPending.size());
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksPending) {
TaskRunnerWorkItem taskRunnerWorkItem = pendingTasks.get(taskId);
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
cf.delete().guaranteed().forPath(JOINER.join(config.getTaskPath(), worker.getHost(), taskId));
retryTask(taskRunnerWorkItem, worker.getHost());
} else {
log.warn("RemoteTaskRunner has no knowledge of pending task %s", taskId);
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册