提交 2427e818 编写于 作者: G Gian Merlino

Merger: Feedback from code review

上级 14cf506c
......@@ -51,27 +51,8 @@ public class SegmentInsertAction implements TaskAction<Void>
@Override
public Void perform(TaskActionToolbox toolbox)
{
// Verify that each of these segments-to-insert falls under some lock
// TODO: Should this be done while holding the giant lock on TaskLockbox? (To prevent anyone from grabbing
// TODO: these locks out from under us while the operation is ongoing.) Probably not necessary.
final List<TaskLock> taskLocks = toolbox.getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
return taskLock.getVersion().equals(segment.getVersion())
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if(!ok) {
throw new ISE("No currently-held lock covers segment: %s", segment);
}
if(!toolbox.taskLockCoversSegments(task, segments, false)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
try {
......
......@@ -51,27 +51,8 @@ public class SegmentNukeAction implements TaskAction<Void>
@Override
public Void perform(TaskActionToolbox toolbox)
{
// Verify that each of these segments-to-nuke falls under some lock
// TODO: Should this be done while holding the giant lock on TaskLockbox? (To prevent anyone from grabbing
// TODO: these locks out from under us while the operation is ongoing.) Probably not necessary.
final List<TaskLock> taskLocks = toolbox.getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
return taskLock.getVersion().compareTo(segment.getVersion()) >= 0
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if(!ok) {
throw new ISE("No currently-held lock covers segment: %s", segment);
}
if(!toolbox.taskLockCoversSegments(task, segments, true)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
try {
......
package com.metamx.druid.merger.common.actions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.MergerDBCoordinator;
import com.metamx.druid.merger.coordinator.TaskLockbox;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Set;
public class TaskActionToolbox
{
private final TaskQueue taskQueue;
......@@ -44,4 +53,43 @@ public class TaskActionToolbox
{
return emitter;
}
public boolean taskLockCoversSegments(
final Task task,
final Set<DataSegment> segments,
final boolean allowOlderVersions
)
{
// Verify that each of these segments falls under some lock
// NOTE: It is possible for our lock to be revoked (if the task has failed and given up its locks) after we check
// NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
// NOTE: insert some segments from the task but not others.
final List<TaskLock> taskLocks = getTaskLockbox().findLocksForTask(task);
for(final DataSegment segment : segments) {
final boolean ok = Iterables.any(
taskLocks, new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock taskLock)
{
final boolean versionOk = allowOlderVersions
? taskLock.getVersion().compareTo(segment.getVersion()) >= 0
: taskLock.getVersion().equals(segment.getVersion());
return versionOk
&& taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval());
}
}
);
if (!ok) {
return false;
}
}
return true;
}
}
......@@ -13,7 +13,7 @@ public abstract class TaskConfig
@Config("druid.merger.rowFlushBoundary")
@Default("500000")
public abstract long getRowFlushBoundary();
public abstract int getDefaultRowFlushBoundary();
public File getTaskDir(final Task task) {
return new File(getBaseTaskDir(), task.getId());
......
......@@ -69,20 +69,13 @@ public abstract class AbstractTask implements Task
return dataSource;
}
@JsonProperty("interval")
@Override
public Optional<Interval> getFixedInterval()
{
return interval;
}
// Awesome hack to get around lack of serde for Optional<T>
// TODO Look into jackson-datatype-guava
@JsonProperty("interval")
private Interval getNullableIntervalForJackson()
{
return interval.orNull();
}
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
......
......@@ -50,11 +50,16 @@ public class IndexDeterminePartitionsTask extends AbstractTask
{
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final Schema schema;
@JsonProperty
private final long targetPartitionSize;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
......@@ -63,7 +68,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema,
@JsonProperty("targetPartitionSize") long targetPartitionSize
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
......@@ -81,6 +87,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.targetPartitionSize = targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
......@@ -244,7 +251,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask
schema.getAggregators(),
schema.getIndexGranularity(),
shardSpec
)
),
rowFlushBoundary
);
}
}
......
......@@ -58,6 +58,9 @@ public class IndexGeneratorTask extends AbstractTask
@JsonProperty
private final Schema schema;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
@JsonCreator
......@@ -65,7 +68,8 @@ public class IndexGeneratorTask extends AbstractTask
@JsonProperty("groupId") String groupId,
@JsonProperty("interval") Interval interval,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("schema") Schema schema
@JsonProperty("schema") Schema schema,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
......@@ -83,6 +87,7 @@ public class IndexGeneratorTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.schema = schema;
this.rowFlushBoundary = rowFlushBoundary;
}
@Override
......@@ -139,6 +144,11 @@ public class IndexGeneratorTask extends AbstractTask
tmpDir
).findPlumber(schema, metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
try {
while(firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
......@@ -157,7 +167,7 @@ public class IndexGeneratorTask extends AbstractTask
int numRows = sink.add(inputRow);
metrics.incrementProcessed();
if(numRows >= toolbox.getConfig().getRowFlushBoundary()) {
if(numRows >= myRowFlushBoundary) {
plumber.persist(firehose.commit());
}
} else {
......
......@@ -42,11 +42,23 @@ import java.util.List;
public class IndexTask extends AbstractTask
{
@JsonProperty private final GranularitySpec granularitySpec;
@JsonProperty private final AggregatorFactory[] aggregators;
@JsonProperty private final QueryGranularity indexGranularity;
@JsonProperty private final long targetPartitionSize;
@JsonProperty private final FirehoseFactory firehoseFactory;
@JsonProperty
private final GranularitySpec granularitySpec;
@JsonProperty
private final AggregatorFactory[] aggregators;
@JsonProperty
private final QueryGranularity indexGranularity;
@JsonProperty
private final long targetPartitionSize;
@JsonProperty
private final FirehoseFactory firehoseFactory;
@JsonProperty
private final int rowFlushBoundary;
private static final Logger log = new Logger(IndexTask.class);
......@@ -57,7 +69,8 @@ public class IndexTask extends AbstractTask
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") long targetPartitionSize,
@JsonProperty("firehose") FirehoseFactory firehoseFactory
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
super(
......@@ -75,6 +88,7 @@ public class IndexTask extends AbstractTask
this.indexGranularity = indexGranularity;
this.targetPartitionSize = targetPartitionSize;
this.firehoseFactory = firehoseFactory;
this.rowFlushBoundary = rowFlushBoundary;
}
public List<Task> toSubtasks()
......@@ -95,7 +109,8 @@ public class IndexTask extends AbstractTask
indexGranularity,
new NoneShardSpec()
),
targetPartitionSize
targetPartitionSize,
rowFlushBoundary
)
);
} else {
......@@ -110,7 +125,8 @@ public class IndexTask extends AbstractTask
aggregators,
indexGranularity,
new NoneShardSpec()
)
),
rowFlushBoundary
)
);
}
......
......@@ -22,6 +22,7 @@ package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
......@@ -33,6 +34,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
......@@ -46,7 +48,6 @@ import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
......@@ -281,7 +282,11 @@ public abstract class MergeTask extends AbstractTask
)
);
return String.format("%s_%s", dataSource, DigestUtils.sha1Hex(segmentIDs).toLowerCase());
return String.format(
"%s_%s",
dataSource,
Hashing.sha1().hashString(segmentIDs, Charsets.UTF_8).toString().toLowerCase()
);
}
private static Interval computeMergedInterval(final List<DataSegment> segments)
......
......@@ -19,9 +19,10 @@
package com.metamx.druid.merger.common.task;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.metamx.druid.client.DataSegment;
import org.apache.commons.codec.digest.DigestUtils;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
......@@ -72,11 +73,12 @@ public class MergeTaskTest
@Test
public void testID()
{
final String desiredPrefix = "merge_foo_" + DigestUtils.sha1Hex(
final String desiredPrefix = "merge_foo_" + Hashing.sha1().hashString(
"2012-01-03T00:00:00.000Z_2012-01-05T00:00:00.000Z_V1_0"
+ "_2012-01-04T00:00:00.000Z_2012-01-06T00:00:00.000Z_V1_0"
+ "_2012-01-05T00:00:00.000Z_2012-01-07T00:00:00.000Z_V1_0"
) + "_";
, Charsets.UTF_8
).toString().toLowerCase() + "_";
Assert.assertEquals(
desiredPrefix,
testMergeTask.getId().substring(0, desiredPrefix.length())
......
......@@ -26,7 +26,8 @@ public class TaskSerdeTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
null
null,
-1
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
......@@ -52,7 +53,8 @@ public class TaskSerdeTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
new NoneShardSpec()
)
),
-1
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
......@@ -97,6 +99,7 @@ public class TaskSerdeTest
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
System.out.println(json);
final Task task2 = jsonMapper.readValue(json, Task.class);
Assert.assertEquals(task.getId(), task2.getId());
......
......@@ -291,7 +291,7 @@ public class RemoteTaskRunnerTest
}
@Override
public long getRowFlushBoundary()
public int getDefaultRowFlushBoundary()
{
return 0;
}
......
......@@ -106,7 +106,7 @@ public class TaskLifecycleTest
}
@Override
public long getRowFlushBoundary()
public int getDefaultRowFlushBoundary()
{
return 50000;
}
......@@ -182,7 +182,8 @@ public class TaskLifecycleTest
IR("2010-01-02T01", "a", "b", 2),
IR("2010-01-02T01", "a", "c", 1)
)
)
),
-1
);
final TaskStatus mergedStatus = runTask(indexTask);
......@@ -216,7 +217,8 @@ public class TaskLifecycleTest
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
newMockExceptionalFirehoseFactory()
newMockExceptionalFirehoseFactory(),
-1
);
final TaskStatus mergedStatus = runTask(indexTask);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册