提交 566a3a61 编写于 作者: G Gian Merlino

Indexing service: Break up segment actions

Each one now one operates on at most a collection of segments that comprise
a single partition. The main purpose of this change is to prevent audit log
payload sizes from getting out of control.
上级 2d2fa319
......@@ -20,10 +20,15 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
......@@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
......@@ -167,7 +176,7 @@ public class TaskToolbox
return objectMapper;
}
public Map<DataSegment, File> getSegments(List<DataSegment> segments)
public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
......@@ -178,6 +187,25 @@ public class TaskToolbox
return retVal;
}
public void pushSegments(Iterable<DataSegment> segments) throws IOException {
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
}
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
}
}
public File getTaskWorkDir()
{
return taskWorkDir;
......
......@@ -19,6 +19,7 @@
package io.druid.indexing.common.actions;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskStorage;
......@@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
final RetType ret = taskAction.perform(task, toolbox);
if (taskAction.isAudited()) {
// Add audit log
try {
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
.addData("actionClass", taskAction.getClass().getName())
.addData("actionClass", actionClass)
.emit();
throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
return ret;
return taskAction.perform(task, toolbox);
}
}
......@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
......@@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
......
......@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
......@@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
Task task, TaskActionToolbox toolbox
) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
// Emit metrics
......
......@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
......@@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
// Emit metrics
......
......@@ -19,9 +19,11 @@
package io.druid.indexing.common.actions;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
......@@ -65,6 +67,42 @@ public class TaskActionToolbox
return emitter;
}
public boolean segmentsAreFromSamePartitionSet(
final Set<DataSegment> segments
)
{
// Verify that these segments are all in the same partition set
Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
final DataSegment firstSegment = segments.iterator().next();
for (final DataSegment segment : segments) {
if (!segment.getDataSource().equals(firstSegment.getDataSource())) {
return false;
}
if (!segment.getInterval().equals(firstSegment.getInterval())) {
return false;
}
if (!segment.getVersion().equals(firstSegment.getVersion())) {
return false;
}
}
return true;
}
public void verifyTaskLocksAndSinglePartitionSettitude(
final Task task,
final Set<DataSegment> segments,
final boolean allowOlderVersions
)
{
if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
if (!segmentsAreFromSamePartitionSet(segments)) {
throw new ISE("Segments are not in the same partition set: %s", segments);
}
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments,
......
......@@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
......@@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
segment.getVersion()
);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
......
......@@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
......@@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class HadoopIndexTask extends AbstractFixedIntervalTask
{
......@@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask
if (segments != null) {
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
segments, new TypeReference<List<DataSegment>>()
{
}
segments,
new TypeReference<List<DataSegment>>() {}
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done
toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());
......
......@@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask
segments.add(segment);
}
}
toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
toolbox.pushSegments(segments);
return TaskStatus.success(getId());
}
......
......@@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask
// Kill segments
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
}
// Remove metadata for these segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}
}
......@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
......@@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
);
// download segments to merge
final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
......@@ -165,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
......
......@@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask
log.info("OK to move segment: %s", unusedSegment.getIdentifier());
}
List<DataSegment> movedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
}
// Update metadata for moved segments
toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
ImmutableSet.copyOf(movedSegments)
));
return TaskStatus.success(getId());
}
......
......@@ -418,7 +418,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void publishSegment(DataSegment segment) throws IOException
{
taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
taskToolbox.pushSegments(ImmutableList.of(segment));
}
}
}
......@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
......@@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
}
}
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册