Commit 0ddf667f authored by xvrl

Merge pull request #705 from metamx/fix-timeseries-zero-filling

Fix extraneous timeseries zero filling at end of data interval
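In short: the storage adapters derive the end of the data interval from the segment's max timestamp, and calling `gran.next()` on an unaligned max timestamp pushes that end into a bucket that contains no rows, which the timeseries query then zero-fills. Truncating to the bucket start before advancing keeps the interval from spilling into that empty bucket. Below is a minimal, self-contained sketch of the difference (not Druid code; the class name, the hard-coded hour constant, and the `truncate`/`next` helpers are illustrative stand-ins for `QueryGranularity.HOUR`):

```java
import java.time.Instant;

// Illustrative stand-in for the adapters' end-of-interval computation;
// truncate(t) and next(t) mimic QueryGranularity.HOUR.truncate(t) / next(t).
public class ZeroFillSketch
{
  private static final long HOUR_MILLIS = 3_600_000L;

  // Round down to the start of the hour bucket.
  static long truncate(long t) { return t - Math.floorMod(t, HOUR_MILLIS); }

  // Advance by one hour.
  static long next(long t) { return t + HOUR_MILLIS; }

  public static void main(String[] args)
  {
    // Suppose the last row in the segment is at 10:30 and the query granularity is one hour.
    long maxTime = Instant.parse("2011-04-15T10:30:00Z").toEpochMilli();

    long oldEnd = next(maxTime);            // 11:30 -> the empty 11:00 bucket still overlaps the interval
    long newEnd = next(truncate(maxTime));  // 11:00 -> the interval ends right after the 10:00 bucket

    System.out.println("old end: " + Instant.ofEpochMilli(oldEnd));  // 2011-04-15T11:30:00Z
    System.out.println("new end: " + Instant.ofEpochMilli(newEnd));  // 2011-04-15T11:00:00Z
  }
}
```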
@@ -138,7 +138,10 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
   {
     Interval actualInterval = interval;
-    final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
+    final Interval dataInterval = new Interval(
+        getMinTime().getMillis(),
+        gran.next(gran.truncate(getMaxTime().getMillis()))
+    );
 
     if (!actualInterval.overlaps(dataInterval)) {
       return Sequences.empty();
...
@@ -133,8 +133,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
     Interval actualIntervalTmp = interval;
 
-    final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
+    final Interval dataInterval = new Interval(
+        getMinTime().getMillis(),
+        gran.next(gran.truncate(getMaxTime().getMillis()))
+    );
     if (!actualIntervalTmp.overlaps(dataInterval)) {
       return Sequences.empty();
     }
...
@@ -21,7 +21,9 @@ package io.druid.query.timeseries;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
 import com.metamx.common.guava.Sequences;
 import io.druid.granularity.PeriodGranularity;
 import io.druid.granularity.QueryGranularity;
@@ -465,6 +467,121 @@ public class TimeseriesQueryRunnerTest
     TestHelper.assertExpectedResults(expectedResults1, results1);
   }
 
+  @Test
+  public void testTimeseriesQueryZeroFilling()
+  {
+    TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
+                                   .dataSource(QueryRunnerTestHelper.dataSource)
+                                   .filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
+                                   .granularity(QueryGranularity.HOUR)
+                                   .intervals(
+                                       Arrays.asList(
+                                           new Interval(
+                                               "2011-04-14T00:00:00.000Z/2011-05-01T00:00:00.000Z"
+                                           )
+                                       )
+                                   )
+                                   .aggregators(
+                                       Arrays.<AggregatorFactory>asList(
+                                           QueryRunnerTestHelper.rowsCount,
+                                           new LongSumAggregatorFactory(
+                                               "idx",
+                                               "index"
+                                           )
+                                       )
+                                   )
+                                   .build();
+
+    List<Result<TimeseriesResultValue>> lotsOfZeroes = Lists.newArrayList();
+    for (final Long millis : QueryGranularity.HOUR.iterable(
+        new DateTime("2011-04-14T01").getMillis(),
+        new DateTime("2011-04-15").getMillis()
+    )) {
+      lotsOfZeroes.add(
+          new Result<>(
+              new DateTime(millis),
+              new TimeseriesResultValue(
+                  ImmutableMap.<String, Object>of("rows", 0L, "idx", 0L)
+              )
+          )
+      );
+    }
+
+    List<Result<TimeseriesResultValue>> expectedResults1 = Lists.newArrayList(
+        Iterables.concat(
+            Arrays.asList(
+                new Result<>(
+                    new DateTime("2011-04-14T00"),
+                    new TimeseriesResultValue(
+                        ImmutableMap.<String, Object>of("rows", 13L, "idx", 4907L)
+                    )
+                )
+            ),
+            lotsOfZeroes,
+            Arrays.asList(
+                new Result<>(
+                    new DateTime("2011-04-15T00"),
+                    new TimeseriesResultValue(
+                        ImmutableMap.<String, Object>of("rows", 13L, "idx", 4717L)
+                    )
+                )
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
+        runner.run(query1),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    TestHelper.assertExpectedResults(expectedResults1, results1);
+  }
+
+  @Test
+  public void testTimeseriesQueryGranularityNotAlignedWithRollupGranularity()
+  {
+    TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
+                                   .dataSource(QueryRunnerTestHelper.dataSource)
+                                   .filters(QueryRunnerTestHelper.providerDimension, "spot", "upfront", "total_market")
+                                   .granularity(
+                                       new PeriodGranularity(
+                                           new Period("PT1H"),
+                                           new DateTime(60000),
+                                           DateTimeZone.UTC
+                                       )
+                                   )
+                                   .intervals(
+                                       Arrays.asList(
+                                           new Interval(
+                                               "2011-04-15T00:00:00.000Z/2012"
+                                           )
+                                       )
+                                   )
+                                   .aggregators(
+                                       Arrays.<AggregatorFactory>asList(
+                                           QueryRunnerTestHelper.rowsCount,
+                                           new LongSumAggregatorFactory(
+                                               "idx",
+                                               "index"
+                                           )
+                                       )
+                                   )
+                                   .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults1 = Arrays.asList(
+        new Result<TimeseriesResultValue>(
+            new DateTime("2011-04-14T23:01Z"),
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of("rows", 13L, "idx", 4717L)
+            )
+        )
+    );
+
+    Iterable<Result<TimeseriesResultValue>> results1 = Sequences.toList(
+        runner.run(query1),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    TestHelper.assertExpectedResults(expectedResults1, results1);
+  }
+
   @Test
   public void testTimeseriesWithVaryingGranWithFilter()
   {
...
@@ -35,7 +35,6 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
...
@@ -69,7 +68,7 @@ public class TestIndex
   };
   public static final String[] DIMENSIONS = new String[]{"provider", "quALIty", "plAcEmEnT", "pLacementish"};
   public static final String[] METRICS = new String[]{"iNdEx"};
-  private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-04-16T00:00:00.000Z");
+  private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
       new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
       new HyperUniquesAggregatorFactory("quality_uniques", "quality")
...