From 5c96d500d80e1335e1f5378aff1bd9d8caaa36df Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 24 Jul 2013 15:29:40 -0700 Subject: [PATCH] More controllable realtime shutdown - Realtime plumber will start persisting segments shortly after finishJob is called, regardless of rejection policy - Add "none" rejection policy --- .../plumber/NoopRejectionPolicyFactory.java | 26 +++++++++++++++++++ .../plumber/RealtimePlumberSchool.java | 6 ++++- .../plumber/RejectionPolicyFactory.java | 3 ++- 3 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java new file mode 100644 index 0000000000..59a3e24cb2 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/NoopRejectionPolicyFactory.java @@ -0,0 +1,26 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.DateTime; +import org.joda.time.Period; + +public class NoopRejectionPolicyFactory implements RejectionPolicyFactory +{ + @Override + public RejectionPolicy create(Period windowPeriod) + { + return new RejectionPolicy() + { + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(0); + } + + @Override + public boolean accept(long timestamp) + { + return true; + } + }; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index a429fbef9d..894e8f1df2 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -105,6 +105,8 @@ public class RealtimePlumberSchool implements PlumberSchool private volatile SegmentPublisher segmentPublisher = null; private volatile ServerView serverView = null; + private volatile boolean noMoreData = false; + @JsonCreator public RealtimePlumberSchool( @JsonProperty("windowPeriod") Period windowPeriod, @@ -324,6 +326,8 @@ public class RealtimePlumberSchool implements PlumberSchool { log.info("Shutting down..."); + noMoreData = true; + while (!sinks.isEmpty()) { try { log.info( @@ -553,7 +557,7 @@ public class RealtimePlumberSchool implements PlumberSchool List> sinksToPush = Lists.newArrayList(); for (Map.Entry entry : sinks.entrySet()) { final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { + if (noMoreData || intervalStart < minTimestamp) { log.info("Adding entry[%s] for merge and push.", entry); sinksToPush.add(entry); } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java index 40e8e496bf..b47b5c11ea 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java @@ -7,7 +7,8 @@ import org.joda.time.Period; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), - @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class) + @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class), + @JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class) }) public interface RejectionPolicyFactory { -- GitLab