From 1267fbb7f5b2c5adb290ad7c1b354011d249c4b2 Mon Sep 17 00:00:00 2001 From: fjy Date: Sun, 6 Apr 2014 09:20:58 -0700 Subject: [PATCH] fix context to be backwards compat --- .../main/java/io/druid/query/BaseQuery.java | 65 +++++++++ .../io/druid/query/BySegmentQueryRunner.java | 2 +- .../query/BySegmentSkippingQueryRunner.java | 2 +- .../query/ChainedExecutionQueryRunner.java | 2 +- .../query/FinalizeResultsQueryRunner.java | 9 +- .../query/GroupByParallelQueryRunner.java | 2 +- .../src/main/java/io/druid/query/Query.java | 7 + .../search/SearchQueryQueryToolChest.java | 2 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../client/CachePopulatingQueryRunner.java | 2 +- .../druid/client/CachingClusteredClient.java | 15 +- .../io/druid/client/DirectDruidClient.java | 2 +- .../server/AsyncQueryForwardingServlet.java | 5 +- .../client/CachingClusteredClientTest.java | 138 ++++++++++-------- 14 files changed, 169 insertions(+), 86 deletions(-) diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 71beaa2665..09316db058 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.metamx.common.ISE; import com.metamx.common.guava.Sequence; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; @@ -120,6 +121,70 @@ public abstract class BaseQuery implements Query return retVal == null ? defaultValue : retVal; } + @Override + public int getContextPriority(int defaultValue) + { + Object val = context.get("priority"); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Integer.parseInt((String) val); + } else if (val instanceof Integer) { + return (int) val; + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + @Override + public boolean getContextBySegment(boolean defaultValue) + { + Object val = context.get("bySegment"); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Integer) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + @Override + public boolean getContextPopulateCache(boolean defaultValue) + { + Object val = context.get("populateCache"); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Integer) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + @Override + public boolean getContextUseCache(boolean defaultValue) + { + Object val = context.get("useCache"); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Integer) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + protected Map computeOverridenContext(Map overrides) { Map overridden = Maps.newTreeMap(); diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index d6150f6345..44094d0216 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -53,7 +53,7 @@ public class BySegmentQueryRunner implements QueryRunner @SuppressWarnings("unchecked") public Sequence run(final Query query) { - if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { + if (query.getContextBySegment(false)) { final Sequence baseSequence = base.run(query); return new Sequence() { diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 8e666c30b1..13ca4dd75d 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner implements QueryRunner @Override public Sequence run(Query query) { - if (Boolean.parseBoolean(query.getContextValue("bySegment"))) { + if (query.getContextBySegment(false)) { return baseRunner.run(query); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index d3600068a2..3e3e6b0324 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { - final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); + final int priority = query.getContextValue("priority", 0); return new BaseSequence>( new BaseSequence.IteratorMaker>() diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 2880332e18..dee0588847 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -48,8 +48,8 @@ public class FinalizeResultsQueryRunner implements QueryRunner @Override public Sequence run(final Query query) { - final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment")); - final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true")); + final boolean isBySegment = query.getContextBySegment(false); + final boolean shouldFinalize = query.getContextFinalize(true); if (shouldFinalize) { Function finalizerFn; if (isBySegment) { @@ -84,8 +84,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner ); } }; - } - else { + } else { finalizerFn = toolChest.makeMetricManipulatorFn( query, new MetricManipulationFn() @@ -100,7 +99,7 @@ public class FinalizeResultsQueryRunner implements QueryRunner } return Sequences.map( - baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", "false"))), + baseRunner.run(query.withOverriddenContext(ImmutableMap.of("finalize", false))), finalizerFn ); } diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 10dde9b26e..20817a772e 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner query, configSupplier.get() ); - final int priority = Integer.parseInt((String) query.getContextValue("priority", "0")); + final int priority = query.getContextPriority(0); if (Iterables.isEmpty(queryables)) { log.warn("No queryables found."); diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 10a8432858..2de75e5745 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -74,6 +74,13 @@ public interface Query public ContextType getContextValue(String key, ContextType defaultValue); + // For backwards compatibility + public int getContextPriority(int defaultValue); + public boolean getContextBySegment(boolean defaultValue); + public boolean getContextPopulateCache(boolean defaultValue); + public boolean getContextUseCache(boolean defaultValue); + public boolean getContextFinalize(boolean defaultValue); + public Query withOverriddenContext(Map contextOverride); public Query withQuerySegmentSpec(QuerySegmentSpec spec); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 6e14ef1c1f..f559829d59 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest implements QueryRunner final CacheStrategy strategy = toolChest.getCacheStrategy(query); - final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) + final boolean populateCache = query.getContextPopulateCache(true) && strategy != null && cacheConfig.isPopulateCache() // historical only populates distributed cache since the cache lookups are done at broker. diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 65ac6bea41..0e63f9e4ac 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -62,7 +62,6 @@ import io.druid.timeline.partition.PartitionChunk; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -125,24 +124,24 @@ public class CachingClusteredClient implements QueryRunner final List> cachedResults = Lists.newArrayList(); final Map cachePopulatorMap = Maps.newHashMap(); - final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true")) + final boolean useCache = query.getContextUseCache(true) && strategy != null && cacheConfig.isUseCache(); - final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true")) + final boolean populateCache = query.getContextPopulateCache(true) && strategy != null && cacheConfig.isPopulateCache(); - final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); + final boolean isBySegment = query.getContextBySegment(false); ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); - final String priority = query.getContextValue("priority", "0"); + final int priority = query.getContextPriority(0); contextBuilder.put("priority", priority); if (populateCache) { - contextBuilder.put(CacheConfig.POPULATE_CACHE, "false"); - contextBuilder.put("bySegment", "true"); + contextBuilder.put(CacheConfig.POPULATE_CACHE, false); + contextBuilder.put("bySegment", true); } - contextBuilder.put("intermediate", "true"); + contextBuilder.put("intermediate", true); final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build()); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index fa95ba97f1..76c842029b 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -106,7 +106,7 @@ public class DirectDruidClient implements QueryRunner public Sequence run(Query query) { QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false")); + boolean isBySegment = query.getContextBySegment(false); Pair types = typesMap.get(query.getClass()); if (types == null) { diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 85f33a7000..26f2c8f6ff 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -106,8 +106,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet } req.setAttribute(DISPATCHED, true); - resp.setStatus(200); - resp.setContentType("application/x-javascript"); query = objectMapper.readValue(req.getInputStream(), Query.class); queryId = query.getId(); @@ -132,6 +130,9 @@ public class AsyncQueryForwardingServlet extends HttpServlet @Override public ClientResponse handleResponse(HttpResponse response) { + resp.setStatus(response.getStatus().getCode()); + resp.setContentType("application/x-javascript"); + byte[] bytes = getContentBytes(response.getContent()); if (bytes.length > 0) { try { diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index cfea29f9a8..bb8787e66d 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -214,13 +214,13 @@ public class CachingClusteredClientTest public void testTimeseriesCaching() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( builder.build(), @@ -265,9 +265,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -277,13 +277,13 @@ public class CachingClusteredClientTest public void testTimeseriesCachingTimeZone() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(PT1H_TZ_GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS) - .context(CONTEXT); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(PT1H_TZ_GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS) + .context(CONTEXT); testQueryCaching( builder.build(), @@ -305,9 +305,9 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -316,18 +316,22 @@ public class CachingClusteredClientTest public void testDisableUseCache() throws Exception { final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(SEG_SPEC) - .filters(DIM_FILTER) - .granularity(GRANULARITY) - .aggregators(AGGS) - .postAggregators(POST_AGGS); + .dataSource(DATA_SOURCE) + .intervals(SEG_SPEC) + .filters(DIM_FILTER) + .granularity(GRANULARITY) + .aggregators(AGGS) + .postAggregators(POST_AGGS); testQueryCaching( 1, true, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "true")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "true" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -340,8 +344,12 @@ public class CachingClusteredClientTest testQueryCaching( 1, false, - builder.context(ImmutableMap.of("useCache", "false", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "false", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -352,8 +360,12 @@ public class CachingClusteredClientTest testQueryCaching( 1, false, - builder.context(ImmutableMap.of("useCache", "true", - "populateCache", "false")).build(), + builder.context( + ImmutableMap.of( + "useCache", "true", + "populateCache", "false" + ) + ).build(), new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000) ); @@ -422,10 +434,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -467,10 +479,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-11-04/2011-11-08") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -533,10 +545,10 @@ public class CachingClusteredClientTest ), client.run( builder.intervals("2011-01-01/2011-01-10") - .metric("imps") - .aggregators(RENAMED_AGGS) - .postAggregators(RENAMED_POST_AGGS) - .build() + .metric("imps") + .aggregators(RENAMED_AGGS) + .postAggregators(RENAMED_POST_AGGS) + .build() ) ); } @@ -638,8 +650,8 @@ public class CachingClusteredClientTest EasyMock.expect(serverView.getQueryRunner(server)) - .andReturn(expectations.getQueryRunner()) - .once(); + .andReturn(expectations.getQueryRunner()) + .once(); final Capture capture = new Capture(); queryCaptures.add(capture); @@ -656,8 +668,8 @@ public class CachingClusteredClientTest } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results)) + .once(); } else if (query instanceof TopNQuery) { List segmentIds = Lists.newArrayList(); @@ -669,8 +681,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTopNResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof SearchQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -681,8 +693,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableSearchResults(segmentIds, intervals, results)) + .once(); } else if (query instanceof TimeBoundaryQuery) { List segmentIds = Lists.newArrayList(); List intervals = Lists.newArrayList(); @@ -693,8 +705,8 @@ public class CachingClusteredClientTest results.add(expectation.getResults()); } EasyMock.expect(queryable.run(EasyMock.capture(capture))) - .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) - .once(); + .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results)) + .once(); } else { throw new ISE("Unknown query type[%s]", query.getClass()); } @@ -762,11 +774,11 @@ public class CachingClusteredClientTest for (Capture queryCapture : queryCaptures) { Query capturedQuery = (Query) queryCapture.getValue(); if (expectBySegment) { - Assert.assertEquals("true", capturedQuery.getContextValue("bySegment")); + Assert.assertEquals(true, capturedQuery.getContextValue("bySegment")); } else { Assert.assertTrue( capturedQuery.getContextValue("bySegment") == null || - capturedQuery.getContextValue("bySegment").equals("false") + capturedQuery.getContextValue("bySegment").equals(false) ); } } @@ -1160,13 +1172,13 @@ public class CachingClusteredClientTest return new CachingClusteredClient( new MapQueryToolChestWarehouse( ImmutableMap., QueryToolChest>builder() - .put( - TimeseriesQuery.class, - new TimeseriesQueryQueryToolChest(new QueryConfig()) - ) - .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) - .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) - .build() + .put( + TimeseriesQuery.class, + new TimeseriesQueryQueryToolChest(new QueryConfig()) + ) + .put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig())) + .put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig())) + .build() ), new TimelineServerView() { -- GitLab