提交 7168adcc 编写于 作者: N nishantmonu51

Use Ingest task instead of Noop Task

上级 a1a5e425
...@@ -42,10 +42,12 @@ import io.druid.data.input.InputRow; ...@@ -42,10 +42,12 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.select.EventHolder; import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor; import io.druid.segment.Cursor;
...@@ -137,15 +139,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar ...@@ -137,15 +139,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException public Firehose connect(InputRowParser inputRowParser) throws IOException, ParseException
{ {
log.info("Connecting firehose: DruidFirehose[%s,%s]", dataSource, interval); log.info("Connecting firehose: DruidFirehose[%s,%s]", dataSource, interval);
// TODO: have a way to pass the toolbox to Firehose, The instance is initialized Lazily on connect method. // better way to achieve this is to pass toolbox to Firehose, The instance is initialized Lazily on connect method.
final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build( final TaskToolbox toolbox = injector.getInstance(TaskToolboxFactory.class).build(
new NoopTask( new IngestTask("Ingest-Task-Id", dataSource)
"druid-firehose",
0,
0,
null,
null
)
); );
try { try {
...@@ -226,6 +222,33 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar ...@@ -226,6 +222,33 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
return null; return null;
} }
private static class IngestTask extends AbstractTask
{
protected IngestTask(String id, String dataSource)
{
super(id, dataSource);
}
@Override
public String getType()
{
return "Ingest-Task";
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return TaskStatus.success(getId());
}
}
public class DruidFirehose implements Firehose public class DruidFirehose implements Firehose
{ {
private volatile Yielder<InputRow> rowYielder; private volatile Yielder<InputRow> rowYielder;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册