diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 306350aaaa32439cd338b29b554e1c650d05eb97..4f3b3c175870312e4027521730e9bb27b14e0979 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -152,6 +152,8 @@ public class RealtimePlumber implements Plumber bootstrapSinksFromDisk(); registerServerViewCallback(); startPersistThread(); + // Push pending sinks bootstrapped from previous run + mergeAndPush(); } @Override @@ -595,35 +597,7 @@ public class RealtimePlumber implements Plumber return ScheduledExecutors.Signal.STOP; } - log.info("Starting merge and push."); - - DateTime minTimestampAsDate = segmentGranularity.truncate( - rejectionPolicy.getCurrMaxTime().minus(windowMillis) - ); - long minTimestamp = minTimestampAsDate.getMillis(); - - log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate); - - List> sinksToPush = Lists.newArrayList(); - for (Map.Entry entry : sinks.entrySet()) { - final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { - log.info("Adding entry[%s] for merge and push.", entry); - sinksToPush.add(entry); - } else { - log.warn( - "[%s] < [%s] Skipping persist and merge.", - new DateTime(intervalStart), - minTimestampAsDate - ); - } - } - - log.info("Found [%,d] sinks to persist and merge", sinksToPush.size()); - - for (final Map.Entry entry : sinksToPush) { - persistAndMerge(entry.getKey(), entry.getValue()); - } + mergeAndPush(); if (stopped) { log.info("Stopping merge-n-push overseer thread"); @@ -636,6 +610,43 @@ public class RealtimePlumber implements Plumber ); } + private void mergeAndPush() + { + final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); + final Period windowPeriod = config.getWindowPeriod(); + + final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + log.info("Starting merge and push."); + + DateTime minTimestampAsDate = segmentGranularity.truncate( + rejectionPolicy.getCurrMaxTime().minus(windowMillis) + ); + long minTimestamp = minTimestampAsDate.getMillis(); + + log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate); + + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : sinks.entrySet()) { + final Long intervalStart = entry.getKey(); + if (intervalStart < minTimestamp) { + log.info("Adding entry[%s] for merge and push.", entry); + sinksToPush.add(entry); + } else { + log.warn( + "[%s] < [%s] Skipping persist and merge.", + new DateTime(intervalStart), + minTimestampAsDate + ); + } + } + + log.info("Found [%,d] sinks to persist and merge", sinksToPush.size()); + + for (final Map.Entry entry : sinksToPush) { + persistAndMerge(entry.getKey(), entry.getValue()); + } + } + /** * Unannounces a given sink and removes all local references to it. */