diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index 6fe55153a59dd310598ecf6342009fcf7478b559..482e92adc3e5f49f22dacf2e8ee560c5251234dd 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -265,8 +265,12 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest values = Maps.newHashMap(); final TimeseriesResultValue holder = result.getValue(); if (calculatePostAggs) { + // put non finalized aggregators for calculating dependent post Aggregators + for (AggregatorFactory agg : query.getAggregatorSpecs()) { + values.put(agg.getName(), holder.getMetric(agg.getName())); + } for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { - values.put(postAgg.getName(), postAgg.compute(holder.getBaseObject())); + values.put(postAgg.getName(), postAgg.compute(values)); } } for (AggregatorFactory agg : query.getAggregatorSpecs()) { diff --git a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java index e2a928465178b67a8fe5add0954d5f18b8b594b7..4c02da447aae7eb67c7b4d435de97c27d062eb0c 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNBinaryFn.java @@ -90,19 +90,7 @@ public class TopNBinaryFn implements BinaryFn, Result retVal = new LinkedHashMap(aggregations.size() + 2); - retVal.put(dimension, dimensionValue); - - for (AggregatorFactory factory : aggregations) { - final String metricName = factory.getName(); - retVal.put(metricName, arg1Val.getMetric(metricName)); - } - for (PostAggregator postAgg : postAggregations) { - retVal.put(postAgg.getName(), postAgg.compute(retVal)); - } - - retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); + retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val); } for (DimensionAndMetricValueExtractor arg2Val : arg2Vals) { final String dimensionValue = arg2Val.getStringDimensionValue(dimension); @@ -124,18 +112,7 @@ public class TopNBinaryFn implements BinaryFn, Result retVal = new LinkedHashMap(aggregations.size() + 2); - retVal.put(dimension, dimensionValue); - - for (AggregatorFactory factory : aggregations) { - final String metricName = factory.getName(); - retVal.put(metricName, arg2Val.getMetric(metricName)); - } - for (PostAggregator postAgg : postAggregations) { - retVal.put(postAgg.getName(), postAgg.compute(retVal)); - } - - retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal)); + retVals.put(dimensionValue, arg2Val); } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index f290fc29e8ab1695a662e820eeb096392514fa6b..5db416f1d0b9ada4842db1041b253e5942764d1b 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -208,13 +208,17 @@ public class TopNQueryQueryToolChest extends QueryToolChest apply(DimensionAndMetricValueExtractor input) { final Map values = Maps.newHashMap(); - // compute all post aggs + // put non finalized aggregators for calculating dependent post Aggregators + for (AggregatorFactory agg : query.getAggregatorSpecs()) { + values.put(agg.getName(), input.getMetric(agg.getName())); + } + for (PostAggregator postAgg : query.getPostAggregatorSpecs()) { Object calculatedPostAgg = input.getMetric(postAgg.getName()); if (calculatedPostAgg != null) { values.put(postAgg.getName(), calculatedPostAgg); } else { - values.put(postAgg.getName(), postAgg.compute(input.getBaseObject())); + values.put(postAgg.getName(), postAgg.compute(values)); } } for (AggregatorFactory agg : query.getAggregatorSpecs()) { @@ -249,6 +253,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest, Object, TopNQuery>() { private final List aggs = query.getAggregatorSpecs(); + private final List postAggs = AggregatorUtil.pruneDependentPostAgg( + query.getPostAggregatorSpecs(), + query.getTopNMetricSpec() + .getMetricName(query.getDimensionSpec()) + ); @Override public byte[] computeCacheKey(TopNQuery query) @@ -338,6 +347,10 @@ public class TopNQueryQueryToolChest extends QueryToolChestof( "rows", 1L, "index", 2L, - "testdim", "1" + "testdim", "1", + "addrowsindexconstant", 3.0 ), ImmutableMap.of( "rows", 2L, "index", 4L, - "testdim", "2" + "testdim", "2", + "addrowsindexconstant", 7.0 ), ImmutableMap.of( "rows", 0L, "index", 2L, - "testdim", "3" + "testdim", "3", + "addrowsindexconstant", 3.0 ) ) ) @@ -316,17 +319,20 @@ public class TopNBinaryFnTest ImmutableMap.of( "rows", 2L, "index", 3L, - "testdim", "1" + "testdim", "1", + "addrowsindexconstant", 6.0 ), ImmutableMap.of( "rows", 2L, "index", 0L, - "testdim", "2" + "testdim", "2", + "addrowsindexconstant", 3.0 ), ImmutableMap.of( "rows", 4L, "index", 5L, - "testdim", "other" + "testdim", "other", + "addrowsindexconstant", 10.0 ) ) ) @@ -434,7 +440,8 @@ public class TopNBinaryFnTest ImmutableMap.of( "testdim", "2", "rows", 4L, - "index", 4L ) + "index", 4L + ) ) ) ); diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 964e87b8b529915b37f2475734cb282a99b0a2bf..582d0c648cc48b4e402d161992c0e528e489b5d2 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -61,6 +61,7 @@ import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.post.ArithmeticPostAggregator; +import io.druid.query.aggregation.post.ConstantPostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator; import io.druid.query.filter.DimFilter; import io.druid.query.search.SearchQueryQueryToolChest; @@ -145,6 +146,22 @@ public class CachingClusteredClientTest new FieldAccessPostAggregator("imps", "imps"), new FieldAccessPostAggregator("rows", "rows") ) + ), + new ArithmeticPostAggregator( + "avg_imps_per_row_double", + "*", + Arrays.asList( + new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), + new ConstantPostAggregator("constant", 2, 2 ) + ) + ), + new ArithmeticPostAggregator( + "avg_imps_per_row_half", + "/", + Arrays.asList( + new FieldAccessPostAggregator("avg_imps_per_row", "avg_imps_per_row"), + new ConstantPostAggregator("constant", 2, 2 ) + ) ) ); private static final List RENAMED_AGGS = Arrays.asList( @@ -412,7 +429,7 @@ public class CachingClusteredClientTest new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, - new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 + new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), new Interval("2011-01-05/2011-01-10"), @@ -421,7 +438,7 @@ public class CachingClusteredClientTest new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, - new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 + new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 ) ); @@ -437,8 +454,8 @@ public class CachingClusteredClientTest new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, - new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983, - new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 + new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983, + new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 ), runner.run( builder.intervals("2011-01-01/2011-01-10") @@ -567,6 +584,73 @@ public class CachingClusteredClientTest ); } + @Test + public void testTopNOnPostAggMetricCaching() { + final TopNQueryBuilder builder = new TopNQueryBuilder() + .dataSource(DATA_SOURCE) + .dimension(TOP_DIM) + .metric("avg_imps_per_row_double") + .threshold(3) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); + + QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig())); + testQueryCaching( + runner, + builder.build(), + new Interval("2011-01-01/2011-01-02"), + makeTopNResults(), + + new Interval("2011-01-02/2011-01-03"), + makeTopNResults(), + + new Interval("2011-01-05/2011-01-10"), + makeTopNResults( + new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, + new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983 + ), + + new Interval("2011-01-05/2011-01-10"), + makeTopNResults( + new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, + new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 + ) + ); + + + TestHelper.assertExpectedResults( + makeTopNResults( + new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, + new DateTime("2011-01-05T01"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, + new DateTime("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-06T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-07T01"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + new DateTime("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-08T01"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + new DateTime("2011-01-09"), "c1", 50, 4985, "b", 50, 4984, "c", 50, 4983, + new DateTime("2011-01-09T01"), "c2", 50, 4985, "b", 50, 4984, "c", 50, 4983 + ), + runner.run( + builder.intervals("2011-01-01/2011-01-10") + .metric("avg_imps_per_row_double") + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .build() + ) + ); + } + @Test public void testSearchCaching() throws Exception { @@ -1007,20 +1091,22 @@ public class CachingClusteredClientTest List> retVal = Lists.newArrayListWithCapacity(objects.length / 3); for (int i = 0; i < objects.length; i += 3) { + double avg_impr = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue(); retVal.add( new Result<>( (DateTime) objects[i], new TimeseriesResultValue( - ImmutableMap.of( - "rows", objects[i + 1], - "imps", objects[i + 2], - "impers", objects[i + 2], - "avg_imps_per_row", - ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue() + ImmutableMap.builder() + .put("rows", objects[i + 1]) + .put("imps", objects[i + 2]) + .put("impers", objects[i + 2]) + .put("avg_imps_per_row",avg_impr) + .put("avg_imps_per_row_half",avg_impr / 2) + .put("avg_imps_per_row_double",avg_impr * 2) + .build() ) ) - ) - ); + ); } return retVal; } @@ -1099,13 +1185,15 @@ public class CachingClusteredClientTest final double imps = ((Number) objects[index + 2]).doubleValue(); final double rows = ((Number) objects[index + 1]).doubleValue(); values.add( - ImmutableMap.of( - TOP_DIM, objects[index], - "rows", rows, - "imps", imps, - "impers", imps, - "avg_imps_per_row", imps / rows - ) + ImmutableMap.builder() + .put(TOP_DIM, objects[index]) + .put("rows", rows) + .put("imps", imps) + .put("impers", imps) + .put("avg_imps_per_row", imps / rows) + .put("avg_imps_per_row_double", ((imps * 2) / rows)) + .put("avg_imps_per_row_half", (imps / (rows * 2))) + .build() ); index += 3; }