diff --git a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java index 55f94147b676c22ed932aba4509fc6a08a00baeb..988ee329d001a52661afed08927faff9e459f856 100644 --- a/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java +++ b/indexing-service/src/main/java/io/druid/guice/IndexingServiceFirehoseModule.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import io.druid.indexing.firehose.DruidFirehoseFactory; +import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; @@ -39,7 +39,7 @@ public class IndexingServiceFirehoseModule implements DruidModule new SimpleModule("IndexingServiceFirehoseModule") .registerSubtypes( new NamedType(EventReceiverFirehoseFactory.class, "receiver"), - new NamedType(DruidFirehoseFactory.class, "druid") + new NamedType(IngestSegmentFirehoseFactory.class, "ingestSegment") ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java similarity index 90% rename from indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java rename to indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 215ddd937e473477c5c4da72b575d88bca153808..f8f603b6c23aea41100e287a143783dc6dfe25bf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/DruidFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -61,6 +61,7 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; +import io.druid.utils.Runnables; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -72,9 +73,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class DruidFirehoseFactory implements FirehoseFactory +public class IngestSegmentFirehoseFactory implements FirehoseFactory { - private static final EmittingLogger log = new EmittingLogger(DruidFirehoseFactory.class); + private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class); private final String dataSource; private final Interval interval; private final DimFilter dimFilter; @@ -83,7 +84,7 @@ public class DruidFirehoseFactory implements FirehoseFactory private final Injector injector; @JsonCreator - public DruidFirehoseFactory( + public IngestSegmentFirehoseFactory( @JsonProperty("dataSource") final String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("filter") DimFilter dimFilter, @@ -155,6 +156,9 @@ public class DruidFirehoseFactory implements FirehoseFactory VersionedIntervalTimeline timeline = new VersionedIntervalTimeline( Ordering.natural().nullsFirst() ); + final List> timeLineSegments = timeline.lookup( + interval + ); for (DataSegment segment : usedSegments) { timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); } @@ -163,8 +167,8 @@ public class DruidFirehoseFactory implements FirehoseFactory dims = dimensions; } else { Set dimSet = new HashSet<>(); - for (DataSegment segment : usedSegments) { - dimSet.addAll(segment.getDimensions()); + for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { + dimSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions()); } dims = Lists.newArrayList(dimSet); } @@ -174,15 +178,15 @@ public class DruidFirehoseFactory implements FirehoseFactory metricsList = metrics; } else { Set metricsSet = new HashSet<>(); - for (DataSegment segment : usedSegments) { - metricsSet.addAll(segment.getMetrics()); + for (TimelineObjectHolder timelineObjectHolder : timeLineSegments) { + metricsSet.addAll(timelineObjectHolder.getObject().getChunk(0).getObject().getDimensions()); } metricsList = Lists.newArrayList(metricsSet); } final List adapters = Lists.transform( - timeline.lookup(new Interval("1000-01-01/3000-01-01")), + timeLineSegments, new Function, StorageAdapter>() { @Override @@ -234,11 +238,11 @@ public class DruidFirehoseFactory implements FirehoseFactory { @Nullable @Override - public Sequence apply(@Nullable StorageAdapter input) + public Sequence apply(@Nullable StorageAdapter adapter) { return Sequences.concat( Sequences.map( - input.makeCursors( + adapter.makeCursors( Filters.convertDimensionFilters(dimFilter), interval, QueryGranularity.ALL @@ -246,24 +250,24 @@ public class DruidFirehoseFactory implements FirehoseFactory { @Nullable @Override - public Sequence apply(@Nullable Cursor input) + public Sequence apply(@Nullable Cursor cursor) { - TimestampColumnSelector timestampColumnSelector = input.makeTimestampColumnSelector(); + TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector(); Map dimSelectors = Maps.newHashMap(); for (String dim : dims) { - final DimensionSelector dimSelector = input.makeDimensionSelector(dim); + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); dimSelectors.put(dim, dimSelector); } Map metSelectors = Maps.newHashMap(); for (String metric : metrics) { - final ObjectColumnSelector metricSelector = input.makeObjectColumnSelector(metric); + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); metSelectors.put(metric, metricSelector); } List rowList = Lists.newArrayList(); - while (!input.isDone()) { + while (!cursor.isDone()) { final Map theEvent = Maps.newLinkedHashMap(); final long timestamp = timestampColumnSelector.getTimestamp(); theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); @@ -291,7 +295,7 @@ public class DruidFirehoseFactory implements FirehoseFactory theEvent.put(metric, selector.get()); } rowList.add(new MapBasedInputRow(timestamp, dims, theEvent)); - input.advance(); + cursor.advance(); } return Sequences.simple(rowList); } @@ -333,14 +337,7 @@ public class DruidFirehoseFactory implements FirehoseFactory @Override public Runnable commit() { - return new Runnable() - { - @Override - public void run() - { - // Nothing to do. - } - }; + return Runnables.getNoopRunnable(); } @Override