提交 253125cb 编写于 作者: E Eric Tschetter

1) Adjust RealtimePlumberSchool to take a configurable rejection policy that...

1) Adjust RealtimePlumberSchool to take a configurable rejection policy that lets you switch between server time and message time for rejections.
上级 0da9075f
......@@ -54,6 +54,8 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import org.apache.commons.io.FileUtils;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonTypeInfo;
import org.codehaus.jackson.map.annotate.JacksonInject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
......@@ -81,6 +83,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final RejectionPolicyFactory rejectionPolicyFactory;
private volatile Executor persistExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
......@@ -95,12 +98,14 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
......@@ -335,12 +340,14 @@ public class RealtimePlumberSchool implements PlumberSchool
}
);
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
return new Plumber()
{
@Override
public Sink getSink(long timestamp)
{
if (timestamp < System.currentTimeMillis() - windowMillis) { // reject if too old
if (!rejectionPolicy.accept(timestamp)) {
return null;
}
......@@ -527,4 +534,59 @@ public class RealtimePlumberSchool implements PlumberSchool
);
}
}
public interface RejectionPolicy
{
public boolean accept(long timestamp);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "server_time", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "message_time", value = MessageTimeRejectionPolicyFactory.class)
})
public interface RejectionPolicyFactory
{
public RejectionPolicy create(Period windowPeriod);
}
public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
@Override
public boolean accept(long timestamp)
{
return timestamp >= (System.currentTimeMillis() - windowMillis);
}
};
}
}
public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
long maxTimestamp = Long.MIN_VALUE;
@Override
public boolean accept(long timestamp)
{
maxTimestamp = Math.max(maxTimestamp, timestamp);
return timestamp >= (maxTimestamp - windowMillis);
}
};
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册