Commit d17abeda authored by xvrl

Merge pull request #499 from metamx/fix-pull-from-cache

Fix pull from cache
@@ -265,8 +265,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
       final Map<String, Object> values = Maps.newHashMap();
       final TimeseriesResultValue holder = result.getValue();
       if (calculatePostAggs) {
+        // put non finalized aggregators for calculating dependent post Aggregators
+        for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+          values.put(agg.getName(), holder.getMetric(agg.getName()));
+        }
         for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
-          values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject()));
+          values.put(postAgg.getName(), postAgg.compute(values));
         }
       }
       for (AggregatorFactory agg : query.getAggregatorSpecs()) {
...
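Note: the hunk above is the core of the fix for timeseries results pulled from cache. `values` is first seeded with the raw, non-finalized aggregator values, and each post aggregator is then computed against that same map (`postAgg.compute(values)`) instead of against `holder.getBaseObject()`, so a post aggregator that reads another post aggregator's output can find it. A minimal, self-contained sketch of this pattern (an editor's illustration, not part of the commit; the `PostAgg` interface and field names are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class DependentPostAggSketch {
      // Stand-in for Druid's PostAggregator: derives a value from known fields.
      interface PostAgg {
        String name();
        Object compute(Map<String, Object> combined);
      }

      public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();

        // 1) Seed the map with raw aggregator values first (the fix above).
        values.put("rows", 2L);
        values.put("index", 4L);

        // A post agg that reads raw aggregators...
        PostAgg sum = new PostAgg() {
          public String name() { return "rows_plus_index"; }
          public Object compute(Map<String, Object> m) {
            return ((Number) m.get("rows")).longValue() + ((Number) m.get("index")).longValue();
          }
        };
        // ...and one that depends on the first post agg's output.
        PostAgg plusOne = new PostAgg() {
          public String name() { return "addrowsindexconstant"; }
          public Object compute(Map<String, Object> m) {
            return ((Number) m.get("rows_plus_index")).longValue() + 1L;
          }
        };

        // 2) Compute post aggs in declaration order against the same map, so each
        //    sees both the raw aggregators and every earlier post agg.
        for (PostAgg p : new PostAgg[]{sum, plusOne}) {
          values.put(p.name(), p.compute(values));
        }
        System.out.println(values); // rows_plus_index=6, addrowsindexconstant=7
      }
    }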
@@ -90,19 +90,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
     TopNResultValue arg2Vals = arg2.getValue();

     for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
-      final String dimensionValue = arg1Val.getStringDimensionValue(dimension);
-      Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
-
-      retVal.put(dimension, dimensionValue);
-      for (AggregatorFactory factory : aggregations) {
-        final String metricName = factory.getName();
-        retVal.put(metricName, arg1Val.getMetric(metricName));
-      }
-      for (PostAggregator postAgg : postAggregations) {
-        retVal.put(postAgg.getName(), postAgg.compute(retVal));
-      }
-
-      retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
+      retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
     }
     for (DimensionAndMetricValueExtractor arg2Val : arg2Vals) {
       final String dimensionValue = arg2Val.getStringDimensionValue(dimension);
@@ -124,18 +112,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
         retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
       } else {
-        Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
-
-        retVal.put(dimension, dimensionValue);
-        for (AggregatorFactory factory : aggregations) {
-          final String metricName = factory.getName();
-          retVal.put(metricName, arg2Val.getMetric(metricName));
-        }
-        for (PostAggregator postAgg : postAggregations) {
-          retVal.put(postAgg.getName(), postAgg.compute(retVal));
-        }
-
-        retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
+        retVals.put(dimensionValue, arg2Val);
       }
     }
...
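Note: the two TopNBinaryFn hunks above delete the per-row post aggregator computation from the pairwise merge; merged rows now carry only the dimension and raw aggregator values, and post aggregators are applied once after the merge completes (see the TopNQueryQueryToolChest hunks below). Recomputing them inside every binary merge was redundant work on intermediate values. A rough, self-contained sketch of the merge-then-finalize split (illustrative helpers, not the commit's code; the `addrowsindexconstant` formula is assumed to be rows + index + 1):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MergeThenFinalizeSketch {
      // Merge only combines raw aggregators (here: summing numeric fields).
      static Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
        Map<String, Object> out = new LinkedHashMap<>(a);
        b.forEach((k, v) -> out.merge(k, v, (x, y) -> ((Number) x).longValue() + ((Number) y).longValue()));
        return out;
      }

      // Post aggs run exactly once, on the fully merged row.
      static Map<String, Object> finalizeRow(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.put("addrowsindexconstant",
            ((Number) row.get("rows")).doubleValue() + ((Number) row.get("index")).doubleValue() + 1);
        return out;
      }

      public static void main(String[] args) {
        Map<String, Object> partial1 = Map.of("rows", 1L, "index", 2L);
        Map<String, Object> partial2 = Map.of("rows", 1L, "index", 4L);
        // {rows=2, index=6, addrowsindexconstant=9.0}
        System.out.println(finalizeRow(merge(partial1, partial2)));
      }
    }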
@@ -208,13 +208,17 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
           public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
           {
             final Map<String, Object> values = Maps.newHashMap();
-            // compute all post aggs
+            // put non finalized aggregators for calculating dependent post Aggregators
+            for (AggregatorFactory agg : query.getAggregatorSpecs()) {
+              values.put(agg.getName(), input.getMetric(agg.getName()));
+            }
             for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
               Object calculatedPostAgg = input.getMetric(postAgg.getName());
               if (calculatedPostAgg != null) {
                 values.put(postAgg.getName(), calculatedPostAgg);
               } else {
-                values.put(postAgg.getName(), postAgg.compute(input.getBaseObject()));
+                values.put(postAgg.getName(), postAgg.compute(values));
               }
             }
             for (AggregatorFactory agg : query.getAggregatorSpecs()) {
@@ -249,6 +253,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
     return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
     {
       private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<PostAggregator> postAggs = AggregatorUtil.pruneDependentPostAgg(
+          query.getPostAggregatorSpecs(),
+          query.getTopNMetricSpec()
+               .getMetricName(query.getDimensionSpec())
+      );

       @Override
       public byte[] computeCacheKey(TopNQuery query)
@@ -338,6 +347,10 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
               vals.put(factory.getName(), factory.deserialize(resultIter.next()));
             }

+            for (PostAggregator postAgg : postAggs) {
+              vals.put(postAgg.getName(), postAgg.compute(vals));
+            }
+
             retVal.add(vals);
           }
...
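Note: the TopNQueryQueryToolChest hunks above are the cache half of the change: cached entries keep only the raw aggregator values, and when results are pulled from cache the pruned post aggregator list is re-applied in order (`postAgg.compute(vals)`). `AggregatorUtil.pruneDependentPostAgg` keeps just the post aggregators that the TopN ordering metric transitively depends on. A self-contained sketch of that pruning idea, assuming each post aggregator can report the field names it reads (the `PostAgg` record is a stand-in, not Druid's actual interface):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class PrunePostAggsSketch {
      // Illustrative stand-in: a post agg has a name plus the fields it reads.
      record PostAgg(String name, List<String> dependentFields) {}

      // Keep only the post aggs that `metric` transitively depends on. Walk the
      // list backwards, since a post agg may only reference earlier entries.
      static List<PostAgg> pruneDependent(List<PostAgg> postAggs, String metric) {
        Set<String> needed = new HashSet<>();
        needed.add(metric);
        List<PostAgg> kept = new ArrayList<>();
        for (int i = postAggs.size() - 1; i >= 0; i--) {
          PostAgg p = postAggs.get(i);
          if (needed.contains(p.name())) {
            kept.add(p);
            needed.addAll(p.dependentFields());
          }
        }
        Collections.reverse(kept); // restore evaluation order
        return kept;
      }

      public static void main(String[] args) {
        List<PostAgg> postAggs = List.of(
            new PostAgg("avg_imps_per_row", List.of("imps", "rows")),
            new PostAgg("avg_imps_per_row_double", List.of("avg_imps_per_row")),
            new PostAgg("unrelated", List.of("rows")));
        // Ordering metric avg_imps_per_row_double keeps itself and its
        // dependency avg_imps_per_row; "unrelated" is pruned from the cache path.
        System.out.println(pruneDependent(postAggs, "avg_imps_per_row_double"));
      }
    }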
@@ -294,17 +294,20 @@ public class TopNBinaryFnTest
             ImmutableMap.<String, Object>of(
                 "rows", 1L,
                 "index", 2L,
-                "testdim", "1"
+                "testdim", "1",
+                "addrowsindexconstant", 3.0
             ),
             ImmutableMap.<String, Object>of(
                 "rows", 2L,
                 "index", 4L,
-                "testdim", "2"
+                "testdim", "2",
+                "addrowsindexconstant", 7.0
             ),
             ImmutableMap.<String, Object>of(
                 "rows", 0L,
                 "index", 2L,
-                "testdim", "3"
+                "testdim", "3",
+                "addrowsindexconstant", 3.0
             )
         )
     )
@@ -316,17 +319,20 @@ public class TopNBinaryFnTest
             ImmutableMap.<String, Object>of(
                 "rows", 2L,
                 "index", 3L,
-                "testdim", "1"
+                "testdim", "1",
+                "addrowsindexconstant", 6.0
             ),
             ImmutableMap.<String, Object>of(
                 "rows", 2L,
                 "index", 0L,
-                "testdim", "2"
+                "testdim", "2",
+                "addrowsindexconstant", 3.0
             ),
             ImmutableMap.<String, Object>of(
                 "rows", 4L,
                 "index", 5L,
-                "testdim", "other"
+                "testdim", "other",
+                "addrowsindexconstant", 10.0
             )
         )
     )
@@ -434,7 +440,8 @@ public class TopNBinaryFnTest
             ImmutableMap.<String, Object>of(
                 "testdim", "2",
                 "rows", 4L,
-                "index", 4L )
+                "index", 4L
+            )
         )
     )
     );
...
@@ -61,6 +61,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
+import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.filter.DimFilter;
 import io.druid.query.search.SearchQueryQueryToolChest;
@@ -145,6 +146,22 @@ public class CachingClusteredClientTest
               new FieldAccessPostAggregator("imps", "imps"),
               new FieldAccessPostAggregator("rows", "rows")
           )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_double",
+          "*",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2, 2)
+          )
+      ),
+      new ArithmeticPostAggregator(
+          "avg_imps_per_row_half",
+          "/",
+          Arrays.<PostAggregator>asList(
+              new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"),
+              new ConstantPostAggregator("constant", 2, 2)
+          )
       )
   );
   private static final List<AggregatorFactory> RENAMED_AGGS = Arrays.asList(
@@ -412,7 +429,7 @@ public class CachingClusteredClientTest
           new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
           new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
           new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-          new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+          new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983
       ),

       new Interval("2011-01-05/2011-01-10"),
@@ -421,7 +438,7 @@ public class CachingClusteredClientTest
           new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
           new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
           new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-          new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+          new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
       )
   );
@@ -437,8 +454,8 @@ public class CachingClusteredClientTest
           new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
           new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
           new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
-          new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
-          new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
+          new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983,
+          new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
       ),
       runner.run(
           builder.intervals("2011-01-01/2011-01-10")
@@ -567,6 +584,73 @@ public class CachingClusteredClientTest
     );
   }

+  @Test
+  public void testTopNOnPostAggMetricCaching() {
+    final TopNQueryBuilder builder = new TopNQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .dimension(TOP_DIM)
+        .metric("avg_imps_per_row_double")
+        .threshold(3)
+        .intervals(SEG_SPEC)
+        .filters(DIM_FILTER)
+        .granularity(GRANULARITY)
+        .aggregators(AGGS)
+        .postAggregators(POST_AGGS)
+        .context(CONTEXT);
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
+    testQueryCaching(
+        runner,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeTopNResults(),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeTopNResults(),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeTopNResults(
+            new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeTopNResults(
+            new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        )
+    );
+
+    TestHelper.assertExpectedResults(
+        makeTopNResults(
+            new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
+            new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
+            new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986,
+            new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983,
+            new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983
+        ),
+        runner.run(
+            builder.intervals("2011-01-01/2011-01-10")
+                   .metric("avg_imps_per_row_double")
+                   .aggregators(AGGS)
+                   .postAggregators(POST_AGGS)
+                   .build()
+        )
+    );
+  }
+
   @Test
   public void testSearchCaching() throws Exception
   {
@@ -1007,20 +1091,22 @@ public class CachingClusteredClientTest
     List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3);
     for (int i = 0; i < objects.length; i += 3) {
+      double avg_impr = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue();
       retVal.add(
           new Result<>(
               (DateTime) objects[i],
               new TimeseriesResultValue(
-                  ImmutableMap.of(
-                      "rows", objects[i + 1],
-                      "imps", objects[i + 2],
-                      "impers", objects[i + 2],
-                      "avg_imps_per_row",
-                      ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue()
-                  )
+                  ImmutableMap.<String, Object>builder()
+                      .put("rows", objects[i + 1])
+                      .put("imps", objects[i + 2])
+                      .put("impers", objects[i + 2])
+                      .put("avg_imps_per_row", avg_impr)
+                      .put("avg_imps_per_row_half", avg_impr / 2)
+                      .put("avg_imps_per_row_double", avg_impr * 2)
+                      .build()
               )
           )
       );
     }
     return retVal;
   }
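Note: this helper (and the similar one just below) switches from `ImmutableMap.of(...)` to `ImmutableMap.<String, Object>builder()`. Guava's `of(...)` only has fixed-arity overloads (up to five key-value pairs in Guava of this era), and the result maps now carry six entries once the derived half/double metrics are added. A minimal comparison, with illustrative values:

    import com.google.common.collect.ImmutableMap;

    public class BuilderVsOfSketch {
      public static void main(String[] args) {
        double imps = 4994, rows = 50;

        // of(...) is fine for a handful of entries.
        ImmutableMap<String, Object> small = ImmutableMap.<String, Object>of(
            "rows", rows,
            "imps", imps);

        // Past the fixed arity, the builder is the idiomatic escape hatch;
        // like of(...), it preserves insertion order.
        ImmutableMap<String, Object> big = ImmutableMap.<String, Object>builder()
            .put("rows", rows)
            .put("imps", imps)
            .put("impers", imps)
            .put("avg_imps_per_row", imps / rows)
            .put("avg_imps_per_row_double", imps * 2 / rows)
            .put("avg_imps_per_row_half", imps / (rows * 2))
            .build();

        System.out.println(small);
        System.out.println(big);
      }
    }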
@@ -1099,13 +1185,15 @@ public class CachingClusteredClientTest
       final double imps = ((Number) objects[index + 2]).doubleValue();
       final double rows = ((Number) objects[index + 1]).doubleValue();
       values.add(
-          ImmutableMap.of(
-              TOP_DIM, objects[index],
-              "rows", rows,
-              "imps", imps,
-              "impers", imps,
-              "avg_imps_per_row", imps / rows
-          )
+          ImmutableMap.<String, Object>builder()
+              .put(TOP_DIM, objects[index])
+              .put("rows", rows)
+              .put("imps", imps)
+              .put("impers", imps)
+              .put("avg_imps_per_row", imps / rows)
+              .put("avg_imps_per_row_double", (imps * 2) / rows)
+              .put("avg_imps_per_row_half", imps / (rows * 2))
+              .build()
       );
       index += 3;
     }
...