Commit c9b411c0, authored by Eric Tschetter

1) Remove the need for TaskActions to require a Task as a constructor parameter

Parent a11a34f8
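For illustration, a minimal self-contained sketch (simplified stand-in types, not the real Druid classes) of the shape this change moves to: a TaskAction no longer carries its Task; instead a per-task TaskActionClient is built by a TaskActionClientFactory and hands the task to perform() itself. The real implementations (LocalTaskActionClient, RemoteTaskActionClient, and their factories) follow this pattern in the files below.

import java.util.Collections;
import java.util.List;

public class TaskActionRefactorSketch
{
  // Stand-in for com.metamx.druid.merger.common.task.Task.
  interface Task
  {
    String getId();
  }

  // Stand-in for TaskActionToolbox (lockbox, merger DB coordinator, emitter, ...).
  static class Toolbox {}

  // After this commit: the action is task-agnostic and receives the Task at perform() time.
  interface TaskAction<RetType>
  {
    RetType perform(Task task, Toolbox toolbox);
  }

  // Example action, analogous to LockListAction with its Task field and getTask() removed.
  static class LockListAction implements TaskAction<List<String>>
  {
    @Override
    public List<String> perform(Task task, Toolbox toolbox)
    {
      // The real action asks the TaskLockbox for locks held by "task"; stubbed out here.
      return Collections.emptyList();
    }
  }

  interface TaskActionClient
  {
    <RetType> RetType submit(TaskAction<RetType> action);
  }

  // The factory binds a client to one task, so task code can submit(new LockListAction())
  // without passing "this" into every action constructor.
  interface TaskActionClientFactory
  {
    TaskActionClient create(Task task);
  }

  static class LocalClientFactory implements TaskActionClientFactory
  {
    private final Toolbox toolbox = new Toolbox();

    @Override
    public TaskActionClient create(final Task task)
    {
      return new TaskActionClient()
      {
        @Override
        public <RetType> RetType submit(TaskAction<RetType> action)
        {
          return action.perform(task, toolbox);
        }
      };
    }
  }

  public static void main(String[] args)
  {
    final Task task = () -> "example_task";
    final TaskActionClient client = new LocalClientFactory().create(task);
    final List<String> locks = client.submit(new LockListAction());
    System.out.println("locks for " + task.getId() + ": " + locks);
  }
}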
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.merger.common.actions.TaskActionClientFactory;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.service.ServiceEmitter;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.io.File;
import java.util.List;
import java.util.Map;
/**
* Stuff that may be needed by a Task in order to conduct its business.
*/
public class TaskToolboxFactory
{
private final TaskConfig config;
private final TaskActionClientFactory taskActionClient;
private final ServiceEmitter emitter;
private final RestS3Service s3Client;
private final DataSegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final ObjectMapper objectMapper;
public TaskToolboxFactory(
TaskConfig config,
TaskActionClientFactory taskActionClient,
ServiceEmitter emitter,
RestS3Service s3Client,
DataSegmentPusher segmentPusher,
SegmentKiller segmentKiller,
ObjectMapper objectMapper
)
{
this.config = config;
this.taskActionClient = taskActionClient;
this.emitter = emitter;
this.s3Client = s3Client;
this.segmentPusher = segmentPusher;
this.segmentKiller = segmentKiller;
this.objectMapper = objectMapper;
}
public ObjectMapper getObjectMapper()
{
return objectMapper;
}
public TaskToolbox build(Task task)
{
return new TaskToolbox(
config,
taskActionClient == null ? null : taskActionClient.create(task),
emitter,
s3Client,
segmentPusher,
segmentKiller,
objectMapper
);
}
}
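As a rough usage sketch (stand-in types, assumed for illustration only), a runner holds this factory and calls build(task) right before running each task, which is the pattern LocalTaskRunner and TaskMonitor adopt later in this commit:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ToolboxFactorySketch
{
  // Stand-in for Task; the real interface lives in com.metamx.druid.merger.common.task.
  interface Task
  {
    String getId();

    String run(Toolbox toolbox);
  }

  // Stand-in for TaskToolbox: the per-task bundle of services.
  static class Toolbox
  {
    private final String taskId;

    Toolbox(String taskId)
    {
      this.taskId = taskId;
    }

    String describe()
    {
      return "toolbox for " + taskId;
    }
  }

  // Stand-in for TaskToolboxFactory: shared services are held once, the Task is supplied at build time.
  static class ToolboxFactory
  {
    Toolbox build(Task task)
    {
      return new Toolbox(task.getId());
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ExecutorService exec = Executors.newSingleThreadExecutor();
    final ToolboxFactory factory = new ToolboxFactory();

    final Task task = new Task()
    {
      public String getId() { return "example_task"; }

      public String run(Toolbox toolbox) { return "ran with " + toolbox.describe(); }
    };

    // Mirrors LocalTaskRunner.run(): build the toolbox for this particular task, then submit the work.
    exec.submit(() -> System.out.println(task.run(factory.build(task)))).get();
    exec.shutdown();
  }
}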
package com.metamx.druid.merger.common.actions;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskStorage;
import com.metamx.emitter.EmittingLogger;
public class LocalTaskActionClient implements TaskActionClient
{
private final Task task;
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);
public LocalTaskActionClient(TaskStorage storage, TaskActionToolbox toolbox)
public LocalTaskActionClient(Task task, TaskStorage storage, TaskActionToolbox toolbox)
{
this.task = task;
this.storage = storage;
this.toolbox = toolbox;
}
......@@ -19,15 +22,15 @@ public class LocalTaskActionClient implements TaskActionClient
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
final RetType ret = taskAction.perform(toolbox);
final RetType ret = taskAction.perform(task, toolbox);
// Add audit log
try {
storage.addAuditLog(taskAction);
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", taskAction.getTask().getId())
.addData("task", task.getId())
.addData("actionClass", taskAction.getClass().getName())
.emit();
}
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.actions;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskStorage;
/**
*/
public class LocalTaskActionClientFactory implements TaskActionClientFactory
{
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox)
{
this.storage = storage;
this.toolbox = toolbox;
}
@Override
public TaskActionClient create(Task task)
{
return new LocalTaskActionClient(task, storage, toolbox);
}
}
......@@ -11,25 +11,16 @@ import org.joda.time.Interval;
public class LockAcquireAction implements TaskAction<Optional<TaskLock>>
{
private final Task task;
private final Interval interval;
@JsonCreator
public LockAcquireAction(
@JsonProperty("task") Task task,
@JsonProperty("interval") Interval interval
)
{
this.task = task;
this.interval = interval;
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public Interval getInterval()
{
......@@ -42,7 +33,7 @@ public class LockAcquireAction implements TaskAction<Optional<TaskLock>>
}
@Override
public Optional<TaskLock> perform(TaskActionToolbox toolbox)
public Optional<TaskLock> perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getTaskLockbox().tryLock(task, interval);
......
......@@ -12,29 +12,13 @@ import java.util.List;
public class LockListAction implements TaskAction<List<TaskLock>>
{
private final Task task;
@JsonCreator
public LockListAction(
@JsonProperty("task") Task task
)
{
this.task = task;
}
@JsonProperty
public Task getTask()
{
return task;
}
public TypeReference<List<TaskLock>> getReturnTypeReference()
{
return new TypeReference<List<TaskLock>>() {};
}
@Override
public List<TaskLock> perform(TaskActionToolbox toolbox)
public List<TaskLock> perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getTaskLockbox().findLocksForTask(task);
......
......@@ -12,25 +12,16 @@ import java.util.List;
public class LockReleaseAction implements TaskAction<Void>
{
private final Task task;
private final Interval interval;
@JsonCreator
public LockReleaseAction(
@JsonProperty("task") Task task,
@JsonProperty("interval") Interval interval
)
{
this.task = task;
this.interval = interval;
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public Interval getInterval()
{
......@@ -43,7 +34,7 @@ public class LockReleaseAction implements TaskAction<Void>
}
@Override
public Void perform(TaskActionToolbox toolbox)
public Void perform(Task task, TaskActionToolbox toolbox)
{
try {
toolbox.getTaskLockbox().unlock(task, interval);
......
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -16,14 +17,16 @@ import java.util.Map;
public class RemoteTaskActionClient implements TaskActionClient
{
private final Task task;
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final ObjectMapper jsonMapper;
private static final Logger log = new Logger(RemoteTaskActionClient.class);
public RemoteTaskActionClient(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
public RemoteTaskActionClient(Task task, HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
{
this.task = task;
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.jsonMapper = jsonMapper;
......@@ -33,7 +36,7 @@ public class RemoteTaskActionClient implements TaskActionClient
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
try {
byte[] dataToSend = jsonMapper.writeValueAsBytes(taskAction);
byte[] dataToSend = jsonMapper.writeValueAsBytes(new TaskActionHolder(task, taskAction));
final String response = httpClient.post(getServiceUri().toURL())
.setContent("application/json", dataToSend)
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.http.client.HttpClient;
import com.netflix.curator.x.discovery.ServiceProvider;
/**
*/
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{
private final HttpClient httpClient;
private final ServiceProvider serviceProvider;
private final ObjectMapper jsonMapper;
public RemoteTaskActionClientFactory(HttpClient httpClient, ServiceProvider serviceProvider, ObjectMapper jsonMapper)
{
this.httpClient = httpClient;
this.serviceProvider = serviceProvider;
this.jsonMapper = jsonMapper;
}
@Override
public TaskActionClient create(Task task)
{
return new RemoteTaskActionClient(task, httpClient, serviceProvider, jsonMapper);
}
}
......@@ -18,25 +18,16 @@ import java.util.Set;
public class SegmentInsertAction implements TaskAction<Void>
{
private final Task task;
private final Set<DataSegment> segments;
@JsonCreator
public SegmentInsertAction(
@JsonProperty("task") Task task,
@JsonProperty("segments") Set<DataSegment> segments
)
{
this.task = task;
this.segments = ImmutableSet.copyOf(segments);
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public Set<DataSegment> getSegments()
{
......@@ -49,7 +40,7 @@ public class SegmentInsertAction implements TaskAction<Void>
}
@Override
public Void perform(TaskActionToolbox toolbox)
public Void perform(Task task, TaskActionToolbox toolbox)
{
if(!toolbox.taskLockCoversSegments(task, segments, false)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
......
......@@ -12,28 +12,19 @@ import java.util.List;
public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
{
private final Task task;
private final String dataSource;
private final Interval interval;
@JsonCreator
public SegmentListUnusedAction(
@JsonProperty("task") Task task,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.task = task;
this.dataSource = dataSource;
this.interval = interval;
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public String getDataSource()
{
......@@ -52,7 +43,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
}
@Override
public List<DataSegment> perform(TaskActionToolbox toolbox)
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getMergerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
......
......@@ -12,28 +12,19 @@ import java.util.List;
public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
{
private final Task task;
private final String dataSource;
private final Interval interval;
@JsonCreator
public SegmentListUsedAction(
@JsonProperty("task") Task task,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.task = task;
this.dataSource = dataSource;
this.interval = interval;
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public String getDataSource()
{
......@@ -52,7 +43,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
}
@Override
public List<DataSegment> perform(TaskActionToolbox toolbox)
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getMergerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
......
......@@ -18,25 +18,16 @@ import java.util.Set;
public class SegmentNukeAction implements TaskAction<Void>
{
private final Task task;
private final Set<DataSegment> segments;
@JsonCreator
public SegmentNukeAction(
@JsonProperty("task") Task task,
@JsonProperty("segments") Set<DataSegment> segments
)
{
this.task = task;
this.segments = ImmutableSet.copyOf(segments);
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public Set<DataSegment> getSegments()
{
......@@ -49,7 +40,7 @@ public class SegmentNukeAction implements TaskAction<Void>
}
@Override
public Void perform(TaskActionToolbox toolbox)
public Void perform(Task task, TaskActionToolbox toolbox)
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
......
......@@ -11,25 +11,16 @@ import java.util.List;
public class SpawnTasksAction implements TaskAction<Void>
{
private final Task task;
private final List<Task> newTasks;
@JsonCreator
public SpawnTasksAction(
@JsonProperty("task") Task task,
@JsonProperty("newTasks") List<Task> newTasks
)
{
this.task = task;
this.newTasks = ImmutableList.copyOf(newTasks);
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public List<Task> getNewTasks()
{
......@@ -42,7 +33,7 @@ public class SpawnTasksAction implements TaskAction<Void>
}
@Override
public Void perform(TaskActionToolbox toolbox)
public Void perform(Task task, TaskActionToolbox toolbox)
{
try {
for(final Task newTask : newTasks) {
......
......@@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
})
public interface TaskAction<RetType>
{
public Task getTask(); // TODO Look into replacing this with task ID so stuff serializes smaller
public TypeReference<RetType> getReturnTypeReference(); // T_T
public RetType perform(TaskActionToolbox toolbox);
public RetType perform(Task task, TaskActionToolbox toolbox);
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.actions;
import com.metamx.druid.merger.common.task.Task;
/**
*/
public interface TaskActionClientFactory
{
public TaskActionClient create(Task task);
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.merger.common.task.Task;
/**
*/
public class TaskActionHolder<T>
{
private final Task task;
private final TaskAction<T> action;
@JsonCreator
public TaskActionHolder(
@JsonProperty("task") Task task,
@JsonProperty("action") TaskAction action
)
{
this.task = task;
this.action = action;
}
@JsonProperty
public Task getTask()
{
return task;
}
@JsonProperty
public TaskAction<T> getAction()
{
return action;
}
}
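Since actions no longer serialize their task, the remote path needs something to carry the pair across the wire; that is what TaskActionHolder provides. A hedged sketch of the round trip, using plain Jackson with simplified stand-in types (field names match the holder's task/action properties):

import com.fasterxml.jackson.databind.ObjectMapper;

public class TaskActionHolderSketch
{
  // Stand-ins for illustration only; the real classes live in com.metamx.druid.merger.common.actions.
  public static class Task
  {
    public String id = "example_task";
  }

  public static class Action
  {
    public String type = "lockList";
  }

  // Stand-in for TaskActionHolder: pairs the task with a task-agnostic action.
  public static class Holder
  {
    public Task task;
    public Action action;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    final Holder holder = new Holder();
    holder.task = new Task();
    holder.action = new Action();

    // Worker side: RemoteTaskActionClient serializes the holder and POSTs it to the
    // coordinator's /action endpoint (see writeValueAsBytes(new TaskActionHolder(...)) above).
    final byte[] dataToSend = mapper.writeValueAsBytes(holder);

    // Coordinator side: IndexerCoordinatorResource.doAction() reads the holder back, builds a
    // toolbox for holder.task, and submits holder.action through that task's action client.
    final Holder received = mapper.readValue(dataToSend, Holder.class);
    System.out.println(received.task.id + " -> " + received.action.type);
  }
}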
......@@ -102,9 +102,9 @@ public abstract class AbstractTask implements Task
return ID_JOINER.join(objects);
}
public SegmentListUsedAction makeListUsedAction()
public SegmentListUsedAction makeImplicitListUsedAction()
{
return new SegmentListUsedAction(this, getDataSource(), getFixedInterval().get());
return new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get());
}
public TaskStatus success()
......
......@@ -77,7 +77,7 @@ public class DeleteTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final Interval interval = this.getImplicitLockInterval().get();
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
......@@ -104,7 +104,7 @@ public class DeleteTask extends AbstractTask
segment.getVersion()
);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(uploadedSegment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
......@@ -74,7 +93,7 @@ public class HadoopIndexTask extends AbstractTask
);
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());
......@@ -105,7 +124,7 @@ public class HadoopIndexTask extends AbstractTask
List<DataSegment> publishedSegments = job.getPublishedSegments();
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(publishedSegments)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done
return TaskStatus.success(getId());
......
......@@ -258,7 +258,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
}
);
toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, nextTasks));
toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(getId());
}
......
......@@ -100,7 +100,7 @@ public class IndexGeneratorTask extends AbstractTask
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
// We know this exists
final Interval interval = getImplicitLockInterval().get();
......@@ -193,7 +193,7 @@ public class IndexGeneratorTask extends AbstractTask
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.copyOf(pushedSegments)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
// Done
return TaskStatus.success(getId());
......
......@@ -142,7 +142,7 @@ public class IndexTask extends AbstractTask
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, toSubtasks()));
toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId());
}
......
......@@ -72,7 +72,7 @@ public class KillTask extends AbstractTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
......@@ -85,7 +85,7 @@ public class KillTask extends AbstractTask
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(this, myLock.getDataSource(), myLock.getInterval()));
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for(final DataSegment unusedSegment : unusedSegments) {
......@@ -105,7 +105,7 @@ public class KillTask extends AbstractTask
toolbox.getSegmentKiller().kill(unusedSegments);
// Remove metadata for these segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(this, ImmutableSet.copyOf(unusedSegments)));
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}
......
......@@ -119,7 +119,7 @@ public abstract class MergeTask extends AbstractTask
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction(this)));
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
......@@ -170,7 +170,7 @@ public abstract class MergeTask extends AbstractTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(uploadedSegment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
......@@ -215,7 +215,7 @@ public abstract class MergeTask extends AbstractTask
final Set<String> current = ImmutableSet.copyOf(
Iterables.transform(
toolbox.getTaskActionClient()
.submit(new SegmentListUsedAction(this, getDataSource(), getImplicitLockInterval().get())),
.submit(new SegmentListUsedAction(getDataSource(), getImplicitLockInterval().get())),
toIdentifier
)
);
......
......@@ -52,7 +52,7 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "version_converter", value = VersionConverterTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class),
@JsonSubTypes.Type(name = "version_converter_sub", value = VersionConverterSubTask.class)
})
public interface Task
{
......
......@@ -71,7 +71,7 @@ public class VersionConverterSubTask extends AbstractTask
final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation)) {
final DataSegment updatedSegment = toolbox.getSegmentPusher().push(outLocation, segment);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, Sets.newHashSet(updatedSegment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
}
return success();
......
......@@ -50,11 +50,10 @@ public class VersionConverterTask extends AbstractTask
{
final TaskActionClient taskClient = toolbox.getTaskActionClient();
List<DataSegment> segments = taskClient.submit(makeListUsedAction());
List<DataSegment> segments = taskClient.submit(makeImplicitListUsedAction());
taskClient.submit(
new SpawnTasksAction(
this,
Lists.transform(
segments,
new Function<DataSegment, Task>()
......
......@@ -327,11 +327,11 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public <T> void addAuditLog(final TaskAction<T> taskAction)
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
Preconditions.checkNotNull(taskAction, "taskAction");
log.info("Logging action for task[%s]: %s", taskAction.getTask().getId(), taskAction);
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
dbi.withHandle(
new HandleCallback<Integer>()
......@@ -345,7 +345,7 @@ public class DbTaskStorage implements TaskStorage
dbConnectorConfig.getTaskLogTable()
)
)
.bind("task_id", taskAction.getTask().getId())
.bind("task_id", task.getId())
.bind("log_payload", jsonMapper.writeValueAsString(taskAction))
.execute();
}
......
......@@ -40,14 +40,14 @@ import java.util.concurrent.locks.ReentrantLock;
* Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not
* thread safe.
*/
public class LocalTaskStorage implements TaskStorage
public class HeapMemoryTaskStorage implements TaskStorage
{
private final ReentrantLock giant = new ReentrantLock();
private final Map<String, TaskStuff> tasks = Maps.newHashMap();
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
private static final Logger log = new Logger(LocalTaskStorage.class);
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@Override
public void insert(Task task, TaskStatus status)
......@@ -185,12 +185,12 @@ public class LocalTaskStorage implements TaskStorage
}
@Override
public <T> void addAuditLog(TaskAction<T> taskAction)
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
{
giant.lock();
try {
taskActions.put(taskAction.getTask().getId(), taskAction);
taskActions.put(task.getId(), taskAction);
} finally {
giant.unlock();
}
......
......@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import org.apache.commons.io.FileUtils;
......@@ -38,17 +39,17 @@ import java.util.concurrent.ExecutorService;
*/
public class LocalTaskRunner implements TaskRunner
{
private final TaskToolbox toolbox;
private final TaskToolboxFactory toolboxFactory;
private final ExecutorService exec;
private static final Logger log = new Logger(LocalTaskRunner.class);
public LocalTaskRunner(
TaskToolbox toolbox,
TaskToolboxFactory toolboxFactory,
ExecutorService exec
)
{
this.toolbox = toolbox;
this.toolboxFactory = toolboxFactory;
this.exec = exec;
}
......@@ -61,6 +62,8 @@ public class LocalTaskRunner implements TaskRunner
@Override
public void run(final Task task, final TaskCallback callback)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
exec.submit(
new Runnable()
{
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
......
......@@ -26,6 +26,8 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.merger.coordinator.scaling.ResourceManagementScheduler;
......@@ -49,7 +51,7 @@ public class TaskMasterLifecycle
private final ReentrantLock giant = new ReentrantLock();
private final Condition mayBeStopped = giant.newCondition();
private final TaskQueue taskQueue;
private final TaskToolbox taskToolbox;
private final TaskToolboxFactory taskToolboxFactory;
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
......@@ -59,7 +61,7 @@ public class TaskMasterLifecycle
public TaskMasterLifecycle(
final TaskQueue taskQueue,
final TaskToolbox taskToolbox,
final TaskToolboxFactory taskToolboxFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final ServiceDiscoveryConfig serviceDiscoveryConfig,
final TaskRunnerFactory runnerFactory,
......@@ -69,7 +71,7 @@ public class TaskMasterLifecycle
)
{
this.taskQueue = taskQueue;
this.taskToolbox = taskToolbox;
this.taskToolboxFactory = taskToolboxFactory;
this.leaderSelector = new LeaderSelector(
curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener()
......@@ -87,7 +89,7 @@ public class TaskMasterLifecycle
final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue,
taskRunner,
taskToolbox,
taskToolboxFactory,
emitter
);
......@@ -217,9 +219,9 @@ public class TaskMasterLifecycle
return taskQueue;
}
public TaskToolbox getTaskToolbox()
public TaskToolbox getTaskToolbox(Task task)
{
return taskToolbox;
return taskToolboxFactory.build(task);
}
public ResourceManagementScheduler getResourceManagementScheduler()
......
......@@ -69,7 +69,7 @@ public interface TaskStorage
/**
* Add an action taken by a task to the audit log.
*/
public <T> void addAuditLog(TaskAction<T> taskAction);
public <T> void addAuditLog(Task task, TaskAction<T> taskAction);
/**
* Returns all actions taken by a task.
......
......@@ -24,7 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.TaskRunner;
......@@ -36,7 +36,7 @@ public class TaskConsumer implements Runnable
{
private final TaskQueue queue;
private final TaskRunner runner;
private final TaskToolbox toolbox;
private final TaskToolboxFactory toolboxFactory;
private final ServiceEmitter emitter;
private final Thread thready;
......@@ -47,13 +47,13 @@ public class TaskConsumer implements Runnable
public TaskConsumer(
TaskQueue queue,
TaskRunner runner,
TaskToolbox toolbox,
TaskToolboxFactory toolboxFactory,
ServiceEmitter emitter
)
{
this.queue = queue;
this.runner = runner;
this.toolbox = toolbox;
this.toolboxFactory = toolboxFactory;
this.emitter = emitter;
this.thready = new Thread(this);
}
......@@ -123,7 +123,7 @@ public class TaskConsumer implements Runnable
// Run preflight checks
TaskStatus preflightStatus;
try {
preflightStatus = task.preflight(toolbox);
preflightStatus = task.preflight(toolboxFactory.build(task));
log.info("Preflight done for task: %s", task.getId());
}
catch (Exception e) {
......
......@@ -48,22 +48,21 @@ import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LocalTaskActionClient;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.DbTaskStorage;
import com.metamx.druid.merger.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.merger.coordinator.LocalTaskRunner;
import com.metamx.druid.merger.coordinator.LocalTaskStorage;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.RemoteTaskRunner;
import com.metamx.druid.merger.coordinator.RetryPolicyFactory;
......@@ -147,7 +146,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
private RestS3Service s3Service = null;
private IndexerCoordinatorConfig config = null;
private TaskConfig taskConfig = null;
private TaskToolbox taskToolbox = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private TaskStorage taskStorage = null;
private TaskQueue taskQueue = null;
......@@ -208,6 +208,12 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this;
}
public IndexerCoordinatorNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.segmentPusher = segmentPusher;
return this;
}
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
{
this.mergerDBCoordinator = mergeDbCoordinator;
......@@ -252,6 +258,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeTaskStorage();
initializeTaskLockbox();
initializeTaskQueue();
initializeDataSegmentPusher();
initializeTaskToolbox();
initializeJacksonInjections();
initializeJacksonSubtypes();
......@@ -339,7 +346,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskQueue,
taskToolbox,
taskToolboxFactory,
config,
serviceDiscoveryConfig,
taskRunnerFactory,
......@@ -403,7 +410,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", taskToolbox.getSegmentPusher());
.addValue("segmentPusher", segmentPusher);
jsonMapper.setInjectableValues(injectables);
}
......@@ -472,26 +479,26 @@ public class IndexerCoordinatorNode extends RegisteringNode
);
}
public void initializeDataSegmentPusher()
{
if (segmentPusher == null) {
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper);
}
}
public void initializeTaskToolbox()
{
if (taskToolbox == null) {
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Service,
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Service
);
taskToolbox = new TaskToolbox(
if (taskToolboxFactory == null) {
final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new LocalTaskActionClient(
new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
),
emitter,
s3Service,
dataSegmentPusher,
segmentPusher,
segmentKiller,
jsonMapper
);
......@@ -546,7 +553,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
{
if (taskStorage == null) {
if (config.getStorageImpl().equals("local")) {
taskStorage = new LocalTaskStorage();
taskStorage = new HeapMemoryTaskStorage();
} else if (config.getStorageImpl().equals("db")) {
final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
taskStorage = new DbTaskStorage(jsonMapper, dbConnectorConfig, new DbConnector(dbConnectorConfig).getDBI());
......@@ -615,7 +622,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
public TaskRunner build()
{
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
return new LocalTaskRunner(taskToolbox, runnerExec);
return new LocalTaskRunner(taskToolboxFactory, runnerExec);
}
};
} else {
......
......@@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.actions.TaskAction;
import com.metamx.druid.merger.common.actions.TaskActionHolder;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
......@@ -181,9 +182,12 @@ public class IndexerCoordinatorResource
@POST
@Path("/action")
@Produces("application/json")
public <T> Response doAction(final TaskAction<T> action)
public <T> Response doAction(final TaskActionHolder<T> holder)
{
final T ret = taskMasterLifecycle.getTaskToolbox().getTaskActionClient().submit(action);
final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
.getTaskActionClient()
.submit(holder.getAction());
final Map<String, Object> retMap = Maps.newHashMap();
retMap.put("result", ret);
......
......@@ -23,6 +23,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
......@@ -45,21 +46,21 @@ public class TaskMonitor
private final PathChildrenCache pathChildrenCache;
private final CuratorFramework cf;
private final WorkerCuratorCoordinator workerCuratorCoordinator;
private final TaskToolbox toolbox;
private final TaskToolboxFactory toolboxFactory;
private final ExecutorService exec;
public TaskMonitor(
PathChildrenCache pathChildrenCache,
CuratorFramework cf,
WorkerCuratorCoordinator workerCuratorCoordinator,
TaskToolbox toolbox,
TaskToolboxFactory toolboxFactory,
ExecutorService exec
)
{
this.pathChildrenCache = pathChildrenCache;
this.cf = cf;
this.workerCuratorCoordinator = workerCuratorCoordinator;
this.toolbox = toolbox;
this.toolboxFactory = toolboxFactory;
this.exec = exec;
}
......@@ -81,10 +82,11 @@ public class TaskMonitor
throws Exception
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Task task = toolbox.getObjectMapper().readValue(
final Task task = toolboxFactory.getObjectMapper().readValue(
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
Task.class
);
final TaskToolbox toolbox = toolboxFactory.build(task);
if (workerCuratorCoordinator.statusExists(task.getId())) {
log.warn("Got task %s that I am already running...", task.getId());
......
......@@ -35,6 +35,7 @@ import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
......@@ -43,7 +44,9 @@ import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClient;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
......@@ -106,7 +109,8 @@ public class WorkerNode extends RegisteringNode
private ServiceEmitter emitter = null;
private TaskConfig taskConfig = null;
private WorkerConfig workerConfig = null;
private TaskToolbox taskToolbox = null;
private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null;
private CuratorFramework curatorFramework = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceProvider coordinatorServiceProvider = null;
......@@ -149,9 +153,15 @@ public class WorkerNode extends RegisteringNode
return this;
}
public WorkerNode setTaskToolbox(TaskToolbox taskToolbox)
public WorkerNode setSegmentPusher(DataSegmentPusher segmentPusher)
{
this.taskToolbox = taskToolbox;
this.segmentPusher = segmentPusher;
return this;
}
public WorkerNode setTaskToolboxFactory(TaskToolboxFactory taskToolboxFactory)
{
this.taskToolboxFactory = taskToolboxFactory;
return this;
}
......@@ -195,6 +205,7 @@ public class WorkerNode extends RegisteringNode
initializeCuratorFramework();
initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeDataSegmentPusher();
initializeTaskToolbox();
initializeJacksonInjections();
initializeJacksonSubtypes();
......@@ -271,7 +282,7 @@ public class WorkerNode extends RegisteringNode
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", taskToolbox.getSegmentPusher());
.addValue("segmentPusher", segmentPusher);
jsonMapper.setInjectableValues(injectables);
}
......@@ -334,23 +345,23 @@ public class WorkerNode extends RegisteringNode
}
}
public void initializeDataSegmentPusher()
{
if (segmentPusher == null) {
segmentPusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper);
}
}
public void initializeTaskToolbox() throws S3ServiceException
{
if (taskToolbox == null) {
final DataSegmentPusher dataSegmentPusher = new S3DataSegmentPusher(
s3Service,
configFactory.build(S3DataSegmentPusherConfig.class),
jsonMapper
);
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Service
);
taskToolbox = new TaskToolbox(
if (taskToolboxFactory == null) {
final SegmentKiller segmentKiller = new S3SegmentKiller(s3Service);
taskToolboxFactory = new TaskToolboxFactory(
taskConfig,
new RemoteTaskActionClient(httpClient, coordinatorServiceProvider, jsonMapper),
new RemoteTaskActionClientFactory(httpClient, coordinatorServiceProvider, jsonMapper),
emitter,
s3Service,
dataSegmentPusher,
segmentPusher,
segmentKiller,
jsonMapper
);
......@@ -417,7 +428,7 @@ public class WorkerNode extends RegisteringNode
pathChildrenCache,
curatorFramework,
workerCuratorCoordinator,
taskToolbox,
taskToolboxFactory,
workerExec
);
lifecycle.addManagedInstance(taskMonitor);
......
......@@ -12,6 +12,7 @@ import com.metamx.druid.merger.TestTask;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
......@@ -280,7 +281,7 @@ public class RemoteTaskRunnerTest
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true),
cf,
workerCuratorCoordinator,
new TaskToolbox(
new TaskToolboxFactory(
new TaskConfig()
{
@Override
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Optional;
......@@ -22,11 +41,11 @@ import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LocalTaskActionClient;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.LockAcquireAction;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.LockReleaseAction;
......@@ -59,7 +78,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
......@@ -71,7 +89,7 @@ public class TaskLifecycleTest
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockMergerDBCoordinator mdc = null;
private TaskToolbox tb = null;
private TaskToolboxFactory tb = null;
private TaskConsumer tc = null;
TaskStorageQueryAdapter tsqa = null;
......@@ -91,12 +109,12 @@ public class TaskLifecycleTest
tmp = Files.createTempDir();
ts = new LocalTaskStorage();
ts = new HeapMemoryTaskStorage();
tl = new TaskLockbox(ts);
tq = new TaskQueue(ts, tl);
mdc = newMockMDC();
tb = new TaskToolbox(
tb = new TaskToolboxFactory(
new TaskConfig()
{
@Override
......@@ -117,7 +135,7 @@ public class TaskLifecycleTest
return null;
}
},
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, mdc, newMockEmitter())),
newMockEmitter(),
null, // s3 client
new DataSegmentPusher()
......@@ -137,16 +155,7 @@ public class TaskLifecycleTest
}
},
new DefaultObjectMapper()
)
{
@Override
public Map<DataSegment, File> getSegments(
Task task, List<DataSegment> segments
) throws SegmentLoadingException
{
return ImmutableMap.of();
}
};
);
tr = new LocalTaskRunner(
tb,
......@@ -239,11 +248,12 @@ public class TaskLifecycleTest
@Test
public void testKillTask() throws Exception
{
// TODO: Worst test ever
// This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator
// Such that this test can test things...
final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D"));
final TaskStatus mergedStatus = runTask(killTask);
Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, mergedStatus.getStatusCode());
final TaskStatus status = runTask(killTask);
Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
......@@ -273,8 +283,8 @@ public class TaskLifecycleTest
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(this, interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction(this));
final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock1 present", lock1.isPresent());
......@@ -282,8 +292,8 @@ public class TaskLifecycleTest
Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1);
// Acquire lock for second interval
final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(this, interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction(this));
final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock2 present", lock2.isPresent());
......@@ -294,7 +304,6 @@ public class TaskLifecycleTest
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
this,
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
......@@ -306,8 +315,8 @@ public class TaskLifecycleTest
);
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(this, interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction(this));
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3);
......@@ -316,7 +325,6 @@ public class TaskLifecycleTest
toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
this,
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
......@@ -328,8 +336,8 @@ public class TaskLifecycleTest
);
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(this, interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction(this));
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
......@@ -363,7 +371,7 @@ public class TaskLifecycleTest
{
final TaskLock myLock = Iterables.getOnlyElement(
toolbox.getTaskActionClient()
.submit(new LockListAction(this))
.submit(new LockListAction())
);
final DataSegment segment = DataSegment.builder()
......@@ -372,7 +380,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion())
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
......@@ -398,10 +406,7 @@ public class TaskLifecycleTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(
toolbox.getTaskActionClient()
.submit(new LockListAction(this))
);
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
......@@ -409,7 +414,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion())
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
......@@ -435,10 +440,7 @@ public class TaskLifecycleTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(
toolbox.getTaskActionClient()
.submit(new LockListAction(this))
);
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
......@@ -446,7 +448,7 @@ public class TaskLifecycleTest
.version(myLock.getVersion() + "1!!!1!!")
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(this, ImmutableSet.of(segment)));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
......
......@@ -26,7 +26,8 @@ import com.google.common.collect.Sets;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LocalTaskActionClient;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.task.AbstractTask;
......@@ -43,7 +44,7 @@ public class TaskQueueTest
@Test
public void testEmptyQueue() throws Exception
{
final TaskStorage ts = new LocalTaskStorage();
final TaskStorage ts = new HeapMemoryTaskStorage();
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
......@@ -65,7 +66,7 @@ public class TaskQueueTest
@Test
public void testAddRemove() throws Exception
{
final TaskStorage ts = new LocalTaskStorage();
final TaskStorage ts = new HeapMemoryTaskStorage();
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
......@@ -154,12 +155,12 @@ public class TaskQueueTest
@Test
public void testContinues() throws Exception
{
final TaskStorage ts = new LocalTaskStorage();
final TaskStorage ts = new HeapMemoryTaskStorage();
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
final TaskToolbox tb = new TaskToolbox(
final TaskToolboxFactory tb = new TaskToolboxFactory(
null,
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, null, null)),
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)),
null,
null,
null,
......@@ -181,7 +182,7 @@ public class TaskQueueTest
Assert.assertNull("poll #2", tq.poll());
// report T1 done. Should cause T0 to be created
tq.notify(t1, t1.run(tb));
tq.notify(t1, t1.run(tb.build(t1)));
Assert.assertTrue("T0 isPresent (#2)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#2)", ts.getStatus("T0").get().isRunnable());
......@@ -195,7 +196,7 @@ public class TaskQueueTest
Assert.assertNull("poll #4", tq.poll());
// report T0 done. Should cause T0, T1 to be marked complete
tq.notify(t0, t0.run(tb));
tq.notify(t0, t0.run(tb.build(t0)));
Assert.assertTrue("T0 isPresent (#3)", ts.getStatus("T0").isPresent());
Assert.assertTrue("T0 isRunnable (#3)", !ts.getStatus("T0").get().isRunnable());
......@@ -211,12 +212,12 @@ public class TaskQueueTest
@Test
public void testConcurrency() throws Exception
{
final TaskStorage ts = new LocalTaskStorage();
final TaskStorage ts = new HeapMemoryTaskStorage();
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);
final TaskToolbox tb = new TaskToolbox(
final TaskToolboxFactory tb = new TaskToolboxFactory(
null,
new LocalTaskActionClient(ts, new TaskActionToolbox(tq, tl, null, null)),
new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)),
null,
null,
null,
......@@ -248,7 +249,7 @@ public class TaskQueueTest
Thread.sleep(5);
// Finish t0
tq.notify(t0, t0.run(tb));
tq.notify(t0, t0.run(tb.build(t0)));
// take max number of tasks
final Set<String> taken = Sets.newHashSet();
......@@ -280,7 +281,7 @@ public class TaskQueueTest
Assert.assertNull("null poll #2", tq.poll());
// Finish t3
tq.notify(t3, t3.run(tb));
tq.notify(t3, t3.run(tb.build(t3)));
// We should be able to get t2 now
final Task wt2 = tq.poll();
......@@ -291,7 +292,7 @@ public class TaskQueueTest
Assert.assertNull("null poll #3", tq.poll());
// Finish t2
tq.notify(t2, t2.run(tb));
tq.notify(t2, t2.run(tb.build(t2)));
// We should be able to get t4
// And it should be in group G0, but that group should have a different version than last time
......@@ -305,14 +306,14 @@ public class TaskQueueTest
Assert.assertNotSame("wt4 version", wt2Lock.getVersion(), wt4Lock.getVersion());
// Kind of done testing at this point, but let's finish t4 anyway
tq.notify(t4, t4.run(tb));
tq.notify(t4, t4.run(tb.build(t4)));
Assert.assertNull("null poll #4", tq.poll());
}
@Test
public void testBootstrap() throws Exception
{
final TaskStorage storage = new LocalTaskStorage();
final TaskStorage storage = new HeapMemoryTaskStorage();
final TaskLockbox lockbox = new TaskLockbox(storage);
storage.insert(newTask("T1", "G1", "bar", new Interval("2011-01-01/P1D")), TaskStatus.running("T1"));
......@@ -374,7 +375,7 @@ public class TaskQueueTest
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
toolbox.getTaskActionClient().submit(new SpawnTasksAction(this, nextTasks));
toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(id);
}
};
......
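The TaskQueueTest changes above replace the single shared TaskToolbox with a TaskToolboxFactory that builds one toolbox per task. A condensed wiring sketch follows; the trailing constructor arguments are elided in the hunk, so nulls stand in here for the dependencies the test does not exercise.

// Sketch only, condensed from the test changes above.
final TaskStorage ts = new HeapMemoryTaskStorage();              // renamed from LocalTaskStorage
final TaskLockbox tl = new TaskLockbox(ts);
final TaskQueue tq = newTaskQueue(ts, tl);                       // test helper in TaskQueueTest
final TaskToolboxFactory tb = new TaskToolboxFactory(
    null,                                                        // TaskConfig, unused in the test
    new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tq, tl, null, null)),
    null, null, null, null,                                      // emitter, s3Client, segmentPusher, segmentKiller
    null                                                         // ObjectMapper; the real test argument is elided in the hunk
);

// The queue still runs Tasks directly, but each run now receives a task-scoped toolbox:
tq.notify(t1, t1.run(tb.build(t1)));                             // t1: a Task previously added to the queue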
......@@ -165,17 +165,17 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.1.2</version>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.1.3</version>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.1.4-mmx-2</version>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
......@@ -190,12 +190,12 @@
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>2.1.3</version>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.1.3</version>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
......
......@@ -45,25 +45,15 @@ import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
......@@ -259,31 +249,7 @@ public class RealtimeNode extends BaseServerNode<RealtimeNode>
private void initializeSegmentPusher()
{
if (dataSegmentPusher == null) {
final Properties props = getProps();
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
dataSegmentPusher = new LocalDataSegmentPusher(
getConfigFactory().build(LocalDataSegmentPusherConfig.class), getJsonMapper()
);
}
else {
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
dataSegmentPusher = new S3DataSegmentPusher(
s3Client, getConfigFactory().build(S3DataSegmentPusherConfig.class), getJsonMapper()
);
}
dataSegmentPusher = ServerInit.getSegmentPusher(getProps(), getConfigFactory(), getJsonMapper());
}
}
......
......@@ -19,17 +19,24 @@
package com.metamx.druid.initialization;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.DruidProcessingConfig;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DelegatingSegmentLoader;
import com.metamx.druid.loading.LocalDataSegmentPuller;
import com.metamx.druid.loading.LocalDataSegmentPusher;
import com.metamx.druid.loading.LocalDataSegmentPusherConfig;
import com.metamx.druid.loading.MMappedQueryableIndexFactory;
import com.metamx.druid.loading.QueryableIndexFactory;
import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.S3DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentPusherConfig;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.query.group.GroupByQueryEngine;
......@@ -48,12 +55,16 @@ import com.metamx.druid.query.timeboundary.TimeBoundaryQuery;
import com.metamx.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.utils.PropUtils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.skife.config.ConfigurationObjectFactory;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
/**
......@@ -145,6 +156,34 @@ public class ServerInit
return queryRunners;
}
public static DataSegmentPusher getSegmentPusher(
final Properties props,
final ConfigurationObjectFactory configFactory,
final ObjectMapper jsonMapper
)
{
if (Boolean.parseBoolean(props.getProperty("druid.pusher.local", "false"))) {
return new LocalDataSegmentPusher(configFactory.build(LocalDataSegmentPusherConfig.class), jsonMapper);
}
else {
final RestS3Service s3Client;
try {
s3Client = new RestS3Service(
new AWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
);
}
catch (S3ServiceException e) {
throw Throwables.propagate(e);
}
return new S3DataSegmentPusher(s3Client, configFactory.build(S3DataSegmentPusherConfig.class), jsonMapper);
}
}
private static class ComputeScratchPool extends StupidPool<ByteBuffer>
{
private static final Logger log = new Logger(ComputeScratchPool.class);
......
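The pusher-selection logic removed from RealtimeNode above now lives in this ServerInit.getSegmentPusher helper. A minimal usage sketch, assuming configFactory and jsonMapper come from the caller as in RealtimeNode:

// Sketch only: property-driven selection of the segment pusher via the new helper.
final Properties props = new Properties();
props.setProperty("druid.pusher.local", "true");   // default "false" selects the S3 pusher, which then
                                                   // requires com.metamx.aws.accessKey and
                                                   // com.metamx.aws.secretKey to be set
final DataSegmentPusher pusher = ServerInit.getSegmentPusher(props, configFactory, jsonMapper);
// configFactory: an existing ConfigurationObjectFactory; jsonMapper: an existing ObjectMapper
// (e.g. getConfigFactory() / getJsonMapper() inside RealtimeNode.initializeSegmentPusher()).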