提交 c4b4f890 编写于 作者: C cheddar

Merge pull request #226 from metamx/bug-fix

1) Fix a race condition in RemoteTaskRunner when workers are removed
2) Fix partition chunks not being sorted by chunk number
......@@ -97,12 +97,7 @@ public class IntegerPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof IntegerPartitionChunk) {
IntegerPartitionChunk<T> intChunk = (IntegerPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, intChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, intChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, intChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}
......
package com.metamx.druid.partition;
import com.google.common.collect.Ordering;
import java.util.Comparator;
public class LinearPartitionChunk <T> implements PartitionChunk<T>
{
Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final int chunkNumber;
private final T object;
......@@ -56,7 +62,7 @@ public class LinearPartitionChunk <T> implements PartitionChunk<T>
if (chunk instanceof LinearPartitionChunk) {
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
return chunkNumber - chunk.getChunkNumber();
return comparator.compare(chunkNumber, linearChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
}
......
......@@ -27,7 +27,7 @@ import java.util.Comparator;
*/
public class StringPartitionChunk<T> implements PartitionChunk<T>
{
private static final Comparator<String> comparator = Ordering.<String>natural().nullsFirst();
private static final Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final String start;
private final String end;
......@@ -95,12 +95,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof StringPartitionChunk) {
StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, stringChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, stringChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, stringChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}
......
......@@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2)));
Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1)));
Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1)));
Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1)));
}
@Test
......
......@@ -61,14 +61,14 @@ public class StringPartitionChunkTest
@Test
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2)));
Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1)));
Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1)));
Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1)));
}
@Test
......
......@@ -521,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
}
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return;
}
runningTasks.put(task.getId(), workItem.withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
......@@ -613,8 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
}
break;
}
......@@ -653,13 +663,21 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
tasksToFail.addAll(zkWorker.getRunningTaskIds());
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
......@@ -668,10 +686,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
cf.delete().guaranteed().forPath(taskPath);
}
log.info("Failing task %s", assignedTask);
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
......@@ -683,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
zkWorker.close();
}
catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost());
log.error(e, "Exception closing worker[%s]!", worker.getHost());
}
zkWorkers.remove(worker.getHost());
}
......
......@@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
/**
......@@ -29,6 +30,7 @@ import org.joda.time.DateTime;
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
private final Worker worker;
public RemoteTaskRunnerWorkItem(
Task task,
......@@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
super(task, result);
this.result = result;
this.worker = null;
}
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
DateTime queueInsertionTime,
Worker worker
)
{
super(task, result, createdTime, queueInsertionTime);
this.result = result;
this.worker = worker;
}
/**
 * Returns the worker stored on this work item.
 *
 * @return the worker passed at construction time; {@code null} when the item was created through the
 *         constructor that does not take a worker
 */
public Worker getWorker()
{
  return this.worker;
}
public void setResult(TaskStatus status)
......@@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
}
/**
 * Builds a copy of this work item that carries the given worker.
 * The task, the result future, the created time and the queue insertion time are taken from this item.
 *
 * @param worker the worker to store on the returned work item
 * @return a new {@code RemoteTaskRunnerWorkItem} holding {@code worker}
 */
public RemoteTaskRunnerWorkItem withWorker(Worker worker)
{
  final Task currentTask = getTask();
  final DateTime createdAt = getCreatedTime();
  final DateTime queuedAt = getQueueInsertionTime();
  return new RemoteTaskRunnerWorkItem(currentTask, result, createdAt, queuedAt, worker);
}
}
......@@ -50,10 +50,13 @@ import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/
public class RemoteTaskRunnerTest
{
......@@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
/**
 * Checks that a task which has started reporting status ends up FAILED once the worker's
 * curator coordinator is stopped.
 *
 * @throws Exception if setup, the ZooKeeper lookups, or waiting on the task future fails
 */
@Test
public void testWorkerRemoved() throws Exception
{
doSetup();
// Bootstrap the runner with an empty list of tasks.
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
// Submit a task built from a RUNNING status with id "task".
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
// Poll every 100 ms until the status node for "task" exists; give up once more than 1000 ms have elapsed.
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
// Presumably stopping the coordinator makes the worker disappear from the runner's point of view,
// which should trigger the worker-removed handling — TODO confirm against WorkerCuratorCoordinator.
workerCuratorCoordinator.stop();
// NOTE(review): get() has no timeout, so if the task is never marked failed this test blocks instead of failing.
TaskStatus status = future.get();
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
private void doSetup() throws Exception
{
makeWorker();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册