提交 20acd6f4 编写于 作者: G Gian Merlino

RealtimePlumberSchool: Persist and merge immediately upon finishJob

上级 5d44f0f1
......@@ -105,8 +105,6 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null;
private volatile boolean noMoreData = false;
@JsonCreator
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
......@@ -321,12 +319,89 @@ public class RealtimePlumberSchool implements PlumberSchool
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
}
@Override
public void finishJob()
{
log.info("Shutting down...");
noMoreData = true;
for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
persistAndMerge(entry.getKey(), entry.getValue());
}
while (!sinks.isEmpty()) {
try {
......@@ -557,79 +632,14 @@ public class RealtimePlumberSchool implements PlumberSchool
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey();
if (noMoreData || intervalStart < minTimestamp) {
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
}
}
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
final Sink sink = entry.getValue();
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey())
);
persistExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
public void doRun()
{
final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount);
}
}
File mergedFile = null;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
new File(computePersistDir(schema, interval), "merged")
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
}
if (mergedFile != null) {
try {
if (mergedFile != null) {
log.info("Deleting Index File[%s]", mergedFile);
FileUtils.deleteDirectory(mergedFile);
}
}
catch (IOException e) {
log.warn(e, "Error deleting directory[%s]", mergedFile);
}
}
}
}
);
persistAndMerge(entry.getKey(), entry.getValue());
}
if (stopped) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册