提交 54ad8544 编写于 作者: F fjy

Merge branch 'master' into rt-ut

......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -97,12 +97,7 @@ public class IntegerPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof IntegerPartitionChunk) {
IntegerPartitionChunk<T> intChunk = (IntegerPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, intChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, intChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, intChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}
......
package com.metamx.druid.partition;
import com.google.common.collect.Ordering;
import java.util.Comparator;
public class LinearPartitionChunk <T> implements PartitionChunk<T>
{
Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final int chunkNumber;
private final T object;
......@@ -56,7 +62,7 @@ public class LinearPartitionChunk <T> implements PartitionChunk<T>
if (chunk instanceof LinearPartitionChunk) {
LinearPartitionChunk<T> linearChunk = (LinearPartitionChunk<T>) chunk;
return chunkNumber - chunk.getChunkNumber();
return comparator.compare(chunkNumber, linearChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a LinearPartitionChunk.");
}
......
......@@ -27,7 +27,7 @@ import java.util.Comparator;
*/
public class StringPartitionChunk<T> implements PartitionChunk<T>
{
private static final Comparator<String> comparator = Ordering.<String>natural().nullsFirst();
private static final Comparator<Integer> comparator = Ordering.<Integer>natural().nullsFirst();
private final String start;
private final String end;
......@@ -95,12 +95,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
if (chunk instanceof StringPartitionChunk) {
StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk;
int retVal = comparator.compare(start, stringChunk.start);
if (retVal == 0) {
retVal = comparator.compare(end, stringChunk.end);
}
return retVal;
return comparator.compare(chunkNumber, stringChunk.chunkNumber);
}
throw new IllegalArgumentException("Cannot compare against something that is not a StringPartitionChunk.");
}
......
......@@ -62,13 +62,13 @@ public class IntegerPartitionChunkTest
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 1)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 1, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 1, 2)));
Assert.assertEquals(0, make(10, null, 0, 1).compareTo(make(10, null, 0, 2)));
Assert.assertEquals(0, make(null, 10, 0, 1).compareTo(make(null, 10, 0, 2)));
Assert.assertEquals(0, make(10, 11, 0, 1).compareTo(make(10, 11, 0, 2)));
Assert.assertEquals(-1, make(null, 10, 0, 1).compareTo(make(10, null, 1, 2)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 0, 1)));
Assert.assertEquals(1, make(20, 33, 0, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 0, 1).compareTo(make(null, 10, 0, 1)));
Assert.assertEquals(-1, make(11, 20, 0, 1).compareTo(make(20, 33, 1, 1)));
Assert.assertEquals(1, make(20, 33, 1, 1).compareTo(make(11, 20, 0, 1)));
Assert.assertEquals(1, make(10, null, 1, 1).compareTo(make(null, 10, 0, 1)));
}
@Test
......
......@@ -61,14 +61,14 @@ public class StringPartitionChunkTest
@Test
public void testCompareTo() throws Exception
{
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 1, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(0, make(null, "10", 0, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 0, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(0, make(null, null, 0, 1).compareTo(make(null, null, 0, 2)));
Assert.assertEquals(0, make("10", null, 0, 1).compareTo(make("10", null, 0, 2)));
Assert.assertEquals(0, make(null, "10", 1, 1).compareTo(make(null, "10", 1, 2)));
Assert.assertEquals(0, make("10", "11", 1, 1).compareTo(make("10", "11", 1, 2)));
Assert.assertEquals(-1, make(null, "10", 0, 1).compareTo(make("10", null, 1, 2)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 0, 1)));
Assert.assertEquals(1, make("20", "33", 0, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 0, 1).compareTo(make(null, "10", 0, 1)));
Assert.assertEquals(-1, make("11", "20", 0, 1).compareTo(make("20", "33", 1, 1)));
Assert.assertEquals(1, make("20", "33", 1, 1).compareTo(make("11", "20", 0, 1)));
Assert.assertEquals(1, make("10", null, 1, 1).compareTo(make(null, "10", 0, 1)));
}
@Test
......
......@@ -9,7 +9,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -27,8 +27,8 @@ import com.google.common.base.Preconditions;
import com.metamx.druid.indexing.common.task.TaskResource;
/**
* Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
* complete ({@link #isComplete()} true).
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
* <p/>
* TaskStatus objects are immutable.
*/
......@@ -43,36 +43,38 @@ public class TaskStatus
public static TaskStatus running(String taskId)
{
return new TaskStatus(taskId, Status.RUNNING, -1, null);
return new TaskStatus(taskId, Status.RUNNING, -1);
}
public static TaskStatus success(String taskId)
{
return new TaskStatus(taskId, Status.SUCCESS, -1, null);
return new TaskStatus(taskId, Status.SUCCESS, -1);
}
public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, Status.FAILED, -1, null);
return new TaskStatus(taskId, Status.FAILED, -1);
}
public static TaskStatus fromCode(String taskId, Status code)
{
return new TaskStatus(taskId, code, -1);
}
private final String id;
private final Status status;
private final long duration;
private final TaskResource resource;
@JsonCreator
private TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") Status status,
@JsonProperty("duration") long duration,
@JsonProperty("resource") TaskResource resource
@JsonProperty("duration") long duration
)
{
this.id = id;
this.status = status;
this.duration = duration;
this.resource = resource == null ? new TaskResource(id, 1) : resource;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
......@@ -97,12 +99,6 @@ public class TaskStatus
return duration;
}
@JsonProperty("resource")
public TaskResource getResource()
{
return resource;
}
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
......@@ -144,7 +140,7 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
return new TaskStatus(id, status, _duration, resource);
return new TaskStatus(id, status, _duration);
}
@Override
......@@ -154,7 +150,6 @@ public class TaskStatus
.add("id", id)
.add("status", status)
.add("duration", duration)
.add("resource", resource)
.toString();
}
}
......@@ -58,18 +58,14 @@ public abstract class AbstractTask implements Task
protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = new TaskResource(id, 1);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
this(id, groupId, new TaskResource(id, 1), dataSource, interval);
}
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = Preconditions.checkNotNull(taskResource, "taskResource");
this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
......@@ -88,7 +84,7 @@ public abstract class AbstractTask implements Task
return groupId;
}
@JsonProperty
@JsonProperty("resource")
@Override
public TaskResource getTaskResource()
{
......
......@@ -26,12 +26,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
......@@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
......@@ -63,7 +60,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Runs tasks in separate processes using {@link ExecutorMain}.
......@@ -79,7 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final Map<String, TaskInfo> tasks = Maps.newHashMap();
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
......@@ -109,7 +105,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
if (!tasks.containsKey(task.getId())) {
tasks.put(
task.getId(),
new TaskInfo(
new ForkingTaskRunnerWorkItem(
task,
exec.submit(
new Callable<TaskStatus>()
{
......@@ -135,17 +132,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
// time to adjust process holders
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(task.getId());
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
if (taskInfo.shutdown) {
if (taskWorkItem.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
if (taskInfo == null) {
if (taskWorkItem == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
if (taskInfo.processHolder != null) {
if (taskWorkItem.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
}
......@@ -206,13 +203,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
taskWorkItem.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
processHolder = taskInfo.processHolder;
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
}
......@@ -261,9 +258,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
finally {
try {
synchronized (tasks) {
final TaskInfo taskInfo = tasks.remove(task.getId());
if (taskInfo != null && taskInfo.processHolder != null) {
taskInfo.processHolder.process.destroy();
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.process.destroy();
}
}
......@@ -281,7 +278,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
);
}
return tasks.get(task.getId()).statusFuture;
return tasks.get(task.getId()).getResult();
}
}
......@@ -291,10 +288,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
synchronized (tasks) {
exec.shutdown();
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
log.info("Destroying process: %s", taskInfo.processHolder.process);
taskInfo.processHolder.process.destroy();
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
log.info("Destroying process: %s", taskWorkItem.processHolder.process);
taskWorkItem.processHolder.process.destroy();
}
}
}
......@@ -303,7 +300,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void shutdown(final String taskid)
{
final TaskInfo taskInfo;
final ForkingTaskRunnerWorkItem taskInfo;
synchronized (tasks) {
taskInfo = tasks.get(taskid);
......@@ -326,13 +323,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return ImmutableList.of();
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return ImmutableList.of();
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder == null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
......@@ -347,9 +360,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
final ProcessHolder processHolder;
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(taskid);
if (taskInfo != null && taskInfo.processHolder != null) {
processHolder = taskInfo.processHolder;
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
processHolder = taskWorkItem.processHolder;
} else {
return Optional.absent();
}
......@@ -380,13 +393,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
int port = config.getStartPort();
int maxPortSoFar = -1;
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
if (taskInfo.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskInfo.processHolder.port;
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
if (taskWorkItem.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskWorkItem.processHolder.port;
}
if (taskInfo.processHolder.port == port) {
if (taskWorkItem.processHolder.port == port) {
port = maxPortSoFar + 1;
}
}
......@@ -396,15 +409,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
private static class TaskInfo
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final ListenableFuture<TaskStatus> statusFuture;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)
private ForkingTaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> statusFuture
)
{
this.statusFuture = statusFuture;
super(task, statusFuture);
}
}
......
......@@ -41,6 +41,7 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
......@@ -291,9 +292,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
runningTasks.remove(task.getId());
} else {
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
TaskStatus status = zkWorker.getRunningTasks().get(task.getId());
if (status.isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), status);
TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
if (announcement.getTaskStatus().isComplete()) {
taskComplete(runningTask, zkWorker, task.getId(), announcement.getTaskStatus());
}
return runningTask.getResult();
}
......@@ -520,7 +521,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
);
}
runningTasks.put(task.getId(), pendingTasks.remove(task.getId()));
RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
if (workItem == null) {
log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
.addData("taskId", task.getId())
.emit();
return;
}
runningTasks.put(task.getId(), workItem.withWorker(theWorker));
log.info("Task %s switched from pending to running", task.getId());
// Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
......@@ -612,8 +621,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId);
log.info("Task[%s] just disappeared!", taskId);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("Task[%s] just disappeared but I didn't know about it?!", taskId);
}
break;
}
......@@ -652,20 +663,33 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
{
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
ZkWorker zkWorker = zkWorkers.get(worker.getHost());
final ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) {
try {
for (String assignedTask : cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
List<String> tasksToFail = Lists.newArrayList(
cf.getChildren().forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
);
log.info("[%s]: Found %d tasks assigned", worker.getHost(), tasksToFail.size());
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
if (entry.getValue().getWorker().getHost().equalsIgnoreCase(worker.getHost())) {
log.info("[%s]: Found [%s] running", worker.getHost(), entry.getKey());
tasksToFail.add(entry.getKey());
}
}
for (String assignedTask : tasksToFail) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.remove(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}
log.info("Failing task[%s]", assignedTask);
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else {
log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
log.warn("RemoteTaskRunner has no knowledge of task[%s]", assignedTask);
}
}
}
......@@ -677,7 +701,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
zkWorker.close();
}
catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost());
log.error(e, "Exception closing worker[%s]!", worker.getHost());
}
zkWorkers.remove(worker.getHost());
}
......
......@@ -22,6 +22,7 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
/**
......@@ -29,6 +30,7 @@ import org.joda.time.DateTime;
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
private final Worker worker;
public RemoteTaskRunnerWorkItem(
Task task,
......@@ -37,17 +39,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
super(task, result);
this.result = result;
this.worker = null;
}
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
DateTime queueInsertionTime,
Worker worker
)
{
super(task, result, createdTime, queueInsertionTime);
this.result = result;
this.worker = worker;
}
public Worker getWorker()
{
return worker;
}
public void setResult(TaskStatus status)
......@@ -58,6 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
}
public RemoteTaskRunnerWorkItem withWorker(Worker worker)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), worker);
}
}
......@@ -268,7 +268,7 @@ public class TaskMasterLifecycle
public Optional<ResourceManagementScheduler> getResourceManagementScheduler()
{
if (leading) {
return Optional.of(resourceManagementScheduler);
return Optional.fromNullable(resourceManagementScheduler);
} else {
return Optional.absent();
}
......
......@@ -35,8 +35,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
private final Task task;
private final ListenableFuture<TaskStatus> result;
private final DateTime createdTime;
private volatile DateTime queueInsertionTime;
private final DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
......
......@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
......@@ -48,7 +49,7 @@ public class ZkWorker implements Closeable
{
private final Worker worker;
private final PathChildrenCache statusCache;
private final Function<ChildData, TaskStatus> cacheConverter;
private final Function<ChildData, TaskAnnouncement> cacheConverter;
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
......@@ -56,13 +57,13 @@ public class ZkWorker implements Closeable
{
this.worker = worker;
this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, TaskStatus>()
this.cacheConverter = new Function<ChildData, TaskAnnouncement>()
{
@Override
public TaskStatus apply(ChildData input)
public TaskAnnouncement apply(ChildData input)
{
try {
return jsonMapper.readValue(input.getData(), TaskStatus.class);
return jsonMapper.readValue(input.getData(), TaskAnnouncement.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
......@@ -93,14 +94,14 @@ public class ZkWorker implements Closeable
return getRunningTasks().keySet();
}
public Map<String, TaskStatus> getRunningTasks()
public Map<String, TaskAnnouncement> getRunningTasks()
{
Map<String, TaskStatus> retVal = Maps.newHashMap();
for (TaskStatus taskStatus : Lists.transform(
Map<String, TaskAnnouncement> retVal = Maps.newHashMap();
for (TaskAnnouncement taskAnnouncement : Lists.transform(
statusCache.getCurrentData(),
cacheConverter
)) {
retVal.put(taskStatus.getId(), taskStatus);
retVal.put(taskAnnouncement.getTaskStatus().getId(), taskAnnouncement);
}
return retVal;
......@@ -110,8 +111,8 @@ public class ZkWorker implements Closeable
public int getCurrCapacityUsed()
{
int currCapacity = 0;
for (TaskStatus taskStatus : getRunningTasks().values()) {
currCapacity += taskStatus.getResource().getRequiredCapacity();
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
}
return currCapacity;
}
......@@ -120,8 +121,8 @@ public class ZkWorker implements Closeable
public Set<String> getAvailabilityGroups()
{
Set<String> retVal = Sets.newHashSet();
for (TaskStatus taskStatus : getRunningTasks().values()) {
retVal.add(taskStatus.getResource().getAvailabilityGroup());
for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
}
return retVal;
}
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.worker;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.task.TaskResource;
/**
 * Used by workers to announce the status of tasks they are currently running. This class is immutable.
 */
