提交 96093147 编写于 作者: G Gian Merlino

ForkingTaskRunner: Make TaskInfo into ForkingTaskRunnerWorkItem

This allows the API/GUI to return reasonable results when the primary
task runner is a ForkingTaskRunner.
上级 4e8325f9
......@@ -26,12 +26,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
......@@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
......@@ -63,7 +60,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Runs tasks in separate processes using {@link ExecutorMain}.
......@@ -79,7 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final Map<String, TaskInfo> tasks = Maps.newHashMap();
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newHashMap();
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
......@@ -109,7 +105,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
if (!tasks.containsKey(task.getId())) {
tasks.put(
task.getId(),
new TaskInfo(
new ForkingTaskRunnerWorkItem(
task,
exec.submit(
new Callable<TaskStatus>()
{
......@@ -135,17 +132,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
// time to adjust process holders
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(task.getId());
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
if (taskInfo.shutdown) {
if (taskWorkItem.shutdown) {
throw new IllegalStateException("Task has been shut down!");
}
if (taskInfo == null) {
if (taskWorkItem == null) {
throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId());
}
if (taskInfo.processHolder != null) {
if (taskWorkItem.processHolder != null) {
throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId());
}
......@@ -206,13 +203,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
jsonMapper.writeValue(taskFile, task);
log.info("Running command: %s", Joiner.on(" ").join(command));
taskInfo.processHolder = new ProcessHolder(
taskWorkItem.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
);
processHolder = taskInfo.processHolder;
processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
}
......@@ -261,9 +258,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
finally {
try {
synchronized (tasks) {
final TaskInfo taskInfo = tasks.remove(task.getId());
if (taskInfo != null && taskInfo.processHolder != null) {
taskInfo.processHolder.process.destroy();
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
taskWorkItem.processHolder.process.destroy();
}
}
......@@ -281,7 +278,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
);
}
return tasks.get(task.getId()).statusFuture;
return tasks.get(task.getId()).getResult();
}
}
......@@ -291,10 +288,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
synchronized (tasks) {
exec.shutdown();
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
log.info("Destroying process: %s", taskInfo.processHolder.process);
taskInfo.processHolder.process.destroy();
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
log.info("Destroying process: %s", taskWorkItem.processHolder.process);
taskWorkItem.processHolder.process.destroy();
}
}
}
......@@ -303,7 +300,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public void shutdown(final String taskid)
{
final TaskInfo taskInfo;
final ForkingTaskRunnerWorkItem taskInfo;
synchronized (tasks) {
taskInfo = tasks.get(taskid);
......@@ -326,13 +323,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return ImmutableList.of();
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return ImmutableList.of();
synchronized (tasks) {
final List<TaskRunnerWorkItem> ret = Lists.newArrayList();
for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder == null) {
ret.add(taskWorkItem);
}
}
return ret;
}
}
@Override
......@@ -347,9 +360,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
final ProcessHolder processHolder;
synchronized (tasks) {
final TaskInfo taskInfo = tasks.get(taskid);
if (taskInfo != null && taskInfo.processHolder != null) {
processHolder = taskInfo.processHolder;
final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid);
if (taskWorkItem != null && taskWorkItem.processHolder != null) {
processHolder = taskWorkItem.processHolder;
} else {
return Optional.absent();
}
......@@ -380,13 +393,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
int port = config.getStartPort();
int maxPortSoFar = -1;
for (TaskInfo taskInfo : tasks.values()) {
if (taskInfo.processHolder != null) {
if (taskInfo.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskInfo.processHolder.port;
for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
if (taskWorkItem.processHolder != null) {
if (taskWorkItem.processHolder.port > maxPortSoFar) {
maxPortSoFar = taskWorkItem.processHolder.port;
}
if (taskInfo.processHolder.port == port) {
if (taskWorkItem.processHolder.port == port) {
port = maxPortSoFar + 1;
}
}
......@@ -396,15 +409,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
}
}
private static class TaskInfo
private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final ListenableFuture<TaskStatus> statusFuture;
private volatile boolean shutdown = false;
private volatile ProcessHolder processHolder = null;
private TaskInfo(ListenableFuture<TaskStatus> statusFuture)
private ForkingTaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> statusFuture
)
{
this.statusFuture = statusFuture;
super(task, statusFuture);
}
}
......
......@@ -35,8 +35,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
private final Task task;
private final ListenableFuture<TaskStatus> result;
private final DateTime createdTime;
private volatile DateTime queueInsertionTime;
private final DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册