Commit 23edba9e authored by F fjy

Merge pull request #338 from metamx/indexing-service-bugs

Indexing service bugs
@@ -20,10 +20,15 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.ServerView;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
@@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
@@ -167,7 +176,7 @@ public class TaskToolbox
return objectMapper;
}
-public Map<DataSegment, File> getSegments(List<DataSegment> segments)
+public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
throws SegmentLoadingException
{
Map<DataSegment, File> retVal = Maps.newLinkedHashMap();
@@ -178,6 +187,25 @@ public class TaskToolbox
return retVal;
}
public void pushSegments(Iterable<DataSegment> segments) throws IOException {
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
}
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
}
}
public File getTaskWorkDir()
{
return taskWorkDir;
......
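The new pushSegments helper batches segment publishes: Multimaps.index groups the segments by the interval each one reports, and one SegmentInsertAction is submitted per group. Below is a minimal, self-contained sketch of the same grouping idiom, assuming only Guava on the classpath; the Segment class is a hypothetical stand-in for Druid's DataSegment, since only the Multimaps.index call matters here.

```java
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;

import java.util.Collection;
import java.util.List;

public class GroupByIntervalSketch
{
  // Hypothetical stand-in for DataSegment: just an interval key and a name.
  static class Segment
  {
    final String interval;
    final String name;

    Segment(String interval, String name)
    {
      this.interval = interval;
      this.name = name;
    }
  }

  public static void main(String[] args)
  {
    final List<Segment> segments = ImmutableList.of(
        new Segment("2014-01-01/2014-01-02", "a"),
        new Segment("2014-01-01/2014-01-02", "b"),
        new Segment("2014-01-02/2014-01-03", "c")
    );

    // Multimaps.index files each value under the key the Function derives,
    // preserving input order within each group.
    final Multimap<String, Segment> byInterval = Multimaps.index(
        segments,
        new Function<Segment, String>()
        {
          @Override
          public String apply(Segment segment)
          {
            return segment.interval;
          }
        }
    );

    // One batch per interval, mirroring the loop in pushSegments.
    for (Collection<Segment> group : byInterval.asMap().values()) {
      System.out.println("would submit a batch of " + group.size() + " segment(s)");
    }
  }
}
```

For the three inputs above this prints two batches, of sizes 2 and 1.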
@@ -19,6 +19,7 @@
package io.druid.indexing.common.actions;
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskStorage;
@@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
+final RetType ret = taskAction.perform(task, toolbox);
if (taskAction.isAudited()) {
// Add audit log
try {
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
+final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
-.addData("actionClass", taskAction.getClass().getName())
+.addData("actionClass", actionClass)
.emit();
+throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
-return taskAction.perform(task, toolbox);
+return ret;
}
}
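The reworked submit above runs the action first and now treats a failed audit-log write as fatal, where the old code only emitted an alert and carried on. A distilled sketch of that control flow, with hypothetical names standing in for Druid's task, storage, and ISE types:

```java
import java.util.concurrent.Callable;

public class AuditedSubmitSketch
{
  // Run the action, then record it; a failed audit write aborts loudly.
  static <T> T submit(Callable<T> action, Runnable auditWrite) throws Exception
  {
    final T ret = action.call();
    try {
      auditWrite.run();
    }
    catch (RuntimeException e) {
      // The action's side effects have already happened by this point;
      // throwing surfaces the missing audit record instead of hiding it.
      throw new IllegalStateException("Failed to record action in audit log", e);
    }
    return ret;
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(submit(() -> 42, () -> { /* audit write succeeds */ })); // 42
    try {
      submit(() -> 43, () -> { throw new RuntimeException("audit store down"); });
    }
    catch (IllegalStateException e) {
      System.out.println("failed loudly: " + e.getMessage());
    }
  }
}
```

Note the trade-off this ordering implies: the action is not rolled back, so the task fails even though the work itself succeeded.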
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
-if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
-throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
-}
+toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
......
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
Task task, TaskActionToolbox toolbox
) throws IOException
{
-if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-throw new ISE("Segments not covered by locks for task: %s", task.getId());
-}
+toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
// Emit metrics
......
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
-if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-throw new ISE("Segments not covered by locks for task: %s", task.getId());
-}
+toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
// Emit metrics
......
@@ -19,9 +19,11 @@
package io.druid.indexing.common.actions;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
@@ -65,6 +67,38 @@ public class TaskActionToolbox
return emitter;
}
public boolean segmentsAreFromSamePartitionSet(
final Set<DataSegment> segments
)
{
// Verify that these segments are all in the same partition set
Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
final DataSegment firstSegment = segments.iterator().next();
for (final DataSegment segment : segments) {
if (!segment.getDataSource().equals(firstSegment.getDataSource())
|| !segment.getInterval().equals(firstSegment.getInterval())
|| !segment.getVersion().equals(firstSegment.getVersion())) {
return false;
}
}
return true;
}
public void verifyTaskLocksAndSinglePartitionSettitude(
final Task task,
final Set<DataSegment> segments,
final boolean allowOlderVersions
)
{
if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
if (!segmentsAreFromSamePartitionSet(segments)) {
throw new ISE("Segments are not in the same partition set: %s", segments);
}
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments,
......
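segmentsAreFromSamePartitionSet compares every segment against the first one on the (dataSource, interval, version) triple that identifies a partition set. A self-contained sketch of the same check, with a hypothetical Segment holder in place of DataSegment:

```java
import com.google.common.base.Preconditions;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class PartitionSetCheckSketch
{
  // Hypothetical stand-in for the fields that identify a partition set;
  // the shard/partition number is deliberately not part of the identity.
  static class Segment
  {
    final String dataSource;
    final String interval;
    final String version;

    Segment(String dataSource, String interval, String version)
    {
      this.dataSource = dataSource;
      this.interval = interval;
      this.version = version;
    }
  }

  static boolean fromSamePartitionSet(List<Segment> segments)
  {
    Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
    final Segment first = segments.get(0);
    for (Segment segment : segments) {
      if (!Objects.equals(segment.dataSource, first.dataSource)
          || !Objects.equals(segment.interval, first.interval)
          || !Objects.equals(segment.version, first.version)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    final Segment a = new Segment("ds", "2014-01-01/2014-01-02", "v1");
    final Segment b = new Segment("ds", "2014-01-01/2014-01-02", "v1");
    final Segment c = new Segment("ds", "2014-01-02/2014-01-03", "v1");
    System.out.println(fromSamePartitionSet(Arrays.asList(a, b))); // true
    System.out.println(fromSamePartitionSet(Arrays.asList(a, c))); // false
  }
}
```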
@@ -21,6 +21,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import org.joda.time.Interval;
@@ -58,7 +59,8 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
)
{
super(id, groupId, taskResource, dataSource);
-this.interval = interval;
+this.interval = Preconditions.checkNotNull(interval, "interval");
+Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
}
@Override
......
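The constructor now rejects null and zero-length intervals up front instead of letting an empty lock interval surface later (see the TaskLockbox guard further down). A small sketch of the same guard, assuming Joda-Time 2.x and Guava on the classpath:

```java
import com.google.common.base.Preconditions;
import org.joda.time.Interval;

public class IntervalGuardSketch
{
  // Mirrors the constructor checks: non-null and strictly positive duration.
  static Interval validated(Interval interval)
  {
    Preconditions.checkNotNull(interval, "interval");
    Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
    return interval;
  }

  public static void main(String[] args)
  {
    System.out.println(validated(Interval.parse("2014-01-01/2014-01-02")));
    try {
      validated(Interval.parse("2014-01-01/2014-01-01")); // zero-length
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```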
@@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
segment.getVersion()
);
-toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
......
@@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
@@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class HadoopIndexTask extends AbstractFixedIntervalTask
{
@@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask
if (segments != null) {
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
-segments, new TypeReference<List<DataSegment>>()
-{
-}
+segments,
+new TypeReference<List<DataSegment>>() {}
);
-// Request segment pushes
-toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
-// Done
+toolbox.pushSegments(publishedSegments);
return TaskStatus.success(getId());
} else {
return TaskStatus.failure(getId());
......
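The reformatted readValue call also shows the Jackson idiom for generic collections: an anonymous TypeReference subclass captures List<DataSegment> as the full target type, which a plain Class literal cannot express. A self-contained sketch using List<String> so it runs with only jackson-databind:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

public class TypeReferenceSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // The anonymous TypeReference subclass carries the generic type, so
    // Jackson deserializes a typed List rather than untyped LinkedHashMaps.
    final List<String> ids = mapper.readValue(
        "[\"segment-1\",\"segment-2\"]",
        new TypeReference<List<String>>() {}
    );
    System.out.println(ids.get(1)); // segment-2
  }
}
```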
@@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask
segments.add(segment);
}
}
-toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
+toolbox.pushSegments(segments);
return TaskStatus.success(getId());
}
......
@@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask
// Kill segments
for (DataSegment segment : unusedSegments) {
toolbox.getDataSegmentKiller().kill(segment);
+toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
}
-// Remove metadata for these segments
-toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}
}
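The KillTask change moves the metadata nuke inside the loop: each segment's metadata row is removed right after the segment is deleted from deep storage, so a crash partway through no longer leaves already-deleted segments still registered. A distilled sketch of that ordering, with hypothetical helpers in place of the killer and the nuke action:

```java
import java.util.Arrays;
import java.util.List;

public class PerSegmentCleanupSketch
{
  public static void main(String[] args)
  {
    final List<String> unusedSegments = Arrays.asList("seg-a", "seg-b", "seg-c");
    for (String segment : unusedSegments) {
      deleteFromDeepStorage(segment);
      // Previously a single batched call after the loop; doing it per
      // segment keeps metadata consistent with deep storage at each step.
      removeMetadata(segment);
    }
  }

  static void deleteFromDeepStorage(String segment)
  {
    System.out.println("killed " + segment);
  }

  static void removeMetadata(String segment)
  {
    System.out.println("nuked metadata for " + segment);
  }
}
```

The MoveTask hunk below applies the same per-segment pattern to metadata updates after a move.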
@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
);
// download segments to merge
-final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
+final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
// merge files together
final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));
@@ -165,7 +166,7 @@
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
-toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
......
@@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask
log.info("OK to move segment: %s", unusedSegment.getIdentifier());
}
-List<DataSegment> movedSegments = Lists.newLinkedList();
// Move segments
for (DataSegment segment : unusedSegments) {
-movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
+final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
+toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
}
-// Update metadata for moved segments
-toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
-ImmutableSet.copyOf(movedSegments)
-));
return TaskStatus.success(getId());
}
......
@@ -418,7 +418,7 @@ public class RealtimeIndexTask extends AbstractTask
@Override
public void publishSegment(DataSegment segment) throws IOException
{
-taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+taskToolbox.pushSegments(ImmutableList.of(segment));
}
}
}
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
@@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
}
}
-final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
+final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
......
@@ -489,7 +489,18 @@ public class DbTaskStorage implements TaskStorage
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
-retMap.put((Long) row.get("id"), jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class));
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
}
......
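The DbTaskStorage lookup patched above previously failed wholesale if any single lock_payload row was corrupt; now each row is deserialized in its own try/catch, alerted on, and skipped. A self-contained sketch of that defensive loop, using jackson-databind and plain JSON strings in place of database rows:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DefensiveDeserializeSketch
{
  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper();
    final List<String> rows = Arrays.asList(
        "{\"version\":\"v1\"}",
        "not json at all", // one corrupt row
        "{\"version\":\"v2\"}"
    );

    final Map<Integer, Object> locks = new HashMap<>();
    for (int i = 0; i < rows.size(); i++) {
      try {
        locks.put(i, mapper.readValue(rows.get(i), Map.class));
      }
      catch (Exception e) {
        // Alert and skip: one bad payload no longer poisons the whole lookup.
        System.err.println("Failed to deserialize row " + i + ": " + e.getMessage());
      }
    }
    System.out.println(locks.size() + " of " + rows.size() + " rows loaded");
  }
}
```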
@@ -42,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
@@ -169,39 +170,39 @@ public class IndexerDBCoordinator
private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
{
try {
-final List<Map<String, Object>> exists = handle.createQuery(
-String.format(
-"SELECT id FROM %s WHERE id = :identifier",
-dbTables.getSegmentsTable()
-)
-).bind(
-"identifier",
-segment.getIdentifier()
-).list();
-if (!exists.isEmpty()) {
+if (segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
return false;
}
-handle.createStatement(
-String.format(
-"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-dbTables.getSegmentsTable()
-)
-)
-.bind("id", segment.getIdentifier())
-.bind("dataSource", segment.getDataSource())
-.bind("created_date", new DateTime().toString())
-.bind("start", segment.getInterval().getStart().toString())
-.bind("end", segment.getInterval().getEnd().toString())
-.bind("partitioned", segment.getShardSpec().getPartitionNum())
-.bind("version", segment.getVersion())
-.bind("used", true)
-.bind("payload", jsonMapper.writeValueAsString(segment))
-.execute();
-log.info("Published segment [%s] to DB", segment.getIdentifier());
// Try/catch to work around races due to SELECT -> INSERT. Avoid ON DUPLICATE KEY since it's not portable.
try {
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", segment.getShardSpec().getPartitionNum())
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
throw e;
}
}
}
catch (IOException e) {
log.error(e, "Exception inserting into DB");
@@ -211,6 +212,20 @@ public class IndexerDBCoordinator
return true;
}
private boolean segmentExists(final Handle handle, final DataSegment segment) {
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbTables.getSegmentsTable()
)
).bind(
"identifier",
segment.getIdentifier()
).list();
return !exists.isEmpty();
}
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
......
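The rewritten announceHistoricalSegment keeps the portable SELECT-then-INSERT shape and handles the race where another writer inserts the same segment id between the two statements: the INSERT's failure is caught and the existence check is repeated. A sketch of that idempotent-insert pattern over plain JDBC, assuming an in-memory H2 database on the classpath and a hypothetical one-column segments table:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class IdempotentInsertSketch
{
  // Check, insert, and on failure re-check: a duplicate-key error from a
  // concurrent writer means "already published"; anything else rethrows.
  static boolean publish(Connection conn, String id) throws SQLException
  {
    if (exists(conn, id)) {
      return false;
    }
    try (PreparedStatement insert =
             conn.prepareStatement("INSERT INTO segments (id) VALUES (?)")) {
      insert.setString(1, id);
      insert.execute();
      return true;
    }
    catch (SQLException e) {
      if (exists(conn, id)) {
        return false; // lost the race; the row is there, which is what we wanted
      }
      throw e;
    }
  }

  static boolean exists(Connection conn, String id) throws SQLException
  {
    try (PreparedStatement select =
             conn.prepareStatement("SELECT id FROM segments WHERE id = ?")) {
      select.setString(1, id);
      try (ResultSet rs = select.executeQuery()) {
        return rs.next();
      }
    }
  }

  public static void main(String[] args) throws Exception
  {
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
         Statement ddl = conn.createStatement()) {
      ddl.execute("CREATE TABLE segments (id VARCHAR(255) PRIMARY KEY)");
      System.out.println(publish(conn, "seg-1")); // true
      System.out.println(publish(conn, "seg-1")); // false, already there
    }
  }
}
```

Avoiding vendor-specific upserts (ON DUPLICATE KEY, MERGE) keeps the statement portable across the databases the coordinator may run against, at the cost of the re-check.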
@@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
@@ -109,6 +110,11 @@ public class TaskLockbox
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = taskAndLock.lhs;
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
uniqueTaskIds.add(task.getId());
final Optional<TaskLock> acquiredTaskLock = tryLock(
task,
@@ -205,6 +211,7 @@ public class TaskLockbox
giant.lock();
try {
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;
......
@@ -73,7 +73,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
-<version>0.2.6</version>
+<version>0.2.7</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
......