Commit 2d21aea2 authored by fjy

Merge pull request #323 from metamx/indexing-service-stuff

Indexing service stuff
......@@ -308,21 +308,29 @@ This module is used to configure the [Indexing Service](Indexing-Service.html) t
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.type`|Choices: noop, S3. Where to store task logs.|noop|
|`druid.indexer.logs.type`|Choices: noop, s3, file. Where to store task logs.|file|
#### Noop Task Logs
#### File Task Logs
No task logs are actually stored.
Store task logs in the local filesystem.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.directory`|Local filesystem path.|log|
#### S3 Task Logs
Store Task Logs in S3.
Store task logs in S3.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.s3Bucket`|S3 bucket name.|none|
|`druid.indexer.logs.s3Prefix`|S3 key prefix.|none|
#### Noop Task Logs
No task logs are actually stored.
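Putting the new options together: a hypothetical node that keeps its task logs in S3 might set the following (the bucket and prefix values are purely illustrative):

```
druid.indexer.logs.type=s3
druid.indexer.logs.s3Bucket=my-bucket
druid.indexer.logs.s3Prefix=druid/task-logs
```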
### Firehose Module
The Firehose module lists all available firehoses. There are no configurations.
......@@ -110,12 +110,16 @@ If autoscaling is enabled, new middle managers may be added when a task has been
#### JVM Configuration
In addition to the configuration of some of the default modules in [Configuration](Configuration.html), the overlord module requires the following basic configs to run in remote mode:
In addition to the configuration of some of the default modules in [Configuration](Configuration.html), the overlord has the following basic configs:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be bootstrapped if the overlord should fail.|local|
|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
|`druid.indexer.queue.restartDelay`|Sleep this long when overlord queue management throws an exception before trying again.|PT30S|
|`druid.indexer.queue.storageSyncRate`|Sync overlord state this often with an underlying task persistence mechanism.|PT1M|
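As a sketch, an overlord running in remote mode with database-backed task storage might combine these properties like so (values illustrative; defaults shown in the table apply when unset):

```
druid.indexer.runner.type=remote
druid.indexer.storage.type=db
druid.indexer.queue.startDelay=PT1M
```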
The following configs only apply if the overlord is running in remote mode:
......
......@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.indexing.common.config.EventReceiverFirehoseFactoryConfig;
import io.druid.indexing.common.index.EventReceiverFirehoseFactory;
import io.druid.initialization.DruidModule;
......@@ -46,7 +45,5 @@ public class IndexingServiceFirehoseModule implements DruidModule
@Override
public void configure(Binder binder)
{
// backwards compatibility
ConfigProvider.bind(binder, EventReceiverFirehoseFactoryConfig.class);
}
}
......@@ -68,7 +68,7 @@ public class LockAcquireAction implements TaskAction<TaskLock>
@Override
public boolean isAudited()
{
return true;
return false;
}
@Override
......
......@@ -60,7 +60,7 @@ public class LockReleaseAction implements TaskAction<Void>
@Override
public boolean isAudited()
{
return true;
return false;
}
@Override
......
......@@ -23,56 +23,54 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Optional;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import org.joda.time.Interval;
import java.util.List;
public class SpawnTasksAction implements TaskAction<Void>
public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
{
@JsonIgnore
private final List<Task> newTasks;
private final Interval interval;
@JsonCreator
public SpawnTasksAction(
@JsonProperty("newTasks") List<Task> newTasks
public LockTryAcquireAction(
@JsonProperty("interval") Interval interval
)
{
this.newTasks = ImmutableList.copyOf(newTasks);
this.interval = interval;
}
@JsonProperty
public List<Task> getNewTasks()
public Interval getInterval()
{
return newTasks;
return interval;
}
public TypeReference<Void> getReturnTypeReference()
public TypeReference<Optional<TaskLock>> getReturnTypeReference()
{
return new TypeReference<Void>() {};
return new TypeReference<Optional<TaskLock>>()
{
};
}
@Override
public Void perform(Task task, TaskActionToolbox toolbox)
public Optional<TaskLock> perform(Task task, TaskActionToolbox toolbox)
{
for(final Task newTask : newTasks) {
toolbox.getTaskQueue().add(newTask);
}
return null;
return toolbox.getTaskLockbox().tryLock(task, interval);
}
@Override
public boolean isAudited()
{
return true;
return false;
}
@Override
public String toString()
{
return "SpawnTasksAction{" +
"newTasks=" + newTasks +
return "LockTryAcquireAction{" +
"interval=" + interval +
'}';
}
}
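A minimal sketch of how a task can use the new action — this mirrors what `AbstractFixedIntervalTask.isReady` does later in this change; the wrapper class and method are hypothetical, for illustration only:

```java
import com.google.common.base.Optional;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.Interval;

import java.io.IOException;

public class TryLockExample // hypothetical helper, illustration only
{
  public static boolean tryReady(TaskActionClient client, Interval interval) throws IOException
  {
    // Unlike LockAcquireAction, this returns Optional.absent() rather than
    // waiting or failing when the interval is already locked by another task.
    final Optional<TaskLock> maybeLock = client.submit(new LockTryAcquireAction(interval));
    return maybeLock.isPresent();
  }
}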
......@@ -29,13 +29,13 @@ import java.io.IOException;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "lockAcquire", value = LockAcquireAction.class),
@JsonSubTypes.Type(name = "lockTryAcquire", value = LockTryAcquireAction.class),
@JsonSubTypes.Type(name = "lockList", value = LockListAction.class),
@JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class),
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "spawnTasks", value = SpawnTasksAction.class)
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class)
})
public interface TaskAction<RetType>
{
......
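Actions travel between peons and the overlord as JSON, so the names registered in `@JsonSubTypes` above are wire-format identifiers. A hedged sketch of what the new action serializes to, assuming Druid's `DefaultObjectMapper` is available at this commit (the exact interval rendering may differ):

```java
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval;

public class WireFormatDemo // illustration only
{
  public static void main(String[] args) throws Exception
  {
    // @JsonTypeInfo on TaskAction adds a "type" field naming the subtype.
    final TaskAction<?> action = new LockTryAcquireAction(new Interval("2013-01-01/2013-01-02"));
    System.out.println(new DefaultObjectMapper().writeValueAsString(action));
    // expect something like: {"type":"lockTryAcquire","interval":"2013-01-01T00:00:00.000Z/..."}
  }
}
```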
......@@ -27,7 +27,6 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.timeline.DataSegment;
import java.util.List;
......@@ -35,30 +34,22 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskQueue taskQueue;
private final TaskLockbox taskLockbox;
private final IndexerDBCoordinator indexerDBCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskQueue taskQueue,
TaskLockbox taskLockbox,
IndexerDBCoordinator indexerDBCoordinator,
ServiceEmitter emitter
)
{
this.taskQueue = taskQueue;
this.taskLockbox = taskLockbox;
this.indexerDBCoordinator = indexerDBCoordinator;
this.emitter = emitter;
}
public TaskQueue getTaskQueue()
{
return taskQueue;
}
public TaskLockbox getTaskLockbox()
{
return taskLockbox;
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.config;
import org.skife.config.Config;
/**
*/
@Deprecated
public abstract class EventReceiverFirehoseFactoryConfig
{
@Config("druid.indexer.firehoseId.prefix")
public abstract String getFirehoseIdPrefix();
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.config;
import io.druid.server.initialization.ZkPathsConfig;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class IndexerZkConfig extends ZkPathsConfig
{
@Config("druid.zk.maxNumBytes")
@Default("512000")
public abstract long getMaxNumBytes();
}
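These skife config-magic classes are materialized from properties via `ConfigurationObjectFactory`; a minimal sketch of how this one would be resolved (the override value is illustrative):

```java
import io.druid.indexing.common.config.IndexerZkConfig;
import org.skife.config.ConfigurationObjectFactory;

import java.util.Properties;

public class ZkConfigDemo // illustration only
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty("druid.zk.maxNumBytes", "1048576"); // illustrative override
    final IndexerZkConfig config = new ConfigurationObjectFactory(props).build(IndexerZkConfig.class);
    System.out.println(config.getMaxNumBytes()); // 1048576; falls back to @Default (512000) if unset
  }
}
```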
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.config;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
public abstract class TaskLogConfig
{
@Config("druid.indexer.logs.type")
@Default("noop")
public abstract String getLogType();
@Config("druid.indexer.logs.s3bucket")
@DefaultNull
public abstract String getLogStorageBucket();
@Config("druid.indexer.logs.s3prefix")
@DefaultNull
public abstract String getLogStoragePrefix();
}
......@@ -33,7 +33,6 @@ import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.indexing.common.config.EventReceiverFirehoseFactoryConfig;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
......@@ -63,31 +62,15 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
private final MapInputRowParser parser;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@Deprecated
private final EventReceiverFirehoseFactoryConfig config;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("serviceName") String serviceName,
@JsonProperty("firehoseId") String firehoseId,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject EventReceiverFirehoseFactoryConfig config
@JacksonInject ChatHandlerProvider chatHandlerProvider
)
{
// This code is here for backwards compatibility
if (serviceName == null) {
this.serviceName = String.format(
"%s:%s",
config.getFirehoseIdPrefix(),
Preconditions.checkNotNull(firehoseId, "firehoseId")
);
} else {
this.serviceName = serviceName;
}
this.config = config;
this.serviceName = Preconditions.checkNotNull(serviceName, "serviceName");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
......@@ -117,13 +100,6 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
return serviceName;
}
@Deprecated
@JsonProperty("firehoseId")
public String getFirehoseId()
{
return serviceName.replaceFirst(String.format("%s:", config.getFirehoseIdPrefix()), "");
}
@JsonProperty
public int getBufferSize()
{
......
......@@ -17,23 +17,59 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.config;
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.db.DbConnectorConfig;
import org.skife.config.Config;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.Interval;
public abstract class IndexerDbConnectorConfig extends DbConnectorConfig
public abstract class AbstractFixedIntervalTask extends AbstractTask
{
@JsonProperty("taskTable")
@Config("druid.database.taskTable")
public abstract String getTaskTable();
@JsonIgnore
private final Interval interval;
@JsonProperty("taskLockTable")
@Config("druid.database.taskLockTable")
public abstract String getTaskLockTable();
protected AbstractFixedIntervalTask(
String id,
String dataSource,
Interval interval
)
{
this(id, id, new TaskResource(id, 1), dataSource, interval);
}
@JsonProperty("taskLogTable")
@Config("druid.database.taskLogTable")
public abstract String getTaskLogTable();
protected AbstractFixedIntervalTask(
String id,
String groupId,
String dataSource,
Interval interval
)
{
this(id, groupId, new TaskResource(id, 1), dataSource, interval);
}
protected AbstractFixedIntervalTask(
String id,
String groupId,
TaskResource taskResource,
String dataSource,
Interval interval
)
{
super(id, groupId, taskResource, dataSource);
this.interval = interval;
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
}
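A sketch of the pattern this new base class enables: subclasses declare only their interval and their work, and readiness is handled by the try-lock in `isReady` above. The subclass name, type string, and body here are invented for illustration:

```java
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import org.joda.time.Interval;

public class SweepTask extends AbstractFixedIntervalTask // hypothetical subclass
{
  public SweepTask(String id, String dataSource, Interval interval)
  {
    super(id, dataSource, interval);
  }

  @Override
  public String getType()
  {
    return "sweep"; // hypothetical type name
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // By the time run() is called, isReady() has already try-locked getInterval().
    return TaskStatus.success(getId());
  }
}
```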
......@@ -23,21 +23,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
public abstract class AbstractTask implements Task
{
......@@ -55,26 +49,22 @@ public abstract class AbstractTask implements Task
@JsonIgnore
private final String dataSource;
@JsonIgnore
private final Optional<Interval> interval;
protected AbstractTask(String id, String dataSource, Interval interval)
protected AbstractTask(String id, String dataSource)
{
this(id, id, new TaskResource(id, 1), dataSource, interval);
this(id, id, new TaskResource(id, 1), dataSource);
}
protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
protected AbstractTask(String id, String groupId, String dataSource)
{
this(id, groupId, new TaskResource(id, 1), dataSource, interval);
this(id, groupId, new TaskResource(id, 1), dataSource);
}
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Optional.fromNullable(interval);
}
@JsonProperty
......@@ -111,25 +101,12 @@ public abstract class AbstractTask implements Task
return dataSource;
}
@JsonProperty("interval")
@Override
public Optional<Interval> getImplicitLockInterval()
{
return interval;
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
return null;
}
@Override
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
{
return TaskStatus.running(id);
}
@Override
public String toString()
{
......@@ -137,7 +114,6 @@ public abstract class AbstractTask implements Task
.add("id", id)
.add("type", getType())
.add("dataSource", dataSource)
.add("interval", getImplicitLockInterval())
.toString();
}
......@@ -149,11 +125,6 @@ public abstract class AbstractTask implements Task
return ID_JOINER.join(objects);
}
public SegmentListUsedAction defaultListUsedAction()
{
return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get());
}
public TaskStatus success()
{
return TaskStatus.success(getId());
......@@ -186,14 +157,6 @@ public abstract class AbstractTask implements Task
protected Iterable<TaskLock> getTaskLocks(TaskToolbox toolbox) throws IOException
{
final List<TaskLock> locks = toolbox.getTaskActionClient().submit(new LockListAction());
if (locks.isEmpty() && getImplicitLockInterval().isPresent()) {
// In the Peon's local mode, the implicit lock interval is not pre-acquired, so we need to try it here.
toolbox.getTaskActionClient().submit(new LockAcquireAction(getImplicitLockInterval().get()));
return toolbox.getTaskActionClient().submit(new LockListAction());
} else {
return locks;
}
return toolbox.getTaskActionClient().submit(new LockListAction());
}
}
......@@ -30,7 +30,6 @@ import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
......@@ -44,7 +43,7 @@ import org.joda.time.Interval;
import java.io.File;
public class DeleteTask extends AbstractTask
public class DeleteTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(DeleteTask.class);
......@@ -78,16 +77,15 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Interval interval = this.getImplicitLockInterval().get();
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment
final DataSegment segment =
DataSegment.builder()
.dataSource(this.getDataSource())
.interval(interval)
.interval(getInterval())
.version(myLock.getVersion())
.shardSpec(new NoneShardSpec())
.build();
......
......@@ -37,12 +37,15 @@ import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.initialization.Initialization;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.lang.reflect.Method;
......@@ -51,7 +54,7 @@ import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
public class HadoopIndexTask extends AbstractTask
public class HadoopIndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
......@@ -88,10 +91,14 @@ public class HadoopIndexTask extends AbstractTask
super(
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
schema.getDataSource(),
JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(schema.getGranularitySpec().bucketIntervals()))
JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(
schema.getGranularitySpec()
.bucketIntervals()
)
)
);
// Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
......@@ -107,7 +114,6 @@ public class HadoopIndexTask extends AbstractTask
return "index_hadoop";
}
@JsonProperty("config")
public HadoopDruidIndexerSchema getSchema()
{
......
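The reformatted `umbrellaInterval(condenseIntervals(...))` call computes the single lock interval handed to the fixed-interval superclass. A small sketch of what that does, assuming the `JodaUtils` from Metamarkets' java-util (intervals invented):

```java
import com.metamx.common.JodaUtils;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.List;

public class UmbrellaDemo // illustration only
{
  public static void main(String[] args)
  {
    final List<Interval> buckets = Arrays.asList(
        new Interval("2013-01-01/2013-01-02"),
        new Interval("2013-01-03/2013-01-04")
    );
    // Condense overlapping/abutting intervals, then take the single span
    // covering all of them -- including the 01-02/01-03 gap.
    final Interval lock = JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(buckets));
    System.out.println(lock); // 2013-01-01T00:00:00.000Z/2013-01-04T00:00:00.000Z
  }
}
```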
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.segment.realtime.Schema;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class IndexDeterminePartitionsTask extends AbstractTask
{
private static String makeTaskId(String groupId, DateTime start, DateTime end)
{
return String.format(
"%s_partitions_%s_%s",
groupId,
start,
end
);
}
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonIgnore
private final Schema schema;
@JsonIgnore
private final long targetPartitionSize;
@JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexDeterminePartitionsTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema,
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()),
groupId,
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.targetPartitionSize = targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
public String getType()
{
return "index_partitions";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
log.info("Running with targetPartitionSize[%d]", targetPartitionSize);
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// We know this exists
final Interval interval = getImplicitLockInterval().get();
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Load data
final Firehose firehose = firehoseFactory.connect();
try {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
if (!unusableDimensions.contains(dim)) {
if (dimValues.size() == 1) {
// Track this value
TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
if (dimensionValueMultiset == null) {
dimensionValueMultiset = TreeMultiset.create();
dimensionValueMultisets.put(dim, dimensionValueMultiset);
}
dimensionValueMultiset.add(dimValues.get(0));
} else {
// Only single-valued dimensions can be used for partitions
unusableDimensions.add(dim);
dimensionValueMultisets.remove(dim);
}
}
}
}
}
}
finally {
firehose.close();
}
// ShardSpecs for index generator tasks
final List<ShardSpec> shardSpecs = Lists.newArrayList();
// Select highest-cardinality dimension
Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
{
@Override
public int compare(
Map.Entry<String, TreeMultiset<String>> left,
Map.Entry<String, TreeMultiset<String>> right
)
{
return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
}
};
if (dimensionValueMultisets.isEmpty()) {
// No suitable partition dimension. We'll make one big segment and hope for the best.
log.info("No suitable partition dimension found");
shardSpecs.add(new NoneShardSpec());
} else {
// Find best partition dimension (heuristic: highest cardinality).
final Map.Entry<String, TreeMultiset<String>> partitionEntry =
byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
final String partitionDim = partitionEntry.getKey();
final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
log.info(
"Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
partitionDim,
partitionDimValues.elementSet().size(),
partitionDimValues.size()
);
// Iterate over unique partition dimension values in sorted order
String currentPartitionStart = null;
int currentPartitionSize = 0;
for (final String partitionDimValue : partitionDimValues.elementSet()) {
currentPartitionSize += partitionDimValues.count(partitionDimValue);
if (currentPartitionSize >= targetPartitionSize) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
partitionDimValue,
shardSpecs.size()
);
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
currentPartitionSize = partitionDimValues.count(partitionDimValue);
currentPartitionStart = partitionDimValue;
}
}
if (currentPartitionSize > 0) {
// One last shard to go
final ShardSpec shardSpec;
if (shardSpecs.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
null,
shardSpecs.size()
);
}
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
}
}
List<Task> nextTasks = Lists.transform(
shardSpecs,
new Function<ShardSpec, Task>()
{
@Override
public Task apply(ShardSpec shardSpec)
{
return new IndexGeneratorTask(
null,
getGroupId(),
getImplicitLockInterval().get(),
firehoseFactory,
new Schema(
schema.getDataSource(),
schema.getSpatialDimensions(),
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
),
rowFlushBoundary
);
}
}
);
toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(getId());
}
@JsonProperty
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public Schema getSchema()
{
return schema;
}
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
}
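The shard-boundary logic above (also inlined into `IndexTask.determinePartitions` later in this diff) is easiest to see with numbers. A self-contained sketch of the same cumulative-count walk, with invented data and `targetPartitionSize = 100`:

```java
import com.google.common.collect.TreeMultiset;

public class PartitionWalkDemo // illustration only
{
  public static void main(String[] args)
  {
    // Invented per-value row counts for a single candidate partition dimension.
    final TreeMultiset<String> values = TreeMultiset.create();
    values.add("apple", 40);
    values.add("banana", 70);
    values.add("cherry", 20);

    final int targetPartitionSize = 100;
    String start = null; // null start/end means an open-ended range
    int size = 0;
    for (final String v : values.elementSet()) { // sorted order: apple, banana, cherry
      size += values.count(v);
      if (size >= targetPartitionSize) {
        // Cut a shard [start, v); here apple + banana = 110 >= 100, so [null, "banana").
        System.out.printf("shard: [%s, %s)%n", start, v);
        size = values.count(v);
        start = v;
      }
    }
    if (size > 0) {
      // One last shard to go, as in the task above: [banana, null).
      System.out.printf("final shard: [%s, null)%n", start);
    }
  }
}
```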
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class IndexGeneratorTask extends AbstractTask
{
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonIgnore
private final Schema schema;
@JsonIgnore
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
public IndexGeneratorTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
id != null
? id
: String.format(
"%s_generator_%s_%s_%s",
groupId,
interval.getStart(),
interval.getEnd(),
schema.getShardSpec().getPartitionNum()
),
groupId,
schema.getDataSource(),
Preconditions.checkNotNull(interval, "interval")
);
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
public String getType()
{
return "index_generator";
}
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
// We know this exists
final Interval interval = getImplicitLockInterval().get();
// Set up temporary directory for indexing
final File tmpDir = new File(
toolbox.getTaskWorkDir(),
String.format(
"%s_%s_%s_%s_%s",
this.getDataSource(),
interval.getStart(),
interval.getEnd(),
myLock.getVersion(),
schema.getShardSpec().getPartitionNum()
)
);
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment);
pushedSegments.add(pushedSegment);
return pushedSegment;
}
};
// Create firehose + plumber
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect();
final Plumber plumber = new YeOldePlumberSchool(
interval,
myLock.getVersion(),
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
try {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (shouldIndex(inputRow)) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
throw new NullPointerException(
String.format(
"Was expecting non-null sink for timestamp[%s]",
new DateTime(inputRow.getTimestampFromEpoch())
)
);
}
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if (numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else {
metrics.incrementThrownAway();
}
}
}
finally {
firehose.close();
}
plumber.persist(firehose.commit());
plumber.finishJob();
// Output metrics
log.info(
"Task[%s] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
getId(),
metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
metrics.processed(),
metrics.unparseable(),
metrics.thrownAway(),
metrics.rowOutput()
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
// Done
return TaskStatus.success(getId());
}
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private boolean shouldIndex(InputRow inputRow)
{
if (getImplicitLockInterval().get().contains(inputRow.getTimestampFromEpoch())) {
return schema.getShardSpec().isInChunk(inputRow);
} else {
return false;
}
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public Schema getSchema()
{
return schema;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
}
......@@ -22,26 +22,48 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Sets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
public class IndexTask extends AbstractTask
public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
......@@ -58,7 +80,7 @@ public class IndexTask extends AbstractTask
private final QueryGranularity indexGranularity;
@JsonIgnore
private final long targetPartitionSize;
private final int targetPartitionSize;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
......@@ -74,7 +96,7 @@ public class IndexTask extends AbstractTask
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
......@@ -96,75 +118,283 @@ public class IndexTask extends AbstractTask
this.aggregators = aggregators;
this.indexGranularity = (indexGranularity == null) ? QueryGranularity.NONE : indexGranularity;
this.targetPartitionSize = targetPartitionSize;
this.firehoseFactory = firehoseFactory;
this.firehoseFactory = Preconditions.checkNotNull(firehoseFactory, "firehoseFactory");
this.rowFlushBoundary = rowFlushBoundary;
}
public List<Task> toSubtasks()
@Override
public String getType()
{
final List<Task> retVal = Lists.newArrayList();
return "index";
}
for (final Interval interval : granularitySpec.bucketIntervals()) {
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
for (final Interval bucket : granularitySpec.bucketIntervals()) {
final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) {
// Need to do one pass over the data before indexing in order to determine good partitions
retVal.add(
new IndexDeterminePartitionsTask(
null,
getGroupId(),
interval,
firehoseFactory,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
new NoneShardSpec()
),
targetPartitionSize,
rowFlushBoundary
)
);
shardSpecs = determinePartitions(bucket, targetPartitionSize);
} else {
// Jump straight into indexing
retVal.add(
new IndexGeneratorTask(
null,
getGroupId(),
interval,
firehoseFactory,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
new NoneShardSpec()
),
rowFlushBoundary
)
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
}
for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment(
toolbox,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
shardSpec
),
bucket,
myLock.getVersion()
);
segments.add(segment);
}
}
return retVal;
toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
return TaskStatus.success(getId());
}
@Override
public String getType()
private List<ShardSpec> determinePartitions(
final Interval interval,
final int targetPartitionSize
) throws IOException
{
return "index";
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Load data
try (Firehose firehose = firehoseFactory.connect()) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
if (!unusableDimensions.contains(dim)) {
if (dimValues.size() == 1) {
// Track this value
TreeMultiset<String> dimensionValueMultiset = dimensionValueMultisets.get(dim);
if (dimensionValueMultiset == null) {
dimensionValueMultiset = TreeMultiset.create();
dimensionValueMultisets.put(dim, dimensionValueMultiset);
}
dimensionValueMultiset.add(dimValues.get(0));
} else {
// Only single-valued dimensions can be used for partitions
unusableDimensions.add(dim);
dimensionValueMultisets.remove(dim);
}
}
}
}
}
}
// ShardSpecs we will return
final List<ShardSpec> shardSpecs = Lists.newArrayList();
// Select highest-cardinality dimension
Ordering<Map.Entry<String, TreeMultiset<String>>> byCardinalityOrdering = new Ordering<Map.Entry<String, TreeMultiset<String>>>()
{
@Override
public int compare(
Map.Entry<String, TreeMultiset<String>> left,
Map.Entry<String, TreeMultiset<String>> right
)
{
return Ints.compare(left.getValue().elementSet().size(), right.getValue().elementSet().size());
}
};
if (dimensionValueMultisets.isEmpty()) {
// No suitable partition dimension. We'll make one big segment and hope for the best.
log.info("No suitable partition dimension found");
shardSpecs.add(new NoneShardSpec());
} else {
// Find best partition dimension (heuristic: highest cardinality).
final Map.Entry<String, TreeMultiset<String>> partitionEntry =
byCardinalityOrdering.max(dimensionValueMultisets.entrySet());
final String partitionDim = partitionEntry.getKey();
final TreeMultiset<String> partitionDimValues = partitionEntry.getValue();
log.info(
"Partitioning on dimension[%s] with cardinality[%d] over rows[%d]",
partitionDim,
partitionDimValues.elementSet().size(),
partitionDimValues.size()
);
// Iterate over unique partition dimension values in sorted order
String currentPartitionStart = null;
int currentPartitionSize = 0;
for (final String partitionDimValue : partitionDimValues.elementSet()) {
currentPartitionSize += partitionDimValues.count(partitionDimValue);
if (currentPartitionSize >= targetPartitionSize) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
partitionDimValue,
shardSpecs.size()
);
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
currentPartitionSize = partitionDimValues.count(partitionDimValue);
currentPartitionStart = partitionDimValue;
}
}
if (currentPartitionSize > 0) {
// One last shard to go
final ShardSpec shardSpec;
if (shardSpecs.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
shardSpec = new SingleDimensionShardSpec(
partitionDim,
currentPartitionStart,
null,
shardSpecs.size()
);
}
log.info("Adding shard: %s", shardSpec);
shardSpecs.add(shardSpec);
}
}
return shardSpecs;
}
@Override
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
private DataSegment generateSegment(
final TaskToolbox toolbox,
final Schema schema,
final Interval interval,
final String version
) throws IOException
{
taskActionClient.submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId());
// Set up temporary directory.
final File tmpDir = new File(
toolbox.getTaskWorkDir(),
String.format(
"%s_%s_%s_%s_%s",
this.getDataSource(),
interval.getStart(),
interval.getEnd(),
version,
schema.getShardSpec().getPartitionNum()
)
);
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
@Override
public String getPathForHadoop(String dataSource)
{
return toolbox.getSegmentPusher().getPathForHadoop(dataSource);
}
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment);
pushedSegments.add(pushedSegment);
return pushedSegment;
}
};
// Create firehose + plumber
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect();
final Plumber plumber = new YeOldePlumberSchool(
interval,
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
try {
plumber.startJob();
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (shouldIndex(schema, interval, inputRow)) {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if (sink == null) {
throw new NullPointerException(
String.format(
"Was expecting non-null sink for timestamp[%s]",
new DateTime(inputRow.getTimestampFromEpoch())
)
);
}
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if (numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else {
metrics.incrementThrownAway();
}
}
}
finally {
firehose.close();
}
plumber.persist(firehose.commit());
plumber.finishJob();
// Output metrics
log.info(
"Task[%s] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
getId(),
metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
metrics.processed(),
metrics.unparseable(),
metrics.thrownAway(),
metrics.rowOutput()
);
// We expect a single segment to have been created.
return Iterables.getOnlyElement(pushedSegments);
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
/**
* Should we index this inputRow? Decision is based on our interval and shardSpec.
*
* @param inputRow the row to check
*
* @return true or false
*/
private boolean shouldIndex(final Schema schema, final Interval interval, final InputRow inputRow)
{
throw new IllegalStateException("IndexTasks should not be run!");
return interval.contains(inputRow.getTimestampFromEpoch()) && schema.getShardSpec().isInChunk(inputRow);
}
@JsonProperty
......@@ -191,7 +421,7 @@ public class IndexTask extends AbstractTask
return targetPartitionSize;
}
@JsonProperty
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
......@@ -202,4 +432,10 @@ public class IndexTask extends AbstractTask
{
return rowFlushBoundary;
}
@JsonProperty
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
}
......@@ -28,7 +28,6 @@ import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentNukeAction;
import io.druid.timeline.DataSegment;
......@@ -38,7 +37,7 @@ import java.util.List;
/**
*/
public class KillTask extends AbstractTask
public class KillTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(KillTask.class);
......@@ -68,12 +67,12 @@ public class KillTask extends AbstractTask
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if(!myLock.getDataSource().equals(getDataSource())) {
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if(!myLock.getInterval().equals(getImplicitLockInterval().get())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getImplicitLockInterval().get());
if (!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
......@@ -82,8 +81,8 @@ public class KillTask extends AbstractTask
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for(final DataSegment unusedSegment : unusedSegments) {
if(unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
for (final DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
......
......@@ -27,7 +27,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
......@@ -41,9 +40,8 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;
import io.druid.timeline.DataSegment;
......@@ -53,14 +51,13 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public abstract class MergeTaskBase extends AbstractTask
public abstract class MergeTaskBase extends AbstractFixedIntervalTask
{
@JsonIgnore
private final List<DataSegment> segments;
......@@ -186,9 +183,12 @@ public abstract class MergeTaskBase extends AbstractTask
* we are operating on every segment that overlaps the chosen interval.
*/
@Override
public TaskStatus preflight(TaskActionClient taskActionClient)
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
try {
// Try to acquire lock
if (!super.isReady(taskActionClient)) {
return false;
} else {
final Function<DataSegment, String> toIdentifier = new Function<DataSegment, String>()
{
@Override
......@@ -199,7 +199,10 @@ public abstract class MergeTaskBase extends AbstractTask
};
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(taskActionClient.submit(defaultListUsedAction()), toIdentifier)
Iterables.transform(
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())),
toIdentifier
)
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
......@@ -219,10 +222,7 @@ public abstract class MergeTaskBase extends AbstractTask
);
}
return TaskStatus.running(getId());
}
catch (IOException e) {
throw Throwables.propagate(e);
return true;
}
}
......@@ -241,7 +241,7 @@ public abstract class MergeTaskBase extends AbstractTask
return Objects.toStringHelper(this)
.add("id", getId())
.add("dataSource", getDataSource())
.add("interval", getImplicitLockInterval())
.add("interval", getInterval())
.add("segments", segments)
.toString();
}
......
......@@ -25,9 +25,8 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
/**
*/
......@@ -42,19 +41,16 @@ public class NoopTask extends AbstractTask
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("interval") Interval interval,
@JsonProperty("runTime") int runTime,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
)
{
super(
id == null ? String.format("noop_%s", new DateTime()) : id,
"none",
interval == null ? new Interval(Period.days(1), new DateTime()) : interval
"none"
);
this.runTime = (runTime == 0) ? defaultRunTime : runTime;
this.firehoseFactory = firehoseFactory;
}
......@@ -76,6 +72,12 @@ public class NoopTask extends AbstractTask
return firehoseFactory;
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
......
......@@ -38,6 +38,7 @@ import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockReleaseAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
......@@ -130,8 +131,7 @@ public class RealtimeIndexTask extends AbstractTask
), 1
)
: taskResource,
schema.getDataSource(),
null
schema.getDataSource()
);
this.schema = schema;
......@@ -167,6 +167,12 @@ public class RealtimeIndexTask extends AbstractTask
}
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
......
......@@ -21,27 +21,22 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import org.joda.time.Interval;
/**
* Represents a task that can run on a worker. The general contracts surrounding Tasks are:
* <ul>
* <li>Tasks must operate on a single datasource.</li>
* <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
* <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
* <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
* useful for producing sharded segments.</li>
* <li>Tasks can optionally have an "implicit lock interval". Tasks with this property are guaranteed to have
* a lock on that interval during their {@link #preflight(io.druid.indexing.common.actions.TaskActionClient)}
* and {@link #run(io.druid.indexing.common.TaskToolbox)} methods.</li>
* <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
* to release locks early if they desire.</li>
* <li>Tasks must operate on a single datasource.</li>
* <li>Tasks should be immutable, since the task ID is used as a proxy for the task in many locations.</li>
* <li>Task IDs must be unique. This can be done by naming them using UUIDs or the current timestamp.</li>
* <li>Tasks are each part of a "task group", which is a set of tasks that can share interval locks. These are
* useful for producing sharded segments.</li>
* <li>Tasks do not need to explicitly release locks; they are released upon task completion. Tasks may choose
* to release locks early if they desire.</li>
* </ul>
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
......@@ -51,8 +46,6 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
......@@ -96,12 +89,6 @@ public interface Task
*/
public String getDataSource();
/**
* Returns implicit lock interval for this task, if any. Tasks without implicit lock intervals are not granted locks
* when started and must explicitly request them.
*/
public Optional<Interval> getImplicitLockInterval();
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
......@@ -109,18 +96,19 @@ public interface Task
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
* Execute preflight checks for a task. This typically runs on the coordinator, and will be run while
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the
* task should be considered a failure.
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
*
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
*
* @return Some kind of status (runnable means continue on to a worker, non-runnable means we completed without
* using a worker).
* @return true if ready, false if not ready yet
*
* @throws Exception
* @throws Exception if the task should be considered a failure
*/
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception;
public boolean isReady(TaskActionClient taskActionClient) throws Exception;
/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
......
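To make the new two-phase contract concrete, here is a hedged sketch of a task that acquires its lock during `isReady` and then does its work in `run`. It assumes `AbstractTask` supplies the remaining `Task` methods and that `LockAcquireAction` takes the interval to lock, as its import above suggests; everything named `Example*` is illustrative:

```java
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import org.joda.time.Interval;

// Illustrative only: a task that locks its interval in isReady(), then runs.
public class ExampleTask extends AbstractTask
{
  private final Interval interval;

  public ExampleTask(String id, String dataSource, Interval interval)
  {
    super(id, dataSource);
    this.interval = interval;
  }

  @Override
  public String getType()
  {
    return "example";
  }

  @Override
  public boolean isReady(TaskActionClient taskActionClient) throws Exception
  {
    // Must be idempotent: re-acquiring a lock the task already holds is a no-op.
    taskActionClient.submit(new LockAcquireAction(interval));
    return true;
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // Real work would happen here, under the lock acquired in isReady().
    return TaskStatus.success(getId());
  }
}
```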
......@@ -23,16 +23,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.SegmentLoadingException;
......@@ -48,10 +46,10 @@ import java.util.Map;
/**
*/
public class VersionConverterTask extends AbstractTask
public class VersionConverterTask extends AbstractFixedIntervalTask
{
private static final String TYPE = "version_converter";
private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID);
private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID;
private static final Logger log = new Logger(VersionConverterTask.class);
......@@ -74,6 +72,8 @@ public class VersionConverterTask extends AbstractTask
private static String makeId(String dataSource, Interval interval)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(interval, "interval");
return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime());
}
......@@ -105,7 +105,6 @@ public class VersionConverterTask extends AbstractTask
)
{
super(id, groupId, dataSource, interval);
this.segment = segment;
}
......@@ -125,45 +124,43 @@ public class VersionConverterTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
if (segment == null) {
throw new ISE("Segment was null, this should never run.", this.getClass().getSimpleName());
}
log.info("I'm in a subless mood.");
convertSegment(toolbox, segment);
return success();
}
@Override
public TaskStatus preflight(TaskActionClient taskActionClient) throws Exception
{
if (segment != null) {
return super.preflight(taskActionClient);
}
List<DataSegment> segments = taskActionClient.submit(defaultListUsedAction());
final FunctionalIterable<Task> tasks = FunctionalIterable
.create(segments)
.keep(
new Function<DataSegment, Task>()
{
@Override
public Task apply(DataSegment segment)
final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
new SegmentListUsedAction(
getDataSource(),
getInterval()
)
);
final FunctionalIterable<Task> tasks = FunctionalIterable
.create(segments)
.keep(
new Function<DataSegment, Task>()
{
final Integer segmentVersion = segment.getBinaryVersion();
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
return new SubTask(getGroupId(), segment);
@Override
public Task apply(DataSegment segment)
{
final Integer segmentVersion = segment.getBinaryVersion();
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
return new SubTask(getGroupId(), segment);
}
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
return null;
}
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
return null;
}
}
);
taskActionClient.submit(new SpawnTasksAction(Lists.newArrayList(tasks)));
return TaskStatus.success(getId());
);
// Vestigial from a past time when this task spawned subtasks.
for (final Task subTask : tasks) {
final TaskStatus status = subTask.run(toolbox);
if (!status.isSuccess()) {
return status;
}
}
} else {
log.info("I'm in a subless mood.");
convertSegment(toolbox, segment);
}
return success();
}
@Override
......@@ -185,7 +182,7 @@ public class VersionConverterTask extends AbstractTask
return super.equals(o);
}
public static class SubTask extends AbstractTask
public static class SubTask extends AbstractFixedIntervalTask
{
@JsonIgnore
private final DataSegment segment;
......
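With `SpawnTasksAction` gone, the parent task now drives each conversion itself; the control flow reduces to this hedged pattern (class and method names are illustrative):

```java
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.task.Task;
import java.util.List;

class InlineSubTaskPattern
{
  // Illustrative: run dependent tasks sequentially inside the parent's run(),
  // propagating the first non-success status instead of spawning new tasks.
  static TaskStatus runAll(List<Task> subTasks, TaskToolbox toolbox, String parentId) throws Exception
  {
    for (Task subTask : subTasks) {
      TaskStatus status = subTask.run(toolbox);
      if (!status.isSuccess()) {
        return status;
      }
    }
    return TaskStatus.success(parentId);
  }
}
```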
......@@ -23,15 +23,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTimeoutException;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock;
......@@ -41,11 +45,18 @@ import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class DbTaskStorage implements TaskStorage
{
......@@ -92,7 +103,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
dbi.withHandle(
retryingHandle(
new HandleCallback<Void>()
{
@Override
......@@ -134,7 +145,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Updating task %s to status: %s", status.getId(), status);
int updated = dbi.withHandle(
int updated = retryingHandle(
new HandleCallback<Integer>()
{
@Override
......@@ -162,7 +173,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public Optional<Task> getTask(final String taskid)
{
return dbi.withHandle(
return retryingHandle(
new HandleCallback<Optional<Task>>()
{
@Override
......@@ -192,7 +203,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public Optional<TaskStatus> getStatus(final String taskid)
{
return dbi.withHandle(
return retryingHandle(
new HandleCallback<Optional<TaskStatus>>()
{
@Override
......@@ -222,7 +233,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public List<Task> getActiveTasks()
{
return dbi.withHandle(
return retryingHandle(
new HandleCallback<List<Task>>()
{
@Override
......@@ -231,7 +242,7 @@ public class DbTaskStorage implements TaskStorage
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1",
"SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
dbTables.getTasksTable()
)
)
......@@ -273,7 +284,7 @@ public class DbTaskStorage implements TaskStorage
taskid
);
dbi.withHandle(
retryingHandle(
new HandleCallback<Integer>()
{
@Override
......@@ -308,7 +319,7 @@ public class DbTaskStorage implements TaskStorage
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
dbi.withHandle(
retryingHandle(
new HandleCallback<Integer>()
{
@Override
......@@ -353,7 +364,7 @@ public class DbTaskStorage implements TaskStorage
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
dbi.withHandle(
retryingHandle(
new HandleCallback<Integer>()
{
@Override
......@@ -376,7 +387,7 @@ public class DbTaskStorage implements TaskStorage
@Override
public List<TaskAction> getAuditLogs(final String taskid)
{
return dbi.withHandle(
return retryingHandle(
new HandleCallback<List<TaskAction>>()
{
@Override
......@@ -392,21 +403,18 @@ public class DbTaskStorage implements TaskStorage
.bind("task_id", taskid)
.list();
return Lists.transform(
dbTaskLogs, new Function<Map<String, Object>, TaskAction>()
{
@Override
public TaskAction apply(Map<String, Object> row)
{
try {
return jsonMapper.readValue((byte[]) row.get("log_payload"), TaskAction.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
} catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
);
return retList;
}
}
);
......@@ -414,7 +422,7 @@ public class DbTaskStorage implements TaskStorage
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return dbi.withHandle(
return retryingHandle(
new HandleCallback<Map<Long, TaskLock>>()
{
@Override
......@@ -439,4 +447,45 @@ public class DbTaskStorage implements TaskStorage
}
);
}
/**
* Runs a SQL operation through DBI, retrying when the failure looks transient.
*/
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return dbi.withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
} catch (RuntimeException e) {
throw Throwables.propagate(e);
} catch (Exception e) {
throw new CallbackFailedException(e);
}
}
private static boolean shouldRetryException(final Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| e instanceof MySQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317) // MySQL error 1317: query execution was interrupted
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
}
}
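`RetryUtils.retry`, called above with a callable, a retry predicate, and a maximum try count, behaves roughly like the following sketch; the backoff constants here are assumptions, not the library's actual values:

```java
import com.google.common.base.Predicate;
import java.util.concurrent.Callable;

class RetrySketch
{
  // Illustrative re-implementation of retry-with-predicate; not the real
  // com.metamx.common.RetryUtils, whose backoff behavior may differ.
  static <T> T retry(Callable<T> call, Predicate<Throwable> shouldRetry, int maxTries) throws Exception
  {
    long sleepMillis = 1000; // assumed base backoff
    for (int attempt = 1; ; attempt++) {
      try {
        return call.call();
      }
      catch (Exception e) {
        if (attempt >= maxTries || !shouldRetry.apply(e)) {
          throw e; // exhausted, or a non-transient failure: give up
        }
        Thread.sleep(sleepMillis);
        sleepMillis *= 2; // double the delay after each failed attempt
      }
    }
  }
}
```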
......@@ -101,12 +101,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity()));
}
@Override
public void bootstrap(List<Task> tasks)
{
// do nothing
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
......@@ -115,7 +109,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
tasks.put(
task.getId(),
new ForkingTaskRunnerWorkItem(
task,
task.getId(),
exec.submit(
new Callable<TaskStatus>()
{
......@@ -358,6 +352,14 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
}
}
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
synchronized (tasks) {
return Lists.<TaskRunnerWorkItem>newArrayList(tasks.values());
}
}
@Override
public Collection<ZkWorker> getWorkers()
{
......@@ -425,11 +427,11 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private volatile ProcessHolder processHolder = null;
private ForkingTaskRunnerWorkItem(
Task task,
String taskId,
ListenableFuture<TaskStatus> statusFuture
)
{
super(task, statusFuture);
super(taskId, statusFuture);
}
}
......
......@@ -21,7 +21,6 @@ package io.druid.indexing.overlord;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
......@@ -33,25 +32,25 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
private final Worker worker;
public RemoteTaskRunnerWorkItem(
Task task,
String taskId,
SettableFuture<TaskStatus> result,
Worker worker
)
{
super(task, result);
super(taskId, result);
this.result = result;
this.worker = worker;
}
public RemoteTaskRunnerWorkItem(
Task task,
String taskId,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime,
Worker worker
)
{
super(task, result, createdTime, queueInsertionTime);
super(taskId, result, createdTime, queueInsertionTime);
this.result = result;
this.worker = worker;
}
......@@ -69,11 +68,11 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time, worker);
return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), time, worker);
}
public RemoteTaskRunnerWorkItem withWorker(Worker theWorker)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), getQueueInsertionTime(), theWorker);
return new RemoteTaskRunnerWorkItem(getTaskId(), result, getCreatedTime(), getQueueInsertionTime(), theWorker);
}
}
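The `withQueueInsertionTime`/`withWorker` pair follows the immutable copy-on-write idiom, which keeps work items safe to share across runner threads. A stripped-down sketch of the idiom (names hypothetical):

```java
import org.joda.time.DateTime;

// Illustrative immutable value whose "with" methods return modified copies.
final class ImmutableWorkItem
{
  private final DateTime createdTime;
  private final DateTime queueInsertionTime;

  ImmutableWorkItem(DateTime createdTime, DateTime queueInsertionTime)
  {
    this.createdTime = createdTime;
    this.queueInsertionTime = queueInsertionTime;
  }

  // Returns a copy; the original is never mutated, so references handed to
  // other threads (e.g. runner callbacks) always see a consistent snapshot.
  ImmutableWorkItem withQueueInsertionTime(DateTime time)
  {
    return new ImmutableWorkItem(createdTime, time);
  }
}
```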
......@@ -23,13 +23,15 @@ import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
......@@ -73,8 +75,86 @@ public class TaskLockbox
}
/**
* Locks a task without removing it from the queue. Blocks until the lock is acquired. Throws an exception
* if the lock cannot be acquired.
* Wipe out our current in-memory state and resync it from our bundled {@link io.druid.indexing.overlord.TaskStorage}.
*/
public void syncFromStorage()
{
giant.lock();
try {
// Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
for (final Task task : taskStorage.getActiveTasks()) {
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
}
// Sort locks by version, so we add them back in the order they were acquired.
final Ordering<Pair<Task, TaskLock>> byVersionOrdering = new Ordering<Pair<Task, TaskLock>>()
{
@Override
public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
{
// The second compare is just a tiebreaker; versions are expected to be unique per lock.
return ComparisonChain.start()
.compare(left.rhs.getVersion(), right.rhs.getVersion())
.compare(left.lhs.getId(), right.lhs.getId())
.result();
}
};
running.clear();
// Bookkeeping for a log message at the end
final Set<String> uniqueTaskIds = Sets.newHashSet();
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs;
final TaskLock savedTaskLock = taskAndLock.rhs;
uniqueTaskIds.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
);
if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
taskLockCount ++;
log.info(
"Reacquired lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
task.getId()
);
} else if (acquiredTaskLock.isPresent()) {
taskLockCount ++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
acquiredTaskLock.get().getVersion(),
task.getId()
);
} else {
log.info(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
task.getId()
);
}
}
log.info(
"Synced %,d locks for %,d tasks from storage (%,d locks ignored).",
taskLockCount,
uniqueTaskIds.size(),
storedLocks.size() - taskLockCount
);
} finally {
giant.unlock();
}
}
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
* cannot be acquired.
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
......@@ -97,7 +177,8 @@ public class TaskLockbox
* Attempt to lock a task, without removing it from the queue. Equivalent to the long form of {@code tryLock}
* with no preferred version.
*
* @param task task to attempt to lock
* @param task task that wants a lock
* @param interval interval to lock
*
* @return lock version if lock was acquired, absent otherwise
*/
......@@ -113,22 +194,17 @@ public class TaskLockbox
* is only mostly guaranteed, however; we assume clock monotonicity and we assume that callers specifying
* {@code preferredVersion} are doing the right thing.
*
* @param task task to attempt to lock
* @param task task that wants a lock
* @param interval interval to lock
* @param preferredVersion use this version string if one has not yet been assigned
*
* @return lock version if lock was acquired, absent otherwise
*/
public Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
{
giant.lock();
try {
if(task.getImplicitLockInterval().isPresent() && !task.getImplicitLockInterval().get().equals(interval)) {
// Task may only lock its fixed interval, if present
throw new IAE("Task must lock its fixed interval: %s", task.getId());
}
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;
......@@ -184,9 +260,10 @@ public class TaskLockbox
if (posseToUse.getTaskIds().add(task.getId())) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
// Best effort to update task storage facility
// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
} catch(Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
......@@ -194,12 +271,13 @@ public class TaskLockbox
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
}
return Optional.of(posseToUse.getTaskLock());
}
finally {
giant.unlock();
......@@ -271,7 +349,7 @@ public class TaskLockbox
// Wake up blocking-lock waiters
lockReleaseCondition.signalAll();
// Best effort to remove lock from storage
// Remove lock from storage. If it cannot be removed, just ignore the failure.
try {
taskStorage.removeLock(task.getId(), taskLock);
} catch(Exception e) {
......@@ -315,20 +393,6 @@ public class TaskLockbox
}
}
/**
* Removes all locks from this lockbox.
*/
public void clear()
{
giant.lock();
try {
running.clear();
} finally {
giant.unlock();
}
}
/**
* Return the currently-active lock posses for some task.
*
......@@ -341,17 +405,12 @@ public class TaskLockbox
try {
final Iterable<TaskLockPosse> searchSpace;
if (task.getImplicitLockInterval().isPresent()) {
// Narrow down search using findLockPossesForInterval
searchSpace = findLockPossesForInterval(task.getDataSource(), task.getImplicitLockInterval().get());
// Scan through all locks for this datasource
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
if(dsRunning == null) {
searchSpace = ImmutableList.of();
} else {
// Scan through all locks for this datasource
final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
if(dsRunning == null) {
searchSpace = ImmutableList.of();
} else {
searchSpace = dsRunning.values();
}
searchSpace = dsRunning.values();
}
return ImmutableList.copyOf(
......
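`syncFromStorage` depends on lock versions sorting in acquisition order. Versions are timestamp-derived strings, so plain lexicographic ordering recovers chronology; a small self-contained illustration (the version values are made up):

```java
import com.google.common.collect.Ordering;
import java.util.Arrays;
import java.util.List;

class VersionOrderExample
{
  public static void main(String[] args)
  {
    // Hypothetical version strings in the timestamp-derived format Druid uses.
    List<String> versions = Ordering.<String>natural().sortedCopy(
        Arrays.asList("2013-12-05T20:01:02.123Z", "2013-12-05T19:59:59.000Z")
    );
    // ISO-8601 timestamps sort lexicographically in chronological order, so
    // replaying locks in this order matches original acquisition order.
    System.out.println(versions);
  }
}
```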
......@@ -34,7 +34,7 @@ import io.druid.guice.annotations.Self;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.exec.TaskConsumer;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.scaling.ResourceManagementScheduler;
import io.druid.indexing.overlord.scaling.ResourceManagementSchedulerFactory;
import io.druid.server.DruidNode;
......@@ -56,20 +56,22 @@ public class TaskMaster
private final LeaderSelector leaderSelector;
private final ReentrantLock giant = new ReentrantLock();
private final Condition mayBeStopped = giant.newCondition();
private final TaskQueue taskQueue;
private final TaskActionClientFactory taskActionClientFactory;
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<Lifecycle>(null);
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<>(null);
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue;
private volatile ResourceManagementScheduler resourceManagementScheduler;
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
@Inject
public TaskMaster(
final TaskQueue taskQueue,
final TaskQueueConfig taskQueueConfig,
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode node,
final ZkPathsConfig zkPaths,
......@@ -80,118 +82,99 @@ public class TaskMaster
final ServiceEmitter emitter
)
{
this.taskQueue = taskQueue;
this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector(
curator, zkPaths.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
giant.lock();
try {
log.info("By the power of Grayskull, I have the power!");
taskRunner = runnerFactory.build();
final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue,
taskRunner,
taskActionClientFactory,
emitter
);
// Bootstrap task queue and task lockbox (load state stuff from the database)
taskQueue.bootstrap();
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
taskRunner.bootstrap(taskQueue.snapshot());
}
@Override
public void stop()
{
}
curator,
zkPaths.getIndexerLeaderLatchPath(),
new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
giant.lock();
try {
// Make sure the previous leadership cycle is really, really over.
stopLeading();
// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");
taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}
);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(node);
}
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
leaderLifecycle.addManagedInstance(taskRunner);
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(node);
}
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
);
try {
leaderLifecycle.start();
leading = true;
while (leading && !Thread.currentThread().isInterrupted()) {
mayBeStopped.await();
}
}
);
leaderLifecycle.addManagedInstance(taskConsumer);
if (taskRunner instanceof RemoteTaskRunner) {
final ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(leaderLifecycle);
resourceManagementScheduler = managementSchedulerFactory.build(
(RemoteTaskRunner) taskRunner,
executorFactory
);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
catch (InterruptedException e) {
// Suppress so we can bow out gracefully
}
finally {
log.info("Bowing out!");
stopLeading();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to lead").emit();
throw Throwables.propagate(e);
}
finally {
giant.unlock();
}
}
try {
leaderLifecycle.start();
leading = true;
while (leading && !Thread.currentThread().isInterrupted()) {
mayBeStopped.await();
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
// disconnected from zk. assume leadership is gone
stopLeading();
}
}
catch (InterruptedException e) {
// Suppress so we can bow out gracefully
}
finally {
log.info("Bowing out!");
stopLeading();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to lead").emit();
throw Throwables.propagate(e);
}
finally {
giant.unlock();
}
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
// disconnected from zk. assume leadership is gone
stopLeading();
}
}
}
);
leaderSelector.setId(node.getHost());
......
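The leadership plumbing above is standard Curator usage; stripped of Druid specifics it reduces to this sketch (the latch path and listener body are illustrative):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

class LeaderExample
{
  static LeaderSelector create(CuratorFramework curator)
  {
    LeaderSelector selector = new LeaderSelector(
        curator,
        "/druid/overlord/_LEADER", // hypothetical latch path
        new LeaderSelectorListenerAdapter()
        {
          @Override
          public void takeLeadership(CuratorFramework client) throws Exception
          {
            // Hold leadership until interrupted; returning relinquishes it.
            Thread.currentThread().join();
          }
        }
    );
    selector.autoRequeue(); // re-contend for leadership after losing it
    return selector;        // caller invokes start() to begin contending
  }
}
```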
......@@ -24,34 +24,24 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import java.util.Collection;
import java.util.List;
/**
* Interface for handing off tasks. Used by a {@link io.druid.indexing.overlord.exec.TaskConsumer} to
* run tasks that have been locked.
* Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}.
*/
public interface TaskRunner
{
/**
* Provide a new task runner with a list of tasks that may already be running. Will be called once shortly
* after instantiation and before any calls to {@link #run}. Bootstrapping should not be construed as a command
* to run the tasks; they will be passed to {@link #run} one-by-one when this is desired. Some bootstrapped tasks
* may not actually be running (for example, if they are currently held back due to not having a lock).
*
* @param tasks the tasks
*/
public void bootstrap(List<Task> tasks);
/**
* Run a task. The returned status should be some kind of completed status.
*
* @param task task to run
*
* @return task status, eventually
*/
public ListenableFuture<TaskStatus> run(Task task);
/**
* Best-effort task shutdown. May or may not do anything.
* Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
* currently-running tasks.
*/
public void shutdown(String taskid);
......@@ -59,5 +49,7 @@ public interface TaskRunner
public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
public Collection<ZkWorker> getWorkers();
}
......@@ -19,11 +19,9 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
......@@ -32,36 +30,35 @@ import org.joda.time.DateTimeComparator;
*/
public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
private final Task task;
private final String taskId;
private final ListenableFuture<TaskStatus> result;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
String taskId,
ListenableFuture<TaskStatus> result
)
{
this(task, result, new DateTime(), new DateTime());
this(taskId, result, new DateTime(), new DateTime());
}
public TaskRunnerWorkItem(
Task task,
String taskId,
ListenableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
)
{
this.task = task;
this.taskId = taskId;
this.result = result;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
}
@JsonProperty
public Task getTask()
public String getTaskId()
{
return task;
return taskId;
}
public ListenableFuture<TaskStatus> getResult()
......@@ -69,13 +66,11 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return result;
}
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;
......@@ -83,7 +78,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new TaskRunnerWorkItem(task, result, createdTime, time);
return new TaskRunnerWorkItem(taskId, result, createdTime, time);
}
@Override
......@@ -91,7 +86,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{
return ComparisonChain.start()
.compare(createdTime, taskRunnerWorkItem.getCreatedTime(), DateTimeComparator.getInstance())
.compare(task.getId(), taskRunnerWorkItem.getTask().getId())
.compare(taskId, taskRunnerWorkItem.getTaskId())
.result();
}
......@@ -99,9 +94,10 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public String toString()
{
return "TaskRunnerWorkItem{" +
"task=" + task +
"taskId='" + taskId + '\'' +
", result=" + result +
", createdTime=" + createdTime +
", queueInsertionTime=" + queueInsertionTime +
'}';
}
}
......@@ -77,7 +77,8 @@ public interface TaskStorage
public List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently running or pending tasks as stored in the storage facility, in no particular order.
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
*/
public List<Task> getActiveTasks();
......
......@@ -19,23 +19,14 @@
package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SpawnTasksAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
......@@ -57,126 +48,20 @@ public class TaskStorageQueryAdapter
}
/**
* Returns all recursive task statuses for a particular task, staying within the same task group. Includes that
* task, plus any tasks it spawned, and so on. Does not include spawned tasks that ended up in a different task
* group. Does not include this task's parents or siblings.
*/
public Map<String, Optional<TaskStatus>> getSameGroupChildStatuses(final String taskid)
{
final Optional<Task> taskOptional = storage.getTask(taskid);
final Optional<TaskStatus> statusOptional = storage.getStatus(taskid);
final ImmutableMap.Builder<String, Optional<TaskStatus>> resultBuilder = ImmutableMap.builder();
resultBuilder.put(taskid, statusOptional);
final Iterable<Task> nextTasks = FunctionalIterable
.create(storage.getAuditLogs(taskid)).filter(
new Predicate<TaskAction>()
{
@Override
public boolean apply(TaskAction taskAction)
{
return taskAction instanceof SpawnTasksAction;
}
}
).transformCat(
new Function<TaskAction, Iterable<Task>>()
{
@Override
public Iterable<Task> apply(TaskAction taskAction)
{
return ((SpawnTasksAction) taskAction).getNewTasks();
}
}
);
if(taskOptional.isPresent() && statusOptional.isPresent()) {
for(final Task nextTask : nextTasks) {
if(nextTask.getGroupId().equals(taskOptional.get().getGroupId())) {
resultBuilder.putAll(getSameGroupChildStatuses(nextTask.getId()));
}
}
}
return resultBuilder.build();
}
/**
* Like {@link #getSameGroupChildStatuses}, but flattens the recursive statuses into a single, merged status.
*/
public Optional<TaskStatus> getSameGroupMergedStatus(final String taskid)
{
final Map<String, Optional<TaskStatus>> statuses = getSameGroupChildStatuses(taskid);
int nSuccesses = 0;
int nFailures = 0;
int nTotal = 0;
int nPresent = 0;
for(final Optional<TaskStatus> statusOption : statuses.values()) {
nTotal ++;
if(statusOption.isPresent()) {
nPresent ++;
final TaskStatus status = statusOption.get();
if(status.isSuccess()) {
nSuccesses ++;
} else if(status.isFailure()) {
nFailures ++;
}
}
}
final Optional<TaskStatus> status;
if(nPresent == 0) {
status = Optional.absent();
} else if(nSuccesses == nTotal) {
status = Optional.of(TaskStatus.success(taskid));
} else if(nFailures > 0) {
status = Optional.of(TaskStatus.failure(taskid));
} else {
status = Optional.of(TaskStatus.running(taskid));
}
return status;
}
/**
* Returns all segments created by descendants for a particular task that stayed within the same task group. Includes
* that task, plus any tasks it spawned, and so on. Does not include spawned tasks that ended up in a different task
* group. Does not include this task's parents or siblings.
* Returns all segments created by this task.
*
* This method is useful when you want to figure out all of the things a single task spawned. It does pose issues
* with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your
* own risk and know that at some point, we might adjust this to actually enforce some sort of limits.
*/
public Set<DataSegment> getSameGroupNewSegments(final String taskid)
public Set<DataSegment> getInsertedSegments(final String taskid)
{
final Optional<Task> taskOptional = storage.getTask(taskid);
final Set<DataSegment> segments = Sets.newHashSet();
final List<Task> nextTasks = Lists.newArrayList();
for(final TaskAction action : storage.getAuditLogs(taskid)) {
if(action instanceof SpawnTasksAction) {
nextTasks.addAll(((SpawnTasksAction) action).getNewTasks());
}
if(action instanceof SegmentInsertAction) {
for (final TaskAction action : storage.getAuditLogs(taskid)) {
if (action instanceof SegmentInsertAction) {
segments.addAll(((SegmentInsertAction) action).getSegments());
}
}
if(taskOptional.isPresent()) {
for(final Task nextTask : nextTasks) {
if(nextTask.getGroupId().equals(taskOptional.get().getGroupId())) {
segments.addAll(getSameGroupNewSegments(nextTask.getId()));
}
}
}
return segments;
}
}
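A hedged usage sketch of the simplified adapter; the constructor is assumed to take the `TaskStorage` it wraps, matching the injected field, and the task id is made up:

```java
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.timeline.DataSegment;

class InsertedSegmentsExample
{
  // Hypothetical call site: list every segment a task has published,
  // recovered by scanning the task's audit log for SegmentInsertActions.
  static void printSegments(TaskStorage storage, String taskId)
  {
    TaskStorageQueryAdapter adapter = new TaskStorageQueryAdapter(storage);
    for (DataSegment segment : adapter.getInsertedSegments(taskId)) {
      System.out.println(segment.getIdentifier());
    }
  }
}
```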
......@@ -19,7 +19,7 @@
package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
......@@ -46,7 +46,6 @@ import org.joda.time.Interval;
import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
......@@ -58,7 +57,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
private final TaskToolboxFactory toolboxFactory;
private final ListeningExecutorService exec;
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class);
......@@ -67,7 +66,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
TaskToolboxFactory toolboxFactory
)
{
this.toolboxFactory = toolboxFactory;
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
}
......@@ -77,19 +76,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
exec.shutdownNow();
}
@Override
public void bootstrap(List<Task> tasks)
{
// do nothing
}
@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
......@@ -115,7 +107,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public void shutdown(final String taskid)
{
for (final TaskRunnerWorkItem runningItem : runningItems) {
if (runningItem.getTask().getId().equals(taskid)) {
if (runningItem.getTaskId().equals(taskid)) {
runningItem.getResult().cancel(true);
}
}
......@@ -124,7 +116,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return ImmutableList.copyOf(runningItems);
return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
}
@Override
......@@ -133,6 +125,12 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return ImmutableList.of();
}
@Override
public Collection<TaskRunnerWorkItem> getKnownTasks()
{
return ImmutableList.<TaskRunnerWorkItem>copyOf(runningItems);
}
@Override
public Collection<ZkWorker> getWorkers()
{
......@@ -155,18 +153,8 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
{
QueryRunner<T> queryRunner = null;
final List<Task> runningTasks = Lists.transform(
ImmutableList.copyOf(getRunningTasks()), new Function<TaskRunnerWorkItem, Task>()
{
@Override
public Task apply(TaskRunnerWorkItem o)
{
return o.getTask();
}
}
);
for (final Task task : runningTasks) {
for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) {
final Task task = taskRunnerWorkItem.getTask();
if (task.getDataSource().equals(query.getDataSource())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
......@@ -185,12 +173,31 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
}
private static class ExecutorServiceTaskRunnerCallable implements Callable<TaskStatus>
private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
private ThreadPoolTaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result
)
{
super(task.getId(), result);
this.task = task;
}
public Task getTask()
{
return task;
}
}
private static class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>
{
private final Task task;
private final TaskToolbox toolbox;
public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox)
public ThreadPoolTaskRunnerCallable(Task task, TaskToolbox toolbox)
{
this.task = task;
this.toolbox = toolbox;
......@@ -242,10 +249,5 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
throw Throwables.propagate(e);
}
}
public TaskRunnerWorkItem getTaskRunnerWorkItem()
{
return new TaskRunnerWorkItem(task, null);
}
}
}
......@@ -71,9 +71,9 @@ public class ZkWorker implements Closeable
};
}
public void start(PathChildrenCache.StartMode startMode) throws Exception
public void start() throws Exception
{
statusCache.start(startMode);
statusCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
public void addListener(PathChildrenCacheListener listener)
......
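Switching to `POST_INITIALIZED_EVENT` means the status cache emits a single INITIALIZED event once the existing children have been loaded. A sketch of a listener that takes advantage of that (illustrative only):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

class StatusCacheListenerExample
{
  // Illustrative listener: POST_INITIALIZED_EVENT delivers one INITIALIZED
  // event after loading existing children, so callers can tell pre-existing
  // task statuses apart from updates that arrive later.
  static final PathChildrenCacheListener LISTENER = new PathChildrenCacheListener()
  {
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
    {
      switch (event.getType()) {
        case INITIALIZED:
          break; // initial snapshot of worker task statuses is complete
        case CHILD_ADDED:
        case CHILD_UPDATED:
          break; // a task status appeared or changed after the snapshot
        default:
          break;
      }
    }
  };
}
```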
......@@ -17,30 +17,63 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.http;
package io.druid.indexing.overlord.config;
import com.google.inject.Inject;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskStorageQueryAdapter;
import io.druid.tasklogs.TaskLogStreamer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.ws.rs.Path;
/**
*/
@Deprecated
@Path("/mmx/merger/v1")
public class OldOverlordResource extends OverlordResource
public class TaskQueueConfig
{
@Inject
public OldOverlordResource(
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager
) throws Exception
@JsonProperty
private int maxSize;
@JsonProperty
private Duration startDelay;
@JsonProperty
private Duration restartDelay;
@JsonProperty
private Duration storageSyncRate;
@JsonCreator
public TaskQueueConfig(
@JsonProperty("maxSize") final Integer maxSize,
@JsonProperty("startDelay") final Period startDelay,
@JsonProperty("restartDelay") final Period restartDelay,
@JsonProperty("storageSyncRate") final Period storageSyncRate
)
{
this.maxSize = maxSize == null ? Integer.MAX_VALUE : maxSize;
this.startDelay = defaultDuration(startDelay, "PT1M");
this.restartDelay = defaultDuration(restartDelay, "PT30S");
this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");
}
public int getMaxSize()
{
return maxSize;
}
public Duration getStartDelay()
{
return startDelay;
}
public Duration getRestartDelay()
{
return restartDelay;
}
public Duration getStorageSyncRate()
{
return storageSyncRate;
}
private static Duration defaultDuration(final Period period, final String theDefault)
{
super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager);
return (period == null ? new Period(theDefault) : period).toStandardDuration();
}
}
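The null-handling above means an omitted property simply yields its default; a quick sketch exercising the constructor directly (output comments assume Joda's standard-duration formatting):

```java
import io.druid.indexing.overlord.config.TaskQueueConfig;
import org.joda.time.Period;

class TaskQueueConfigExample
{
  public static void main(String[] args)
  {
    // Null arguments exercise the defaults documented above.
    TaskQueueConfig config = new TaskQueueConfig(null, null, new Period("PT10S"), null);
    System.out.println(config.getMaxSize());      // Integer.MAX_VALUE
    System.out.println(config.getStartDelay());   // PT60S (the PT1M default)
    System.out.println(config.getRestartDelay()); // PT10S, from the override
  }
}
```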
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord.exec;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
private final TaskRunner runner;
private final TaskActionClientFactory taskActionClientFactory;
private final ServiceEmitter emitter;
private final Thread thready;
private volatile boolean shutdown = false;
private static final EmittingLogger log = new EmittingLogger(TaskConsumer.class);
public TaskConsumer(
TaskQueue queue,
TaskRunner runner,
TaskActionClientFactory taskActionClientFactory,
ServiceEmitter emitter
)
{
this.queue = queue;
this.runner = runner;
this.taskActionClientFactory = taskActionClientFactory;
this.emitter = emitter;
this.thready = new Thread(this);
}
@LifecycleStart
public void start()
{
thready.start();
}
@LifecycleStop
public void stop()
{
shutdown = true;
thready.interrupt();
}
@Override
public void run()
{
try {
while (!Thread.currentThread().isInterrupted()) {
final Task task;
try {
task = queue.take();
}
catch (InterruptedException e) {
log.info("Interrupted while waiting for new work");
Thread.currentThread().interrupt();
break;
}
try {
handoff(task);
}
catch (Exception e) {
log.makeAlert(e, "Failed to hand off task")
.addData("task", task.getId())
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getImplicitLockInterval())
.emit();
// Retry would be nice, but only after we have a way to throttle and limit them. Just fail for now.
if (!shutdown) {
queue.notify(task, TaskStatus.failure(task.getId()));
}
}
}
}
catch (Exception e) {
// exit thread
log.error(e, "Uncaught exception while consuming tasks");
throw Throwables.propagate(e);
}
}
private void handoff(final Task task) throws Exception
{
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
.setUser2(task.getDataSource())
.setUser4(task.getType())
.setUser5(task.getImplicitLockInterval().toString());
// Run preflight checks
TaskStatus preflightStatus;
try {
preflightStatus = task.preflight(taskActionClientFactory.create(task));
log.info("Preflight done for task: %s", task.getId());
}
catch (Exception e) {
preflightStatus = TaskStatus.failure(task.getId());
log.error(e, "Exception thrown during preflight for task: %s", task.getId());
}
if (!preflightStatus.isRunnable()) {
log.info("Task finished during preflight: %s", task.getId());
queue.notify(task, preflightStatus);
return;
}
// Hand off work to TaskRunner, with a callback
final ListenableFuture<TaskStatus> status = runner.run(task);
Futures.addCallback(
status, new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(final TaskStatus status)
{
log.info("Received %s status for task: %s", status.getStatusCode(), task);
handleStatus(status);
}
@Override
public void onFailure(Throwable t)
{
log.makeAlert(t, "Failed to run task")
.addData("task", task.getId())
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.addData("interval", task.getImplicitLockInterval())
.emit();
handleStatus(TaskStatus.failure(task.getId()));
}
private void handleStatus(TaskStatus status)
{
try {
// If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
// we check and before we commit the database transaction, but better than nothing.
if (shutdown) {
log.info("Abandoning task due to shutdown: %s", task.getId());
return;
}
queue.notify(task, status);
// Emit event and log, if the task is done
if (status.isComplete()) {
metricBuilder.setUser3(status.getStatusCode().toString());
emitter.emit(metricBuilder.build("indexer/time/run/millis", status.getDuration()));
log.info(
"Task %s: %s (%d run duration)",
status.getStatusCode(),
task,
status.getDuration()
);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task status")
.addData("task", task.getId())
.addData("statusCode", status.getStatusCode())
.emit();
}
}
}
);
}
}
......@@ -70,14 +70,7 @@ public class OverlordResource
public Map<String, Object> apply(TaskRunnerWorkItem input)
{
return new ImmutableMap.Builder<String, Object>()
.put("id", input.getTask().getId())
.put("dataSource", input.getTask().getDataSource())
.put("interval",
!input.getTask().getImplicitLockInterval().isPresent()
? ""
: input.getTask().getImplicitLockInterval().get()
)
.put("nodeType", input.getTask().getNodeType() == null ? "" : input.getTask().getNodeType())
.put("id", input.getTaskId())
.put("createdTime", input.getCreatedTime())
.put("queueInsertionTime", input.getQueueInsertionTime())
.build();
......@@ -151,7 +144,7 @@ public class OverlordResource
@Produces("application/json")
public Response getTaskStatus(@PathParam("taskid") String taskid)
{
return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getSameGroupMergedStatus(taskid));
return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getStatus(taskid));
}
@GET
......@@ -159,7 +152,7 @@ public class OverlordResource
@Produces("application/json")
public Response getTaskSegments(@PathParam("taskid") String taskid)
{
final Set<DataSegment> segments = taskStorageQueryAdapter.getSameGroupNewSegments(taskid);
final Set<DataSegment> segments = taskStorageQueryAdapter.getInsertedSegments(taskid);
return Response.ok().entity(segments).build();
}
......@@ -169,13 +162,13 @@ public class OverlordResource
public Response doShutdown(@PathParam("taskid") final String taskid)
{
return asLeaderWith(
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>()
{
@Override
public Response apply(TaskRunner taskRunner)
public Response apply(TaskQueue taskQueue)
{
taskRunner.shutdown(taskid);
taskQueue.shutdown(taskid);
return Response.ok(ImmutableMap.of("task", taskid)).build();
}
}
......@@ -225,7 +218,7 @@ public class OverlordResource
final Map<String, Object> retMap;
// It would be great to verify that this worker is actually supposed to be running the task before
// actually doing the task. Some ideas for how that could be done would be using some sort of attempt_id
// actually doing the action. Some ideas for how that could be done would be using some sort of attempt_id
// or token that gets passed around.
try {
......
......@@ -20,16 +20,19 @@
package io.druid.indexing.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;
......@@ -47,6 +50,7 @@ public class ExecutorLifecycle
private static final EmittingLogger log = new EmittingLogger(ExecutorLifecycle.class);
private final ExecutorLifecycleConfig config;
private final TaskActionClientFactory taskActionClientFactory;
private final TaskRunner taskRunner;
private final ObjectMapper jsonMapper;
......@@ -57,11 +61,13 @@ public class ExecutorLifecycle
@Inject
public ExecutorLifecycle(
ExecutorLifecycleConfig config,
TaskActionClientFactory taskActionClientFactory,
TaskRunner taskRunner,
ObjectMapper jsonMapper
)
{
this.config = config;
this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner;
this.jsonMapper = jsonMapper;
}
......@@ -69,9 +75,9 @@ public class ExecutorLifecycle
@LifecycleStart
public void start()
{
final File taskFile = config.getTaskFile();
final File statusFile = config.getStatusFile();
final InputStream parentStream = config.getParentStream();
final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream");
final Task task;
......@@ -111,28 +117,41 @@ public class ExecutorLifecycle
}
);
statusFuture = Futures.transform(
taskRunner.run(task), new Function<TaskStatus, TaskStatus>()
{
@Override
public TaskStatus apply(TaskStatus taskStatus)
{
try {
log.info(
"Task completed with status: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
);
statusFile.getParentFile().mkdirs();
jsonMapper.writeValue(statusFile, taskStatus);
return taskStatus;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
// Won't hurt in remote mode, and is required for setting up locks in local mode:
try {
if (!task.isReady(taskActionClientFactory.create(task))) {
throw new ISE("Task is not ready to run yet!", task.getId());
}
} catch (Exception e) {
throw new ISE(e, "Failed to run isReady", task.getId());
}
statusFuture = Futures.transform(
taskRunner.run(task),
new Function<TaskStatus, TaskStatus>()
{
@Override
public TaskStatus apply(TaskStatus taskStatus)
{
try {
log.info(
"Task completed with status: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
);
final File statusFileParent = statusFile.getParentFile();
if (statusFileParent != null) {
statusFileParent.mkdirs();
}
jsonMapper.writeValue(statusFile, taskStatus);
return taskStatus;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
......
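The added null guard around `statusFile.getParentFile()` matters because `File.getParentFile()` returns null for a bare filename; a quick demonstration:

```java
import java.io.File;

class ParentFileExample
{
  public static void main(String[] args)
  {
    System.out.println(new File("status.json").getParentFile());     // null
    System.out.println(new File("var/status.json").getParentFile()); // var
    // Calling mkdirs() on the null parent would throw NullPointerException,
    // hence the guard added above before writing the status file.
  }
}
```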
......@@ -67,7 +67,7 @@ public class MergeTaskBaseTest
@Test
public void testInterval()
{
Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTaskBase.getImplicitLockInterval().get());
Assert.assertEquals(new Interval("2012-01-03/2012-01-07"), testMergeTaskBase.getInterval());
}
@Test
......
......@@ -28,6 +28,7 @@ import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.LockReleaseAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.timeline.DataSegment;
......@@ -42,12 +43,12 @@ public class RealtimeishTask extends AbstractTask
{
public RealtimeishTask()
{
super("rt1", "rt", new TaskResource("rt1", 1), "foo", null);
super("rt1", "rt", new TaskResource("rt1", 1), "foo");
}
public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource)
{
super(id, groupId, taskResource, dataSource, interval);
super(id, groupId, taskResource, dataSource);
}
@Override
......@@ -56,6 +57,12 @@ public class RealtimeishTask extends AbstractTask
return "realtime_test";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
......
......@@ -39,7 +39,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.0</metamx.java-util.version>
<metamx.java-util.version>0.25.1</metamx.java-util.version>
<apache.curator.version>2.1.0-incubating</apache.curator.version>
<druid.api.version>0.1.5</druid.api.version>
</properties>
......