提交 b85c54d8 编写于 作者: E Eric Tschetter

Merge branch 'master' of github.com:metamx/druid

......@@ -81,10 +81,12 @@ public class YeOldePlumberSchool implements PlumberSchool
// There can be only one.
final Sink theSink = new Sink(interval, schema);
// Temporary directory to hold spilled segments.
final File persistDir = new File(
tmpSegmentDir, theSink.getSegment().withVersion(version).getIdentifier()
);
// Set of spilled segments. Will be merged at the end.
final Set<File> spilled = Sets.newHashSet();
return new Plumber()
......
......@@ -163,6 +163,17 @@ public class IndexGeneratorTask extends AbstractTask
plumber.persist(firehose.commit());
plumber.finishJob();
// Output metrics
log.info(
"Task[%s] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away) and output %,d rows",
getId(),
metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
metrics.processed(),
metrics.unparseable(),
metrics.thrownAway(),
metrics.rowOutput()
);
// Done
return TaskStatus.success(getId(), ImmutableList.copyOf(pushedSegments));
}
......
......@@ -108,16 +108,16 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
segmentsToMerge.add(timelineObjects.get(i));
if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit()
|| segmentsToMerge.size() >= params.getMergeSegmentsLimit())
if (segmentsToMerge.getByteCount() > params.getMergeBytesLimit()
|| segmentsToMerge.getSegmentCount() >= params.getMergeSegmentsLimit())
{
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
if (segmentsToMerge.size() == 0) {
if (segmentsToMerge.getSegmentCount() == 0) {
// Backtracked all the way to zero. Increment by one so we continue to make progress.
i++;
}
......@@ -128,7 +128,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
if (segmentsToMerge.getSegmentCount() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
}
......@@ -182,13 +182,13 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
// (timeline object, union interval of underlying segments up to this point in the list)
private final List<Pair<TimelineObjectHolder<String, DataSegment>, Interval>> timelineObjects;
private long mergedSize;
private long byteCount;
private SegmentsToMerge()
{
this.timelineObjects = Lists.newArrayList();
this.segments = HashMultiset.create();
this.mergedSize = 0;
this.byteCount = 0;
}
public List<DataSegment> getSegments()
......@@ -233,7 +233,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
for(final PartitionChunk<DataSegment> segment : timelineObject.getObject()) {
segments.add(segment.getObject());
if(segments.count(segment.getObject()) == 1) {
mergedSize += segment.getObject().getSize();
byteCount += segment.getObject().getSize();
}
}
......@@ -275,12 +275,12 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
}
}
public long getMergedSize()
public long getByteCount()
{
return mergedSize;
return byteCount;
}
public int size()
public int getSegmentCount()
{
return timelineObjects.size();
}
......@@ -303,14 +303,14 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
Preconditions.checkArgument(maxSize >= 0, "maxSize >= 0");
int removed = 0;
while (!isComplete() || mergedSize > maxSize) {
while (!isComplete() || byteCount > maxSize) {
removed ++;
final TimelineObjectHolder<String, DataSegment> removedHolder = timelineObjects.remove(timelineObjects.size() - 1).lhs;
for(final PartitionChunk<DataSegment> segment : removedHolder.getObject()) {
segments.remove(segment.getObject());
if(segments.count(segment.getObject()) == 0) {
mergedSize -= segment.getObject().getSize();
byteCount -= segment.getObject().getSize();
}
}
}
......
......@@ -101,6 +101,22 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testMergeNoncontiguous()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(10).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(10).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(10).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
), merge(segments)
);
}
@Test
public void testMergeSeriesByteLimited()
{
......@@ -176,6 +192,40 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testOverlappingMergeWithGapsAlignedStart()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(1), segments.get(0), segments.get(2))
), merge(segments)
);
}
@Test
public void testOverlappingMergeWithGapsNonalignedStart()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(segments.get(0), segments.get(1), segments.get(2))
), merge(segments)
);
}
@Test
public void testOverlappingMerge1()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册