提交 ded83557 编写于 作者: F fjy

Merge pull request #640 from metamx/disable-worker

An alternative way to disable middlemanagers based on worker version
......@@ -22,7 +22,7 @@ The following configs only apply if the overlord is running in remote mode:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task as been assigned to a middle manager before throwing an error.|PT5M|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |none|
|`druid.indexer.runner.minWorkerVersion`|The minimum middle manager version to send tasks to. |"0"|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the overlord should expect middle managers to compress Znodes.|false|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288|
......@@ -80,7 +80,6 @@ Issuing a GET request at the same URL will return the current worker setup spec
|Property|Description|Default|
|--------|-----------|-------|
|`minVersion`|The coordinator only assigns tasks to workers with a version greater than the minVersion. If this is not specified, the minVersion will be druid.indexer.runner.minWorkerVersion.|none|
|`minNumWorkers`|The minimum number of workers that can be in the cluster at any given time.|0|
|`maxNumWorkers`|The maximum number of workers that can be in the cluster at any given time.|0|
|`nodeData`|A JSON object that describes how to launch new nodes. Currently, only EC2 is supported.|none; required|
......
---
layout: doc_page
---
Rolling Updates
===============
For rolling Druid cluster updates with no downtime, we recommend updating Druid nodes in the following order:
1. Historical Nodes
2. Indexing Service/Real-time Nodes
3. Broker Nodes
4. Coordinator Nodes
## Historical Nodes
Historical nodes can be updated one at a time. Each historical node has a startup time to memory map all the segments it was serving before the update. The startup time typically takes a few seconds to a few minutes, depending on the hardware of the node. As long as each historical node is updated with a sufficient delay (greater than the time required to start a single node), you can rolling update the entire historical cluster.
## Standalone Real-time nodes
Standalone real-time nodes can be updated one at a time in a rolling fashion.
## Indexing Service
### With Autoscaling
Overlord nodes will try to launch new middle manager nodes and terminate old ones without dropping data. This process is based on the configuration `druid.indexer.runner.minWorkerVersion=#{VERSION}`. Each time you update your overlord node, the `VERSION` value should be increased.
The config `druid.indexer.autoscale.workerVersion=#{VERSION}` also needs to be set.
### Without Autoscaling
Middle managers can be updated in a rolling fashion based on API.
To prepare a middle manager for update, send a POST request to `<MiddleManager_IP:PORT>/druid/worker/v1/disable`. The overlord will now no longer send tasks to this middle manager.
Current tasks will still try to complete. To view all existing tasks, send a GET request to `<MiddleManager_IP:PORT>/druid/worker/v1/tasks`. When this list is empty, the middle manager can be updated. After the middle manager is updated, it is automatically enabled again. You can also manually enable middle managers POSTing to `<MiddleManager_IP:PORT>/druid/worker/v1/enable`.
## Broker Nodes
Broker nodes can be updated one at a time in a rolling fashion. There needs to be some delay between updating each node as brokers must load the entire state of the cluster before they return valid results.
## Coordinator Nodes
Coordinator nodes can be updated in a rolling fashion.
\ No newline at end of file
......@@ -17,6 +17,7 @@ h2. Getting Started
h2. Booting a Druid Cluster
* "Simple Cluster Configuration":Simple-Cluster-Configuration.html
* "Production Cluster Configuration":Production-Cluster-Configuration.html
* "Rolling Cluster Updates":Rolling-Updates.html
h2. Configuration
* "Common Configuration":Configuration.html
......
......@@ -806,8 +806,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
}
);
sortedWorkers.addAll(zkWorkers.values());
final String workerSetupDataMinVer = workerSetupData.get() == null ? null : workerSetupData.get().getMinVersion();
final String minWorkerVer = workerSetupDataMinVer == null ? config.getMinWorkerVersion() : workerSetupDataMinVer;
final String minWorkerVer = config.getMinWorkerVersion();
for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
......
......@@ -274,9 +274,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
@Override
public boolean apply(ZkWorker zkWorker)
{
final String minVersion = workerSetupData.getMinVersion() != null
? workerSetupData.getMinVersion()
: config.getWorkerVersion();
final String minVersion = config.getWorkerVersion();
if (minVersion == null) {
throw new ISE("No minVersion found! It should be set in your runtime properties or configuration database.");
}
......
......@@ -28,7 +28,6 @@ public class WorkerSetupData
{
public static final String CONFIG_KEY = "worker.setup";
private final String minVersion;
private final int minNumWorkers;
private final int maxNumWorkers;
private final String availabilityZone;
......@@ -37,7 +36,6 @@ public class WorkerSetupData
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("maxNumWorkers") int maxNumWorkers,
@JsonProperty("availabilityZone") String availabilityZone,
......@@ -45,7 +43,6 @@ public class WorkerSetupData
@JsonProperty("userData") EC2UserData userData
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.maxNumWorkers = maxNumWorkers;
this.availabilityZone = availabilityZone;
......@@ -53,12 +50,6 @@ public class WorkerSetupData
this.userData = userData;
}
@JsonProperty
public String getMinVersion()
{
return minVersion;
}
@JsonProperty
public int getMinNumWorkers()
{
......@@ -93,7 +84,6 @@ public class WorkerSetupData
public String toString()
{
return "WorkerSetupData{" +
"minVersion='" + minVersion + '\'' +
", minNumWorkers=" + minNumWorkers +
", maxNumWorkers=" + maxNumWorkers +
", availabilityZone=" + availabilityZone +
......
......@@ -51,13 +51,13 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
private final Worker worker;
private final Announcer announcer;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
private final String baseStatusPath;
private volatile Worker worker;
private volatile boolean started;
@Inject
......@@ -129,7 +129,11 @@ public class WorkerCuratorCoordinator
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(data);
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes());
throw new ISE(
"Length of raw bytes for task too large[%,d > %,d]",
rawBytes.length,
config.getMaxZnodeBytes()
);
}
curatorFramework.create()
......@@ -173,6 +177,11 @@ public class WorkerCuratorCoordinator
return getPath(Arrays.asList(baseStatusPath, statusId));
}
public Worker getWorker()
{
return worker;
}
public void unannounceTask(String taskId)
{
try {
......@@ -239,4 +248,16 @@ public class WorkerCuratorCoordinator
}
}
}
public void updateWorkerAnnouncement(Worker newWorker) throws Exception
{
synchronized (lock) {
if (!started) {
throw new ISE("Cannot update worker! Not Started!");
}
this.worker = newWorker;
announcer.update(getAnnouncementsPathForWorker(), jsonMapper.writeValueAsBytes(newWorker));
}
}
}
......@@ -19,12 +19,18 @@
package io.druid.indexing.worker.http;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
......@@ -42,18 +48,98 @@ import java.io.InputStream;
public class WorkerResource
{
private static final Logger log = new Logger(WorkerResource.class);
private static String DISABLED_VERSION = "";
private final Worker enabledWorker;
private final Worker disabledWorker;
private final WorkerCuratorCoordinator curatorCoordinator;
private final ForkingTaskRunner taskRunner;
@Inject
public WorkerResource(
Worker worker,
WorkerCuratorCoordinator curatorCoordinator,
ForkingTaskRunner taskRunner
) throws Exception
{
this.enabledWorker = worker;
this.disabledWorker = new Worker(worker.getHost(), worker.getIp(), worker.getCapacity(), DISABLED_VERSION);
this.curatorCoordinator = curatorCoordinator;
this.taskRunner = taskRunner;
}
@POST
@Path("/disable")
@Produces("application/json")
public Response doDisable()
{
try {
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@POST
@Path("/enable")
@Produces("application/json")
public Response doEnable()
{
try {
curatorCoordinator.updateWorkerAnnouncement(enabledWorker);
return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@GET
@Path("/enabled")
@Produces("application/json")
public Response isEnabled()
{
try {
final Worker theWorker = curatorCoordinator.getWorker();
final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION);
return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@GET
@Path("/tasks")
@Produces("application/json")
public Response getTasks()
{
try {
return Response.ok(
Lists.newArrayList(
Collections2.transform(
taskRunner.getKnownTasks(),
new Function<TaskRunnerWorkItem, String>()
{
@Override
public String apply(TaskRunnerWorkItem input)
{
return input.getTaskId();
}
}
)
)
).build();
}
catch (Exception e) {
return Response.serverError().build();
}
}
@POST
@Path("/task/{taskid}/shutdown")
@Produces("application/json")
......@@ -82,7 +168,8 @@ public class WorkerResource
if (stream.isPresent()) {
try {
return Response.ok(stream.get().getInput()).build();
} catch (Exception e) {
}
catch (Exception e) {
log.warn(e, "Failed to read log for task: %s", taskid);
return Response.serverError().build();
}
......
......@@ -380,7 +380,7 @@ public class RemoteTaskRunnerTest
},
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null, null))),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData(0, 1, null, null, null))),
null
);
......
......@@ -95,7 +95,6 @@ public class EC2AutoScalingStrategyTest
{
workerSetupData.set(
new WorkerSetupData(
"0",
0,
1,
"",
......
......@@ -67,7 +67,7 @@ public class SimpleResourceManagementStrategyTest
autoScalingStrategy = EasyMock.createMock(AutoScalingStrategy.class);
workerSetupData = new AtomicReference<>(
new WorkerSetupData(
"0", 0, 2, null, null, null
0, 2, null, null, null
)
);
......@@ -237,7 +237,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testDoSuccessfulTerminate() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList());
......@@ -267,7 +267,7 @@ public class SimpleResourceManagementStrategyTest
@Test
public void testSomethingTerminating() throws Exception
{
workerSetupData.set(new WorkerSetupData("0", 0, 1, null, null, null));
workerSetupData.set(new WorkerSetupData(0, 1, null, null, null));
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip")).times(2);
......@@ -381,7 +381,7 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
// Increase minNumWorkers
workerSetupData.set(new WorkerSetupData("0", 3, 5, null, null, null));
workerSetupData.set(new WorkerSetupData(3, 5, null, null, null));
// Should provision two new workers
EasyMock.reset(autoScalingStrategy);
......@@ -404,85 +404,6 @@ public class SimpleResourceManagementStrategyTest
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testMinVersionIncrease() throws Exception
{
// Don't terminate anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h1", "i2", "0")
)
);
Assert.assertFalse(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
// Don't provision anything
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create()),
new TestZkWorker(NoopTask.create())
)
);
Assert.assertFalse(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Increase minVersion
workerSetupData.set(new WorkerSetupData("1", 0, 2, null, null, null));
// Provision two new workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.<List<String>>anyObject()))
.andReturn(Lists.<String>newArrayList("ip"));
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h3"))
);
EasyMock.expect(autoScalingStrategy.provision()).andReturn(
new AutoScalingData(Lists.<String>newArrayList("h4"))
);
EasyMock.replay(autoScalingStrategy);
provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(NoopTask.create(), "h1", "i1", "0"),
new TestZkWorker(NoopTask.create(), "h2", "i2", "0")
)
);
Assert.assertTrue(provisionedSomething);
EasyMock.verify(autoScalingStrategy);
// Terminate old workers
EasyMock.reset(autoScalingStrategy);
EasyMock.expect(autoScalingStrategy.ipToIdLookup(ImmutableList.of("i1", "i2", "i3", "i4"))).andReturn(
ImmutableList.of("h1", "h2", "h3", "h4")
);
EasyMock.expect(autoScalingStrategy.terminate(ImmutableList.of("i1", "i2"))).andReturn(
new AutoScalingData(ImmutableList.of("h1", "h2"))
);
EasyMock.replay(autoScalingStrategy);
terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<RemoteTaskRunnerWorkItem>asList(),
Arrays.<ZkWorker>asList(
new TestZkWorker(null, "h1", "i1", "0"),
new TestZkWorker(null, "h2", "i2", "0"),
new TestZkWorker(NoopTask.create(), "h3", "i3", "1"),
new TestZkWorker(NoopTask.create(), "h4", "i4", "1")
)
);
Assert.assertTrue(terminatedSomething);
EasyMock.verify(autoScalingStrategy);
}
@Test
public void testNullWorkerSetupData() throws Exception
{
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.worker.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.worker.Worker;
import io.druid.indexing.worker.WorkerCuratorCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
/**
*/
public class WorkerResourceTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final String basePath = "/test/druid";
private static final String announcementsPath = String.format("%s/indexer/announcements/host", basePath);
private TestingCluster testingCluster;
private CuratorFramework cf;
private Worker worker;
private WorkerCuratorCoordinator curatorCoordinator;
private WorkerResource workerResource;
@Before
public void setUp() throws Exception
{
testingCluster = new TestingCluster(1);
testingCluster.start();
cf = CuratorFrameworkFactory.builder()
.connectString(testingCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build();
cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
worker = new Worker(
"host",
"ip",
3,
"v1"
);
curatorCoordinator = new WorkerCuratorCoordinator(
jsonMapper,
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
new RemoteTaskRunnerConfig(),
cf,
worker
);
curatorCoordinator.start();
workerResource = new WorkerResource(
worker,
curatorCoordinator,
null
);
}
@After
public void tearDown() throws Exception
{
curatorCoordinator.stop();
cf.close();
testingCluster.close();
}
@Test
public void testDoDisable() throws Exception
{
Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertEquals("v1", theWorker.getVersion());
Response res = workerResource.doDisable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertTrue(theWorker.getVersion().isEmpty());
}
@Test
public void testDoEnable() throws Exception
{
// Disable the worker
Response res = workerResource.doDisable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
Worker theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertTrue(theWorker.getVersion().isEmpty());
// Enable the worker
res = workerResource.doEnable();
Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus());
theWorker = jsonMapper.readValue(cf.getData().forPath(announcementsPath), Worker.class);
Assert.assertEquals("v1", theWorker.getVersion());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册