提交 aae2d90f 编写于 作者: C cheddar

Merge pull request #221 from metamx/fix-is

Fix the case where a task completes during indexing service coordinator restart causes improper cleanup
...@@ -283,14 +283,28 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -283,14 +283,28 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{ {
RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId());
if (runningTask != null) { if (runningTask != null) {
log.info("Assigned a task[%s] that is already running, not doing anything", task.getId()); ZkWorker zkWorker = findWorkerRunningTask(task.getId());
return runningTask.getResult(); if (zkWorker == null) {
log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!")
.addData("taskId", task.getId())
.emit();
runningTasks.remove(task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskStatus status = zkWorker.getRunningTasks().get(task.getId());
if (status.isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), status);
}
return runningTask.getResult();
}
} }
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
if (pendingTask != null) { if (pendingTask != null) {
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
return pendingTask.getResult(); return pendingTask.getResult();
} }
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task, task,
SettableFuture.<TaskStatus>create() SettableFuture.<TaskStatus>create()
...@@ -473,8 +487,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -473,8 +487,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert("Exception while trying to run task") log.makeAlert("Exception while trying to run task")
.addData("taskId", taskRunnerWorkItem.getTask().getId()) .addData("taskId", taskRunnerWorkItem.getTask().getId())
.emit(); .emit();
} }
} }
...@@ -590,16 +604,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -590,16 +604,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
} }
if (taskStatus.isComplete()) { if (taskStatus.isComplete()) {
if (taskRunnerWorkItem != null) { taskComplete(taskRunnerWorkItem, zkWorker, taskId, taskStatus);
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
}
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(zkWorker.getWorker().getHost(), taskId);
runPendingTasks(); runPendingTasks();
} }
break; break;
...@@ -710,4 +715,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ...@@ -710,4 +715,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null; return null;
} }
private void taskComplete(
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
ZkWorker zkWorker,
String taskId,
TaskStatus taskStatus
)
{
if (taskRunnerWorkItem != null) {
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
if (result != null) {
((SettableFuture<TaskStatus>) result).set(taskStatus);
}
}
// Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(zkWorker.getWorker().getHost(), taskId);
}
} }
\ No newline at end of file
...@@ -39,6 +39,7 @@ import java.io.IOException; ...@@ -39,6 +39,7 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* Holds information about a worker and a listener for task status changes associated with the worker. * Holds information about a worker and a listener for task status changes associated with the worker.
...@@ -49,7 +50,7 @@ public class ZkWorker implements Closeable ...@@ -49,7 +50,7 @@ public class ZkWorker implements Closeable
private final PathChildrenCache statusCache; private final PathChildrenCache statusCache;
private final Function<ChildData, TaskStatus> cacheConverter; private final Function<ChildData, TaskStatus> cacheConverter;
private volatile DateTime lastCompletedTaskTime = new DateTime(); private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
{ {
...@@ -128,7 +129,7 @@ public class ZkWorker implements Closeable ...@@ -128,7 +129,7 @@ public class ZkWorker implements Closeable
@JsonProperty @JsonProperty
public DateTime getLastCompletedTaskTime() public DateTime getLastCompletedTaskTime()
{ {
return lastCompletedTaskTime; return lastCompletedTaskTime.get();
} }
public boolean isRunningTask(String taskId) public boolean isRunningTask(String taskId)
...@@ -154,7 +155,7 @@ public class ZkWorker implements Closeable ...@@ -154,7 +155,7 @@ public class ZkWorker implements Closeable
public void setLastCompletedTaskTime(DateTime completedTaskTime) public void setLastCompletedTaskTime(DateTime completedTaskTime)
{ {
lastCompletedTaskTime = completedTaskTime; lastCompletedTaskTime.getAndSet(completedTaskTime);
} }
@Override @Override
......
...@@ -258,6 +258,25 @@ public class RemoteTaskRunnerTest ...@@ -258,6 +258,25 @@ public class RemoteTaskRunnerTest
Assert.assertFalse(runningTasks.contains("first")); Assert.assertFalse(runningTasks.contains("first"));
} }
@Test
public void testRunWithTaskComplete() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(TaskStatus.success(task.getId())));
doSetup();
remoteTaskRunner.bootstrap(Arrays.<Task>asList(task));
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
TaskStatus status = future.get();
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
private void doSetup() throws Exception private void doSetup() throws Exception
{ {
makeWorker(); makeWorker();
......
...@@ -47,9 +47,8 @@ public class CostBalancerStrategy implements BalancerStrategy ...@@ -47,9 +47,8 @@ public class CostBalancerStrategy implements BalancerStrategy
DataSegment proposalSegment, List<ServerHolder> serverHolders DataSegment proposalSegment, List<ServerHolder> serverHolders
) )
{ {
ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs; ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs;
if (holder!=null && !holder.isServingSegment(proposalSegment)) if (holder != null && !holder.isServingSegment(proposalSegment)) {
{
return holder; return holder;
} }
return null; return null;
...@@ -65,7 +64,6 @@ public class CostBalancerStrategy implements BalancerStrategy ...@@ -65,7 +64,6 @@ public class CostBalancerStrategy implements BalancerStrategy
} }
/** /**
* For assignment, we want to move to the lowest cost server that isn't already serving the segment. * For assignment, we want to move to the lowest cost server that isn't already serving the segment.
* *
...@@ -83,25 +81,10 @@ public class CostBalancerStrategy implements BalancerStrategy ...@@ -83,25 +81,10 @@ public class CostBalancerStrategy implements BalancerStrategy
{ {
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null); Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
new Comparator<Pair<Double, ServerHolder>>()
{
@Override
public int compare(
Pair<Double, ServerHolder> o,
Pair<Double, ServerHolder> o1
)
{
return Double.compare(o.lhs, o1.lhs);
}
}
).create();
final long proposalSegmentSize = proposalSegment.getSize(); final long proposalSegmentSize = proposalSegment.getSize();
for (ServerHolder server : serverHolders) { for (ServerHolder server : serverHolders) {
if (includeCurrentServer || !server.isServingSegment(proposalSegment)) if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
{
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */ /** Don't calculate cost if the server doesn't have enough space or is loading the segment */
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
continue; continue;
...@@ -159,10 +142,10 @@ public class CostBalancerStrategy implements BalancerStrategy ...@@ -159,10 +142,10 @@ public class CostBalancerStrategy implements BalancerStrategy
referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
); );
double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis(); double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis();
double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis(); double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis();
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff <SEVEN_DAYS_IN_MILLIS) { if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS)*(2-segment2diff /SEVEN_DAYS_IN_MILLIS); recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
} }
/** gap is null if the two segment intervals overlap or if they're adjacent */ /** gap is null if the two segment intervals overlap or if they're adjacent */
...@@ -251,6 +234,4 @@ public class CostBalancerStrategy implements BalancerStrategy ...@@ -251,6 +234,4 @@ public class CostBalancerStrategy implements BalancerStrategy
); );
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册