提交 25550472 编写于 作者: F fjy

Merge pull request #615 from metamx/push-bootstrapped-sinks

push sinks after bootstrap. fix #570
......@@ -152,6 +152,8 @@ public class RealtimePlumber implements Plumber
bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
// Push pending sinks bootstrapped from previous run
mergeAndPush();
}
@Override
......@@ -595,35 +597,7 @@ public class RealtimePlumber implements Plumber
return ScheduledExecutors.Signal.STOP;
}
log.info("Starting merge and push.");
DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
);
long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
} else {
log.warn(
"[%s] < [%s] Skipping persist and merge.",
new DateTime(intervalStart),
minTimestampAsDate
);
}
}
log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue());
}
mergeAndPush();
if (stopped) {
log.info("Stopping merge-n-push overseer thread");
......@@ -636,6 +610,43 @@ public class RealtimePlumber implements Plumber
);
}
private void mergeAndPush()
{
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
final Period windowPeriod = config.getWindowPeriod();
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info("Starting merge and push.");
DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
);
long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
} else {
log.warn(
"[%s] < [%s] Skipping persist and merge.",
new DateTime(intervalStart),
minTimestampAsDate
);
}
}
log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());
for (final Map.Entry<Long, Sink> entry : sinksToPush) {
persistAndMerge(entry.getKey(), entry.getValue());
}
}
/**
* Unannounces a given sink and removes all local references to it.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册