diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 32c5bed1dcdc8ac5b16dc27a61d55f61ab3ddb1f..8720525ae1ebc9728d1c904de44b25ae66f5de5f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -283,14 +283,28 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider { RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); if (runningTask != null) { - log.info("Assigned a task[%s] that is already running, not doing anything", task.getId()); - return runningTask.getResult(); + ZkWorker zkWorker = findWorkerRunningTask(task.getId()); + if (zkWorker == null) { + log.makeAlert("Told to run task that is in the running queue but no worker is actually running it?!") + .addData("taskId", task.getId()) + .emit(); + runningTasks.remove(task.getId()); + } else { + log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost()); + TaskStatus status = zkWorker.getRunningTasks().get(task.getId()); + if (status.isComplete()) { + taskComplete(runningTask, zkWorker, task.getId(), status); + } + return runningTask.getResult(); + } } + RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); if (pendingTask != null) { log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); return pendingTask.getResult(); } + RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( task, SettableFuture.create() @@ -473,8 +487,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } catch (Exception e) { log.makeAlert("Exception while trying to run task") - .addData("taskId", taskRunnerWorkItem.getTask().getId()) - .emit(); + .addData("taskId", taskRunnerWorkItem.getTask().getId()) + .emit(); } } @@ -590,16 +604,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } if (taskStatus.isComplete()) { - if (taskRunnerWorkItem != null) { - final ListenableFuture result = taskRunnerWorkItem.getResult(); - if (result != null) { - ((SettableFuture) result).set(taskStatus); - } - } - - // Worker is done with this task - zkWorker.setLastCompletedTaskTime(new DateTime()); - cleanup(zkWorker.getWorker().getHost(), taskId); + taskComplete(taskRunnerWorkItem, zkWorker, taskId, taskStatus); runPendingTasks(); } break; @@ -710,4 +715,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); return null; } + + private void taskComplete( + RemoteTaskRunnerWorkItem taskRunnerWorkItem, + ZkWorker zkWorker, + String taskId, + TaskStatus taskStatus + ) + { + if (taskRunnerWorkItem != null) { + final ListenableFuture result = taskRunnerWorkItem.getResult(); + if (result != null) { + ((SettableFuture) result).set(taskStatus); + } + } + + // Worker is done with this task + zkWorker.setLastCompletedTaskTime(new DateTime()); + cleanup(zkWorker.getWorker().getHost(), taskId); + } } \ No newline at end of file diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java index c322548ffea727bfeaac7c5e8ec2ec2521b15971..eb23c429a8ae1ee11fda33f86e606e738bc8014b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; /** * Holds information about a worker and a listener for task status changes associated with the worker. @@ -49,7 +50,7 @@ public class ZkWorker implements Closeable private final PathChildrenCache statusCache; private final Function cacheConverter; - private volatile DateTime lastCompletedTaskTime = new DateTime(); + private AtomicReference lastCompletedTaskTime = new AtomicReference(new DateTime()); public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper) { @@ -128,7 +129,7 @@ public class ZkWorker implements Closeable @JsonProperty public DateTime getLastCompletedTaskTime() { - return lastCompletedTaskTime; + return lastCompletedTaskTime.get(); } public boolean isRunningTask(String taskId) @@ -154,7 +155,7 @@ public class ZkWorker implements Closeable public void setLastCompletedTaskTime(DateTime completedTaskTime) { - lastCompletedTaskTime = completedTaskTime; + lastCompletedTaskTime.getAndSet(completedTaskTime); } @Override diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 6c6da7ccb89ebcf5412643ecf8a43cc48bcbe3bd..492251dfc6f259d3fa30a0992b62a2f00d3ea7a3 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -258,6 +258,25 @@ public class RemoteTaskRunnerTest Assert.assertFalse(runningTasks.contains("first")); } + @Test + public void testRunWithTaskComplete() throws Exception + { + cf.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(TaskStatus.success(task.getId()))); + + doSetup(); + + remoteTaskRunner.bootstrap(Arrays.asList(task)); + + ListenableFuture future = remoteTaskRunner.run(task); + + TaskStatus status = future.get(); + + Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode()); + } + private void doSetup() throws Exception { makeWorker(); diff --git a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java index 47f7fb19436ab44d0462970bd174d9140f85b72c..0da949b9a7c15b4352839fb98b9e2162c88dd416 100644 --- a/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java +++ b/server/src/main/java/com/metamx/druid/master/CostBalancerStrategy.java @@ -47,9 +47,8 @@ public class CostBalancerStrategy implements BalancerStrategy DataSegment proposalSegment, List serverHolders ) { - ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs; - if (holder!=null && !holder.isServingSegment(proposalSegment)) - { + ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs; + if (holder != null && !holder.isServingSegment(proposalSegment)) { return holder; } return null; @@ -65,7 +64,6 @@ public class CostBalancerStrategy implements BalancerStrategy } - /** * For assignment, we want to move to the lowest cost server that isn't already serving the segment. * @@ -83,25 +81,10 @@ public class CostBalancerStrategy implements BalancerStrategy { Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); - MinMaxPriorityQueue> costsAndServers = MinMaxPriorityQueue.orderedBy( - new Comparator>() - { - @Override - public int compare( - Pair o, - Pair o1 - ) - { - return Double.compare(o.lhs, o1.lhs); - } - } - ).create(); - final long proposalSegmentSize = proposalSegment.getSize(); for (ServerHolder server : serverHolders) { - if (includeCurrentServer || !server.isServingSegment(proposalSegment)) - { + if (includeCurrentServer || !server.isServingSegment(proposalSegment)) { /** Don't calculate cost if the server doesn't have enough space or is loading the segment */ if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) { continue; @@ -159,10 +142,10 @@ public class CostBalancerStrategy implements BalancerStrategy referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(), referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis() ); - double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis(); - double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis(); - if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff