提交 06089360 编写于 作者: F fjy

Backport Dynamic configuration fixes for worker select strategy

上级 ecf0895f
......@@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
......@@ -48,6 +49,7 @@ import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.setup.WorkerSelectStrategy;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
......@@ -107,7 +109,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
// all workers that exist in ZK
private final ConcurrentMap<String, ZkWorker> zkWorkers = new ConcurrentHashMap<>();
......@@ -133,7 +135,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
HttpClient httpClient,
WorkerSelectStrategy strategy
Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.jsonMapper = jsonMapper;
......@@ -143,7 +145,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath());
this.httpClient = httpClient;
this.strategy = strategy;
this.workerConfigRef = workerConfigRef;
}
@LifecycleStart
......@@ -529,6 +531,14 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
return true;
} else {
// Nothing running this task, announce it in ZK for a worker to run it
WorkerBehaviorConfig workerConfig = workerConfigRef.get();
WorkerSelectStrategy strategy;
if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
log.warn("No worker selections strategy set. Using default.");
strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
} else {
strategy = workerConfig.getSelectStrategy();
}
final Optional<ImmutableZkWorker> immutableZkWorker = strategy.findWorkerForTask(
config,
ImmutableMap.copyOf(
......
......@@ -41,7 +41,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
private final ZkPathsConfig zkPaths;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final WorkerSelectStrategy strategy;
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
@Inject
public RemoteTaskRunnerFactory(
......@@ -50,7 +50,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper,
@Global final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerBehaviourConfigSupplier
final Supplier<WorkerBehaviorConfig> workerConfigRef
)
{
this.curator = curator;
......@@ -58,17 +58,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
if (workerBehaviourConfigSupplier != null) {
// Backwards compatibility
final WorkerBehaviorConfig workerBehaviorConfig = workerBehaviourConfigSupplier.get();
if (workerBehaviorConfig != null) {
this.strategy = workerBehaviorConfig.getSelectStrategy();
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
} else {
this.strategy = new FillCapacityWorkerSelectStrategy();
}
this.workerConfigRef = workerConfigRef;
}
@Override
......@@ -84,7 +74,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
.withCompressed(remoteTaskRunnerConfig.isCompressZnodes())
.build(),
httpClient,
strategy
workerConfigRef
);
}
}
......@@ -77,7 +77,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
synchronized (lock) {
boolean didProvision = false;
final WorkerBehaviorConfig workerConfig = workerConfigRef.get();
if (workerConfig == null) {
if (workerConfig == null || workerConfig.getAutoScaler() == null) {
log.warn("No workerConfig available, cannot provision new workers.");
return false;
}
......
......@@ -22,12 +22,20 @@ package io.druid.indexing.overlord.setup;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.overlord.autoscaling.AutoScaler;
import io.druid.indexing.overlord.autoscaling.NoopAutoScaler;
/**
*/
public class WorkerBehaviorConfig
{
public static final String CONFIG_KEY = "worker.config";
public static WorkerSelectStrategy DEFAULT_STRATEGY = new FillCapacityWorkerSelectStrategy();
public static AutoScaler DEFAULT_AUTOSCALER = new NoopAutoScaler();
public static WorkerBehaviorConfig defaultConfig()
{
return new WorkerBehaviorConfig(DEFAULT_STRATEGY, DEFAULT_AUTOSCALER);
}
private final WorkerSelectStrategy selectStrategy;
private final AutoScaler autoScaler;
......
......@@ -40,8 +40,7 @@ import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
......@@ -59,6 +58,7 @@ import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
......@@ -69,6 +69,7 @@ public class RemoteTaskRunnerTest
private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath);
private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private static final int TIMEOUT_SECONDS = 5;
private TestingCluster testingCluster;
private CuratorFramework cf;
......@@ -285,7 +286,7 @@ public class RemoteTaskRunnerTest
cf.delete().forPath(joiner.join(statusPath, task.getId()));
TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(status.getStatusCode(), TaskStatus.Status.FAILED);
}
......@@ -338,7 +339,7 @@ public class RemoteTaskRunnerTest
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
}
......@@ -356,7 +357,7 @@ public class RemoteTaskRunnerTest
cf.delete().forPath(announcementsPath);
TaskStatus status = future.get();
TaskStatus status = future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
Assert.assertEquals(TaskStatus.Status.FAILED, status.getStatusCode());
}
......@@ -407,7 +408,7 @@ public class RemoteTaskRunnerTest
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
null,
new FillCapacityWorkerSelectStrategy()
DSuppliers.of(new AtomicReference<>(WorkerBehaviorConfig.defaultConfig()))
);
remoteTaskRunner.start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册