提交 30d98f56 编写于 作者: G Gian Merlino

RealtimeIndexTask: Allow configurable rejection policies

上级 5c96d500
......@@ -52,8 +52,10 @@ import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.RejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.realtime.plumber.VersioningPolicy;
import com.metamx.emitter.EmittingLogger;
......@@ -91,6 +93,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private final IndexGranularity segmentGranularity;
@JsonIgnore
private final RejectionPolicyFactory rejectionPolicyFactory;
@JsonIgnore
private volatile Plumber plumber = null;
......@@ -105,7 +110,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
......@@ -128,6 +134,7 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
}
@Override
......@@ -269,6 +276,10 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());
if (this.rejectionPolicyFactory != null) {
realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
}
final FireDepartment fireDepartment = new FireDepartment(schema, fireDepartmentConfig, null, null);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
......@@ -375,6 +386,12 @@ public class RealtimeIndexTask extends AbstractTask
return segmentGranularity;
}
@JsonProperty("rejectionPolicy")
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;
......
......@@ -207,7 +207,8 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
IndexGranularity.HOUR
IndexGranularity.HOUR,
null
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
......
......@@ -52,6 +52,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
null,
null,
null,
null
);
this.status = status;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册