public class TaskAnnouncement
{
  private final TaskStatus taskStatus;
  private final TaskResource taskResource;

  /**
   * Builds an announcement for {@code task}, carrying the given {@code status} and the task's
   * declared resource requirements.
   *
   * @throws IllegalArgumentException if the status id does not match the task id
   */
  public static TaskAnnouncement create(Task task, TaskStatus status)
  {
    Preconditions.checkArgument(status.getId().equals(task.getId()), "task id == status id");
    return new TaskAnnouncement(null, null, status, task.getTaskResource());
  }

  @JsonCreator
  private TaskAnnouncement(
      @JsonProperty("id") String taskId,
      @JsonProperty("status") TaskStatus.Status status,
      @JsonProperty("taskStatus") TaskStatus taskStatus,
      @JsonProperty("taskResource") TaskResource taskResource
  )
  {
    // Prefer the full "taskStatus" field; older announcements carried only ("id", "status"),
    // so reconstruct a TaskStatus from those when the new field is absent.
    this.taskStatus = taskStatus != null
                      ? taskStatus
                      : TaskStatus.fromCode(taskId, status); // Can be removed when backwards compat is no longer needed

    // Older announcements had no resource either; fall back to a single-slot resource keyed by task id.
    this.taskResource = taskResource != null
                        ? taskResource
                        : new TaskResource(this.taskStatus.getId(), 1);
  }

