From 35f89d7232cee3ff7002e0e5c890bd5d96070680 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 5 Aug 2013 14:44:01 -0700 Subject: [PATCH] make RTR idempotent to multiple run requests for same task, because higher level things in the indexing service require this behaviour --- .../coordinator/RemoteTaskRunner.java | 19 +++++++++++++------ .../config/RemoteTaskRunnerConfig.java | 5 +++++ .../coordinator/RemoteTaskRunnerTest.java | 17 ++++++----------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 969c14b300..68f8f9b505 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -265,9 +265,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider task.getId(), new RemoteTaskRunnerWorkItem(task, SettableFuture.create()) ); - } else { - log.info("Bootstrap didn't find %s running. Running it again", task.getId()); - run(task); } } } @@ -284,8 +281,15 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider @Override public ListenableFuture run(final Task task) { - if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) { - throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId()); + RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId()); + if (runningTask != null) { + log.info("Assigned a task[%s] that is already running, not doing anything", task.getId()); + return runningTask.getResult(); + } + RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId()); + if (pendingTask != null) { + log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId()); + return pendingTask.getResult(); } RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( task, @@ -686,9 +690,12 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } ); sortedWorkers.addAll(zkWorkers.values()); + final String configMinWorkerVer = workerSetupData.get().getMinVersion(); + final String minWorkerVer = configMinWorkerVer == null ? config.getWorkerVersion() : configMinWorkerVer; + for (ZkWorker zkWorker : sortedWorkers) { if (zkWorker.canRunTask(task) && - zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) { + zkWorker.getWorker().getVersion().compareTo(minWorkerVer) >= 0) { return zkWorker; } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java index 6023605ea7..7018e37a7f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java @@ -23,6 +23,7 @@ import com.metamx.druid.indexing.common.config.IndexerZkConfig; import org.joda.time.Duration; import org.skife.config.Config; import org.skife.config.Default; +import org.skife.config.DefaultNull; /** */ @@ -35,4 +36,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig @Config("druid.curator.compression.enable") @Default("false") public abstract boolean enableCompression(); + + @Config("druid.indexer.worker.version") + @DefaultNull + public abstract String getWorkerVersion(); } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index c53f3e1e58..6c6da7ccb8 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -109,17 +109,6 @@ public class RemoteTaskRunnerTest remoteTaskRunner.run(task); } - @Test(expected = ISE.class) - public void testExceptionThrownWithExistingTask() throws Exception - { - doSetup(); - - remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); - remoteTaskRunner.run( - makeTask(TaskStatus.running("task")) - ); - } - @Test public void testRunTooMuchZKData() throws Exception { @@ -415,5 +404,11 @@ public class RemoteTaskRunnerTest { return 1000; } + + @Override + public String getWorkerVersion() + { + return ""; + } } } -- GitLab