提交 578ca64e 编写于 作者: G Gian Merlino

Merge pull request #711 from metamx/fix-maxtime-caching

Fix maxtime caching
......@@ -22,7 +22,6 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
......@@ -80,7 +79,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(final Query<T> query)
{
final long offset = computeOffset(now);
......@@ -103,12 +102,19 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
Object value = res.getValue();
if (value instanceof TimeBoundaryResultValue) {
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
value = new TimeBoundaryResultValue(
ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME, boundary.getMinTime().minus(offset),
TimeBoundaryQuery.MAX_TIME, new DateTime(Math.min(boundary.getMaxTime().getMillis() - offset, now))
)
);
DateTime minTime = null;
try{
minTime = boundary.getMinTime();
} catch(IllegalArgumentException e) {}
final DateTime maxTime = boundary.getMaxTime();
return (T) ((TimeBoundaryQuery) query).buildResult(
new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)),
minTime != null ? minTime.minus(offset) : null,
maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null
).iterator().next();
}
return (T) new Result(res.getTimestamp().minus(offset), value);
} else if (input instanceof MapBasedRow) {
......
......@@ -145,9 +145,10 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
return ByteBuffer.allocate(2)
final byte[] cacheKey = query.getCacheKey();
return ByteBuffer.allocate(1 + cacheKey.length)
.put(TIMEBOUNDARY_QUERY)
.put(query.getCacheKey())
.put(cacheKey)
.array();
}
......
......@@ -26,6 +26,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import io.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
......@@ -122,5 +123,46 @@ public class TimewarpOperatorTest
Sequences.toList(queryRunner.run(query), Lists.<Result<TimeseriesResultValue>>newArrayList())
);
TimewarpOperator<Result<TimeBoundaryResultValue>> timeBoundaryOperator = new TimewarpOperator<>(
new Interval(new DateTime("2014-01-01"), new DateTime("2014-01-15")),
new Period("P1W"),
new DateTime("2014-01-06") // align on Monday
);
QueryRunner<Result<TimeBoundaryResultValue>> timeBoundaryRunner = timeBoundaryOperator.postProcess(
new QueryRunner<Result<TimeBoundaryResultValue>>()
{
@Override
public Sequence<Result<TimeBoundaryResultValue>> run(Query<Result<TimeBoundaryResultValue>> query)
{
return Sequences.simple(
ImmutableList.of(
new Result<>(
new DateTime("2014-01-12"),
new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-01-12")))
)
)
);
}
},
new DateTime("2014-08-02").getMillis()
);
final Query<Result<TimeBoundaryResultValue>> timeBoundaryQuery =
Druids.newTimeBoundaryQueryBuilder()
.dataSource("dummy")
.build();
Assert.assertEquals(
Lists.newArrayList(
new Result<>(
new DateTime("2014-08-02"),
new TimeBoundaryResultValue(ImmutableMap.<String, Object>of("maxTime", new DateTime("2014-08-02")))
)
),
Sequences.toList(timeBoundaryRunner.run(timeBoundaryQuery), Lists.<Result<TimeBoundaryResultValue>>newArrayList())
);
}
}
......@@ -929,6 +929,48 @@ public class CachingClusteredClientTest
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), new DateTime("2011-01-10"))
);
testQueryCaching(
client,
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MAX_TIME)
.build(),
new Interval("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(new DateTime("2011-01-01"), null, new DateTime("2011-01-02")),
new Interval("2011-01-01/2011-01-03"),
makeTimeBoundaryResult(new DateTime("2011-01-02"), null, new DateTime("2011-01-03")),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05"), null, new DateTime("2011-01-10")),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), null, new DateTime("2011-01-10"))
);
testQueryCaching(
client,
Druids.newTimeBoundaryQueryBuilder()
.dataSource(CachingClusteredClientTest.DATA_SOURCE)
.intervals(CachingClusteredClientTest.SEG_SPEC)
.context(CachingClusteredClientTest.CONTEXT)
.bound(TimeBoundaryQuery.MIN_TIME)
.build(),
new Interval("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(new DateTime("2011-01-01"), new DateTime("2011-01-01"), null),
new Interval("2011-01-01/2011-01-03"),
makeTimeBoundaryResult(new DateTime("2011-01-02"), new DateTime("2011-01-02"), null),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05"), new DateTime("2011-01-05"), null),
new Interval("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(new DateTime("2011-01-05T01"), new DateTime("2011-01-05T01"), null)
);
}
private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
......@@ -937,17 +979,30 @@ public class CachingClusteredClientTest
DateTime maxTime
)
{
final Object value;
if (minTime != null && maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString(),
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
);
} else if (maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
);
} else {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString()
);
}
return Arrays.asList(
new Result<>(
timestamp,
new TimeBoundaryResultValue(
ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString(),
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
)
)
new TimeBoundaryResultValue(value)
)
);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册