  @JsonProperty("taskStatus")
  public TaskStatus getTaskStatus()
  {
    return taskStatus;
  }

  @JsonProperty("taskResource")
  public TaskResource getTaskResource()
  {
    return taskResource;
  }

  // Can be removed when backwards compat is no longer needed
  @JsonProperty("id")
  @Deprecated
  public String getTaskId()
  {
    return taskStatus.getId();
  }

  // Can be removed when backwards compat is no longer needed
  @JsonProperty("status")
  @Deprecated
  public TaskStatus.Status getStatus()
  {
    return taskStatus.getStatusCode();
  }
}
......@@ -180,7 +180,7 @@ public class WorkerCuratorCoordinator
}
}
public void announceStatus(TaskStatus status)
public void announceTask(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
......@@ -188,7 +188,7 @@ public class WorkerCuratorCoordinator
}
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
......@@ -196,7 +196,7 @@ public class WorkerCuratorCoordinator
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
getStatusPathForId(status.getId()), rawBytes
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
}
catch (Exception e) {
......@@ -205,7 +205,7 @@ public class WorkerCuratorCoordinator
}
}
public void updateStatus(TaskStatus status)
public void updateAnnouncement(TaskAnnouncement announcement)
{
synchronized (lock) {
if (!started) {
......@@ -213,18 +213,18 @@ public class WorkerCuratorCoordinator
}
try {
if (curatorFramework.checkExists().forPath(getStatusPathForId(status.getId())) == null) {
announceStatus(status);
if (curatorFramework.checkExists().forPath(getStatusPathForId(announcement.getTaskStatus().getId())) == null) {
announceTask(announcement);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
byte[] rawBytes = jsonMapper.writeValueAsBytes(announcement);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
curatorFramework.setData()
.forPath(
getStatusPathForId(status.getId()), rawBytes
getStatusPathForId(announcement.getTaskStatus().getId()), rawBytes
);
}
catch (Exception e) {
......@@ -232,4 +232,4 @@ public class WorkerCuratorCoordinator
}
}
}
}
\ No newline at end of file
}
......@@ -118,7 +118,12 @@ public class WorkerTaskMonitor
TaskStatus taskStatus;
try {
workerCuratorCoordinator.unannounceTask(task.getId());
workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId()));
workerCuratorCoordinator.announceTask(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId())
)
);
taskStatus = taskRunner.run(task).get();
}
catch (Exception e) {
......@@ -134,7 +139,7 @@ public class WorkerTaskMonitor
taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime);
try {
workerCuratorCoordinator.updateStatus(taskStatus);
workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus));
log.info(
"Job's finished. Completed [%s] with status [%s]",
task.getId(),
......
......@@ -202,7 +202,7 @@ public class TaskSerdeTest
{
final Task task = new RealtimeIndexTask(
null,
null,
new TaskResource("rofl", 2),
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
......@@ -219,6 +219,8 @@ public class TaskSerdeTest
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Optional.<Interval>absent(), task.getImplicitLockInterval());
Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity());
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(new Period("PT10M"), ((RealtimeIndexTask) task).getWindowPeriod());
Assert.assertEquals(IndexGranularity.HOUR, ((RealtimeIndexTask) task).getSegmentGranularity());
......@@ -226,10 +228,12 @@ public class TaskSerdeTest
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval());
Assert.assertEquals(((RealtimeIndexTask) task).getWindowPeriod(), ((RealtimeIndexTask) task).getWindowPeriod());
Assert.assertEquals(task.getTaskResource().getRequiredCapacity(), task2.getTaskResource().getRequiredCapacity());
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(((RealtimeIndexTask) task).getWindowPeriod(), ((RealtimeIndexTask) task2).getWindowPeriod());
Assert.assertEquals(
((RealtimeIndexTask) task).getSegmentGranularity(),
((RealtimeIndexTask) task).getSegmentGranularity()
((RealtimeIndexTask) task2).getSegmentGranularity()
);
}
......
......@@ -50,10 +50,13 @@ import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Several of the tests here are integration tests rather than unit tests. We will introduce real unit tests for this
* class as well as integration tests in the very near future.
*/
public class RemoteTaskRunnerTest
{
......@@ -277,6 +280,29 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
@Test
public void testWorkerRemoved() throws Exception
{
doSetup();
remoteTaskRunner.bootstrap(Lists.<Task>newArrayList());
Future<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(joiner.join(statusPath, "task")) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
workerCuratorCoordinator.stop();
TaskStatus status = future.get();
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
private void doSetup() throws Exception
{
makeWorker();
......
......@@ -31,6 +31,7 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.TaskAnnouncement;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
......@@ -351,12 +352,12 @@ public class SimpleResourceManagementStrategyTest
}
@Override
public Map<String, TaskStatus> getRunningTasks()
public Map<String, TaskAnnouncement> getRunningTasks()
{
if (testTask == null) {
return Maps.newHashMap();
}
return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId()));
return ImmutableMap.of(testTask.getId(), TaskAnnouncement.create(testTask, TaskStatus.running(testTask.getId())));
}
}
}
package com.metamx.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.Period;
import org.junit.Test;
public class TaskAnnouncementTest
{
  /**
   * Checks that TaskAnnouncement stays wire-compatible with TaskStatus in both
   * directions: announcement JSON must still parse as a plain TaskStatus, and
   * legacy TaskStatus JSON must parse as a TaskAnnouncement (picking up default
   * resource settings, since the legacy payload carries none).
   */
  @Test
  public void testBackwardsCompatibleSerde() throws Exception
  {
    final Task task = new RealtimeIndexTask(
        "theid",
        new TaskResource("rofl", 2),
        new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
        null,
        null,
        new Period("PT10M"),
        IndexGranularity.HOUR,
        null
    );
    final TaskStatus status = TaskStatus.running(task.getId());
    final TaskAnnouncement announcement = TaskAnnouncement.create(task, status);

    final ObjectMapper mapper = new DefaultObjectMapper();
    final String statusJson = mapper.writeValueAsString(status);
    final String announcementJson = mapper.writeValueAsString(announcement);

    // Deserialize every (payload, reader) combination: old->old, new->old, old->new, new->new.
    final TaskStatus statusFromStatus = mapper.readValue(statusJson, TaskStatus.class);
    final TaskStatus statusFromAnnouncement = mapper.readValue(announcementJson, TaskStatus.class);
    final TaskAnnouncement announcementFromStatus = mapper.readValue(statusJson, TaskAnnouncement.class);
    final TaskAnnouncement announcementFromAnnouncement = mapper.readValue(announcementJson, TaskAnnouncement.class);

    // The task id must survive every path.
    Assert.assertEquals("theid", statusFromStatus.getId());
    Assert.assertEquals("theid", statusFromAnnouncement.getId());
    Assert.assertEquals("theid", announcementFromStatus.getTaskStatus().getId());
    Assert.assertEquals("theid", announcementFromAnnouncement.getTaskStatus().getId());

    // Legacy JSON has no resource info, so the announcement falls back to
    // defaults (group = task id, capacity = 1); full JSON keeps the real values.
    Assert.assertEquals("theid", announcementFromStatus.getTaskResource().getAvailabilityGroup());
    Assert.assertEquals(1, announcementFromStatus.getTaskResource().getRequiredCapacity());
    Assert.assertEquals("rofl", announcementFromAnnouncement.getTaskResource().getAvailabilityGroup());
    Assert.assertEquals(2, announcementFromAnnouncement.getTaskResource().getRequiredCapacity());
  }
}
......@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
<parent>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<version>0.5.42-SNAPSHOT</version>
<version>0.5.45-SNAPSHOT</version>
</parent>
<dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册