diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java index 0eb0094c6c02b5a0063c05013bd6af1ff00e5d51..3fe72d35d8d2bb86ff40250526a7cac0c5c54ad6 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java @@ -26,12 +26,10 @@ import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.io.InputSupplier; @@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain; import com.metamx.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -63,7 +60,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; /** * Runs tasks in separate processes using {@link ExecutorMain}. @@ -79,7 +75,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider private final ListeningExecutorService exec; private final ObjectMapper jsonMapper; - private final Map tasks = Maps.newHashMap(); + private final Map tasks = Maps.newHashMap(); public ForkingTaskRunner( ForkingTaskRunnerConfig config, @@ -109,7 +105,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider if (!tasks.containsKey(task.getId())) { tasks.put( task.getId(), - new TaskInfo( + new ForkingTaskRunnerWorkItem( + task, exec.submit( new Callable() { @@ -135,17 +132,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider // time to adjust process holders synchronized (tasks) { - final TaskInfo taskInfo = tasks.get(task.getId()); + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); - if (taskInfo.shutdown) { + if (taskWorkItem.shutdown) { throw new IllegalStateException("Task has been shut down!"); } - if (taskInfo == null) { + if (taskWorkItem == null) { throw new ISE("WTF?! TaskInfo disappeared for task: %s", task.getId()); } - if (taskInfo.processHolder != null) { + if (taskWorkItem.processHolder != null) { throw new ISE("WTF?! TaskInfo already has a process holder for task: %s", task.getId()); } @@ -206,13 +203,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider jsonMapper.writeValue(taskFile, task); log.info("Running command: %s", Joiner.on(" ").join(command)); - taskInfo.processHolder = new ProcessHolder( + taskWorkItem.processHolder = new ProcessHolder( new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), logFile, childPort ); - processHolder = taskInfo.processHolder; + processHolder = taskWorkItem.processHolder; processHolder.registerWithCloser(closer); } @@ -261,9 +258,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider finally { try { synchronized (tasks) { - final TaskInfo taskInfo = tasks.remove(task.getId()); - if (taskInfo != null && taskInfo.processHolder != null) { - taskInfo.processHolder.process.destroy(); + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + taskWorkItem.processHolder.process.destroy(); } } @@ -281,7 +278,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider ); } - return tasks.get(task.getId()).statusFuture; + return tasks.get(task.getId()).getResult(); } } @@ -291,10 +288,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider synchronized (tasks) { exec.shutdown(); - for (TaskInfo taskInfo : tasks.values()) { - if (taskInfo.processHolder != null) { - log.info("Destroying process: %s", taskInfo.processHolder.process); - taskInfo.processHolder.process.destroy(); + for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + log.info("Destroying process: %s", taskWorkItem.processHolder.process); + taskWorkItem.processHolder.process.destroy(); } } } @@ -303,7 +300,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider @Override public void shutdown(final String taskid) { - final TaskInfo taskInfo; + final ForkingTaskRunnerWorkItem taskInfo; synchronized (tasks) { taskInfo = tasks.get(taskid); @@ -326,13 +323,29 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider @Override public Collection getRunningTasks() { - return ImmutableList.of(); + synchronized (tasks) { + final List ret = Lists.newArrayList(); + for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + ret.add(taskWorkItem); + } + } + return ret; + } } @Override public Collection getPendingTasks() { - return ImmutableList.of(); + synchronized (tasks) { + final List ret = Lists.newArrayList(); + for (final ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder == null) { + ret.add(taskWorkItem); + } + } + return ret; + } } @Override @@ -347,9 +360,9 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider final ProcessHolder processHolder; synchronized (tasks) { - final TaskInfo taskInfo = tasks.get(taskid); - if (taskInfo != null && taskInfo.processHolder != null) { - processHolder = taskInfo.processHolder; + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(taskid); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + processHolder = taskWorkItem.processHolder; } else { return Optional.absent(); } @@ -380,13 +393,13 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider int port = config.getStartPort(); int maxPortSoFar = -1; - for (TaskInfo taskInfo : tasks.values()) { - if (taskInfo.processHolder != null) { - if (taskInfo.processHolder.port > maxPortSoFar) { - maxPortSoFar = taskInfo.processHolder.port; + for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { + if (taskWorkItem.processHolder != null) { + if (taskWorkItem.processHolder.port > maxPortSoFar) { + maxPortSoFar = taskWorkItem.processHolder.port; } - if (taskInfo.processHolder.port == port) { + if (taskWorkItem.processHolder.port == port) { port = maxPortSoFar + 1; } } @@ -396,15 +409,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider } } - private static class TaskInfo + private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem { - private final ListenableFuture statusFuture; private volatile boolean shutdown = false; private volatile ProcessHolder processHolder = null; - private TaskInfo(ListenableFuture statusFuture) + private ForkingTaskRunnerWorkItem( + Task task, + ListenableFuture statusFuture + ) { - this.statusFuture = statusFuture; + super(task, statusFuture); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java index 10a4ff5d1a32c436a35eff734479ae0d3ca12fda..e8f341284e12875902d42f61e7bebab187ee3ecf 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java @@ -35,8 +35,7 @@ public class TaskRunnerWorkItem implements Comparable private final Task task; private final ListenableFuture result; private final DateTime createdTime; - - private volatile DateTime queueInsertionTime; + private final DateTime queueInsertionTime; public TaskRunnerWorkItem( Task task,