提交 ac2ca0e4 编写于 作者: X Xavier Léauté

separate move and archive tasks

上级 6b903720
......@@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
......@@ -53,6 +54,7 @@ public class TaskToolbox
private final ServiceEmitter emitter;
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
......@@ -71,6 +73,7 @@ public class TaskToolbox
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
......@@ -88,6 +91,7 @@ public class TaskToolbox
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
......@@ -128,6 +132,11 @@ public class TaskToolbox
return dataSegmentMover;
}
public DataSegmentArchiver getDataSegmentArchiver()
{
return dataSegmentArchiver;
}
public DataSegmentAnnouncer getSegmentAnnouncer()
{
return segmentAnnouncer;
......
......@@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
......@@ -48,6 +49,7 @@ public class TaskToolboxFactory
private final DataSegmentPusher segmentPusher;
private final DataSegmentKiller dataSegmentKiller;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ServerView newSegmentServerView;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
......@@ -64,6 +66,7 @@ public class TaskToolboxFactory
DataSegmentPusher segmentPusher,
DataSegmentKiller dataSegmentKiller,
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
ServerView newSegmentServerView,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
......@@ -79,6 +82,7 @@ public class TaskToolboxFactory
this.segmentPusher = segmentPusher;
this.dataSegmentKiller = dataSegmentKiller;
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.newSegmentServerView = newSegmentServerView;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
......@@ -100,6 +104,7 @@ public class TaskToolboxFactory
segmentPusher,
dataSegmentKiller,
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
newSegmentServerView,
queryRunnerFactoryConglomerate,
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentMoveAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
public class ArchiveTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(ArchiveTask.class);
public ArchiveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(id, dataSource, interval);
}
@Override
public String getType()
{
return "archive";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
if (!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
}
if (!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
}
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getVersion().compareTo(myLock.getVersion()) > 0) {
throw new ISE(
"WTF?! Unused segment[%s] has version[%s] > task version[%s]",
unusedSegment.getIdentifier(),
unusedSegment.getVersion(),
myLock.getVersion()
);
}
log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> archivedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
archivedSegments.add(toolbox.getDataSegmentArchiver().archive(segment));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(
new SegmentMoveAction(
ImmutableSet.copyOf(archivedSegments)
)
);
return TaskStatus.success(getId());
}
}
......@@ -35,16 +35,20 @@ import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
public class MoveTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(MoveTask.class);
private final Map<String, Object> targetLoadSpec;
@JsonCreator
public MoveTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("target") Map<String, Object> targetLoadSpec
)
{
super(
......@@ -52,6 +56,7 @@ public class MoveTask extends AbstractFixedIntervalTask
dataSource,
interval
);
this.targetLoadSpec = targetLoadSpec;
}
@Override
......@@ -97,7 +102,7 @@ public class MoveTask extends AbstractFixedIntervalTask
// Move segments
for (DataSegment segment : unusedSegments) {
movedSegments.add(toolbox.getDataSegmentMover().move(segment));
movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
}
// Update metadata for moved segments
......
......@@ -46,6 +46,7 @@ import io.druid.query.QueryRunner;
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
......
......@@ -64,6 +64,7 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPuller;
......@@ -89,6 +90,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TaskLifecycleTest
......@@ -162,11 +164,19 @@ public class TaskLifecycleTest
new DataSegmentMover()
{
@Override
public DataSegment move(DataSegment dataSegment) throws SegmentLoadingException
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return dataSegment;
}
},
new DataSegmentArchiver()
{
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return segment;
}
},
null, // segment announcer
null, // new segment server view
null, // query runner factory conglomerate corporation unionized collective
......
......@@ -122,7 +122,7 @@ public class WorkerTaskMonitorTest
new ThreadPoolTaskRunner(
new TaskToolboxFactory(
new TaskConfig(tmp.toString(), null, null, 0),
null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory(
new OmniSegmentLoader(
ImmutableMap.<String, DataSegmentPuller>of(
"local",
......
......@@ -46,13 +46,13 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
String targetS3Bucket = config.getArchiveBucket();
String targetS3Path = MapUtils.getString(segment.getLoadSpec(), "key");
String targetS3BaseKey = config.getArchiveBaseKey();
return move(
segment,
ImmutableMap.<String, Object>of(
"bucket", targetS3Bucket,
"key", targetS3Path
"baseKey", targetS3BaseKey
)
);
}
......
......@@ -26,8 +26,16 @@ public class S3DataSegmentArchiverConfig
@JsonProperty
public String archiveBucket = "";
@JsonProperty
public String archiveBaseKey = "";
public String getArchiveBucket()
{
return archiveBucket;
}
public String getArchiveBaseKey()
{
return archiveBaseKey;
}
}
......@@ -24,6 +24,7 @@ import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
......@@ -56,14 +57,16 @@ public class S3DataSegmentMover implements DataSegmentMover
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3Path = MapUtils.getString(targetLoadSpec, "key");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment);
String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 path is not specified");
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
......
......@@ -20,7 +20,6 @@
package io.druid.storage.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
......@@ -29,7 +28,6 @@ import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.utils.CompressionUtils;
import org.jets3t.service.ServiceException;
......@@ -45,7 +43,6 @@ import java.util.concurrent.Callable;
public class S3DataSegmentPusher implements DataSegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private final RestS3Service s3Client;
private final S3DataSegmentPusherConfig config;
......@@ -73,10 +70,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
log.info("Uploading [%s] to S3", indexFilesDir);
final String outputKey = JOINER.join(
config.getBaseKey().isEmpty() ? null : config.getBaseKey(),
DataSegmentPusherUtil.getStorageDir(inSegment)
);
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment);
final File zipOutFile = File.createTempFile("druid", "index.zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
......@@ -90,7 +84,6 @@ public class S3DataSegmentPusher implements DataSegmentPusher
S3Object toPush = new S3Object(zipOutFile);
final String outputBucket = config.getBucket();
final String s3Path = outputKey + "/index.zip";
final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
toPush.setBucketName(outputBucket);
......
......@@ -19,9 +19,12 @@
package io.druid.storage.s3;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.metamx.common.RetryUtils;
import org.jets3t.service.ServiceException;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
......@@ -34,6 +37,8 @@ import java.util.concurrent.Callable;
*/
public class S3Utils
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
public static void closeStreamsQuietly(S3Object s3Obj)
{
if (s3Obj == null) {
......@@ -96,6 +101,15 @@ public class S3Utils
return true;
}
public static String constructSegmentPath(String baseKey, DataSegment segment)
{
return JOINER.join(
baseKey.isEmpty() ? null : baseKey,
DataSegmentPusherUtil.getStorageDir(segment)
) + "/index.zip";
}
public static String descriptorPathForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.loading;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.timeline.DataSegment;
import java.util.Map;
public class OmniDataSegmentArchiver implements DataSegmentArchiver
{
private final Map<String, DataSegmentArchiver> archivers;
@Inject
public OmniDataSegmentArchiver(
Map<String, DataSegmentArchiver> archivers
)
{
this.archivers = archivers;
}
@Override
public DataSegment archive(DataSegment segment) throws SegmentLoadingException
{
return getArchiver(segment).archive(segment);
}
private DataSegmentArchiver getArchiver(DataSegment segment) throws SegmentLoadingException
{
String type = MapUtils.getString(segment.getLoadSpec(), "type");
DataSegmentArchiver archiver = archivers.get(type);
if (archiver == null) {
throw new SegmentLoadingException("Unknown loader type[%s]. Known types are %s", type, archivers.keySet());
}
return archiver;
}
}
......@@ -38,9 +38,9 @@ public class OmniDataSegmentMover implements DataSegmentMover
}
@Override
public DataSegment move(DataSegment segment) throws SegmentLoadingException
public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
{
return getMover(segment).move(segment);
return getMover(segment).move(segment, targetLoadSpec);
}
private DataSegmentMover getMover(DataSegment segment) throws SegmentLoadingException
......
......@@ -60,8 +60,10 @@ import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.OmniDataSegmentArchiver;
import io.druid.segment.loading.OmniDataSegmentKiller;
import io.druid.segment.loading.OmniDataSegmentMover;
import io.druid.segment.loading.SegmentLoaderConfig;
......@@ -133,6 +135,8 @@ public class CliPeon extends GuiceRunnable
binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
Binders.dataSegmentMoverBinder(binder);
binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
Binders.dataSegmentArchiverBinder(binder);
binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
binder.bind(ExecutorLifecycleConfig.class).toInstance(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册