diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..59a3e24cb21539d8271c3061387dd770211a2ba1 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java @@ -0,0 +1,26 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.DateTime; +import org.joda.time.Period; + +public class NoopRejectionPolicyFactory implements RejectionPolicyFactory +{ + @Override + public RejectionPolicy create(Period windowPeriod) + { + return new RejectionPolicy() + { + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(0); + } + + @Override + public boolean accept(long timestamp) + { + return true; + } + }; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index a429fbef9d589aff71b1b87e068b81bb20d5f640..894e8f1df284c9ee6e05bada12d19119b2a4210e 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -105,6 +105,8 @@ public class RealtimePlumberSchool implements PlumberSchool private volatile SegmentPublisher segmentPublisher = null; private volatile ServerView serverView = null; + private volatile boolean noMoreData = false; + @JsonCreator public RealtimePlumberSchool( @JsonProperty("windowPeriod") Period windowPeriod, @@ -324,6 +326,8 @@ public class RealtimePlumberSchool implements PlumberSchool { log.info("Shutting down..."); + noMoreData = true; + while (!sinks.isEmpty()) { try { log.info( @@ -553,7 +557,7 @@ public class RealtimePlumberSchool implements PlumberSchool List> sinksToPush = Lists.newArrayList(); for (Map.Entry entry : sinks.entrySet()) { final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { + if (noMoreData || intervalStart < minTimestamp) { log.info("Adding entry[%s] for merge and push.", entry); sinksToPush.add(entry); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java index 40e8e496bf6d3bed113dc25ed3fe11d9923ec8fb..b47b5c11eac792565a933fb3e6f7de26bbb41b50 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java @@ -7,7 +7,8 @@ import org.joda.time.Period; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), - @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class) + @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class), + @JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class) }) public interface RejectionPolicyFactory {