提交 56f1d924 编写于 作者: G Gian Merlino

Merge pull request #681 from metamx/fix-rt

fix some rt cleanup and failure edge cases
......@@ -312,6 +312,22 @@ public class RealtimePlumber implements Plumber
{
final Interval interval = sink.getInterval();
// use a file to indicate that pushing has completed
final File persistDir = computePersistDir(schema, interval);
final File mergedTarget = new File(persistDir, "merged");
final File isPushedMarker = new File(persistDir, "isPushedMarker");
if (!isPushedMarker.exists()) {
removeSegment(sink, mergedTarget);
if (mergedTarget.exists()) {
log.wtf("Merged target[%s] exists?!", mergedTarget);
return;
}
} else {
log.info("Already pushed sink[%s]", sink);
return;
}
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
......@@ -322,12 +338,6 @@ public class RealtimePlumber implements Plumber
}
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) {
log.info("Skipping already-merged sink: %s", sink);
return;
}
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
......@@ -351,6 +361,14 @@ public class RealtimePlumber implements Plumber
);
segmentPublisher.publishSegment(segment);
if (!isPushedMarker.createNewFile()) {
log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.addData("partitionNum", segment.getShardSpec().getPartitionNum())
.addData("marker", isPushedMarker)
.emit();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
......@@ -360,9 +378,6 @@ public class RealtimePlumber implements Plumber
// We're trying to shut down, and this segment failed to push. Let's just get rid of it.
// This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
abandonSegment(truncatedTime, sink);
} else {
// Delete any possibly-partially-written files, so we can try again on the next push cycle.
removeMergedSegment(sink);
}
}
}
......@@ -649,7 +664,7 @@ public class RealtimePlumber implements Plumber
{
try {
segmentAnnouncer.unannounceSegment(sink.getSegment());
removeMergedSegment(sink);
removeSegment(sink, computePersistDir(schema, sink.getInterval()));
log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
sinks.remove(truncatedTime);
sinkTimeline.remove(
......@@ -785,29 +800,29 @@ public class RealtimePlumber implements Plumber
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
}
);
}
private void removeMergedSegment(final Sink sink)
private void removeSegment(final Sink sink, final File target)
{
final File mergedTarget = new File(computePersistDir(schema, sink.getInterval()), "merged");
if (mergedTarget.exists()) {
if (target.exists()) {
try {
log.info("Deleting Index File[%s]", mergedTarget);
FileUtils.deleteDirectory(mergedTarget);
log.info("Deleting Index File[%s]", target);
FileUtils.deleteDirectory(target);
}
catch (Exception e) {
log.makeAlert(e, "Unable to remove merged segment for dataSource[%s]", schema.getDataSource())
log.makeAlert(e, "Unable to remove file for dataSource[%s]", schema.getDataSource())
.addData("file", target)
.addData("interval", sink.getInterval())
.emit();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册