提交 f5dc75ef 编写于 作者: F fjy

Merge pull request #464 from metamx/fix-context

Fix context to be backwards compatible with String values
......@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Duration;
......@@ -120,6 +121,61 @@ public abstract class BaseQuery<T> implements Query<T>
return retVal == null ? defaultValue : retVal;
}
@Override
public int getContextPriority(int defaultValue)
{
Object val = context.get("priority");
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}
@Override
public boolean getContextBySegment(boolean defaultValue)
{
return parseBoolean("bySegment", defaultValue);
}
@Override
public boolean getContextPopulateCache(boolean defaultValue)
{
return parseBoolean("populateCache", defaultValue);
}
@Override
public boolean getContextUseCache(boolean defaultValue)
{
return parseBoolean("useCache", defaultValue);
}
@Override
public boolean getContextFinalize(boolean defaultValue)
{
return parseBoolean("finalize", defaultValue);
}
private boolean parseBoolean(String key, boolean defaultValue)
{
Object val = context.get(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}
protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
{
Map<String, Object> overridden = Maps.newTreeMap();
......
......@@ -53,7 +53,7 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query)
{
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query);
return new Sequence<T>()
{
......
......@@ -37,7 +37,7 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(Query<T> query)
{
if (Boolean.parseBoolean(query.<String>getContextValue("bySegment"))) {
if (query.getContextBySegment(false)) {
return baseRunner.run(query);
}
......
......@@ -83,7 +83,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
final int priority = query.getContextValue("priority", 0);
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
......
......@@ -48,8 +48,8 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override
public Sequence<T> run(final Query<T> query)
{
final boolean isBySegment = Boolean.parseBoolean(query.<String>getContextValue("bySegment"));
final boolean shouldFinalize = Boolean.parseBoolean(query.getContextValue("finalize", "true"));
final boolean isBySegment = query.getContextBySegment(false);
final boolean shouldFinalize = query.getContextFinalize(true);
if (shouldFinalize) {
Function<T, T> finalizerFn;
if (isBySegment) {
......@@ -84,8 +84,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
);
}
};
}
else {
} else {
finalizerFn = toolChest.makeMetricManipulatorFn(
query,
new MetricManipulationFn()
......@@ -100,7 +99,7 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
}
return Sequences.map(
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", "false"))),
baseRunner.run(query.withOverriddenContext(ImmutableMap.<String, Object>of("finalize", false))),
finalizerFn
);
}
......
......@@ -83,7 +83,7 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
query,
configSupplier.get()
);
final int priority = Integer.parseInt((String) query.getContextValue("priority", "0"));
final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
......
......@@ -74,6 +74,13 @@ public interface Query<T>
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);
// For backwards compatibility
public int getContextPriority(int defaultValue);
public boolean getContextBySegment(boolean defaultValue);
public boolean getContextPopulateCache(boolean defaultValue);
public boolean getContextUseCache(boolean defaultValue);
public boolean getContextFinalize(boolean defaultValue);
public Query<T> withOverriddenContext(Map<String, Object> contextOverride);
public Query<T> withQuerySegmentSpec(QuerySegmentSpec spec);
......
......@@ -294,7 +294,7 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map(
runner.run(query.withLimit(config.getMaxSearchLimit())),
......
......@@ -339,7 +339,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean((String) query.getContextValue("bySegment", "false"));
final boolean isBySegment = query.getContextBySegment(false);
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold)),
......
......@@ -70,7 +70,7 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
final CacheStrategy strategy = toolChest.getCacheStrategy(query);
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null
&& cacheConfig.isPopulateCache()
// historical only populates distributed cache since the cache lookups are done at broker.
......
......@@ -62,7 +62,6 @@ import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
......@@ -125,24 +124,24 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final List<Pair<DateTime, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();
final boolean useCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.USE_CACHE, "true"))
final boolean useCache = query.getContextUseCache(true)
&& strategy != null
&& cacheConfig.isUseCache();
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null && cacheConfig.isPopulateCache();
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
final boolean isBySegment = query.getContextBySegment(false);
ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();
final String priority = query.getContextValue("priority", "0");
final int priority = query.getContextPriority(0);
contextBuilder.put("priority", priority);
if (populateCache) {
contextBuilder.put(CacheConfig.POPULATE_CACHE, "false");
contextBuilder.put("bySegment", "true");
contextBuilder.put(CacheConfig.POPULATE_CACHE, false);
contextBuilder.put("bySegment", true);
}
contextBuilder.put("intermediate", "true");
contextBuilder.put("intermediate", true);
final Query<T> rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
......
......@@ -106,7 +106,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
public Sequence<T> run(Query<T> query)
{
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
boolean isBySegment = query.getContextBySegment(false);
Pair<JavaType, JavaType> types = typesMap.get(query.getClass());
if (types == null) {
......
......@@ -68,13 +68,12 @@ public class RoutingDruidClient<IntermediateType, FinalType>
}
public ListenableFuture<FinalType> run(
String host,
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
final ListenableFuture<FinalType> future;
final String url = String.format("http://%s/druid/v2/", host);
try {
log.debug("Querying url[%s]", url);
......
......@@ -106,8 +106,6 @@ public class AsyncQueryForwardingServlet extends HttpServlet
}
req.setAttribute(DISPATCHED, true);
resp.setStatus(200);
resp.setContentType("application/x-javascript");
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
......@@ -132,6 +130,9 @@ public class AsyncQueryForwardingServlet extends HttpServlet
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
......@@ -209,7 +210,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
@Override
public void run()
{
routingDruidClient.run(host, theQuery, responseHandler);
routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler);
}
}
);
......@@ -235,4 +236,14 @@ public class AsyncQueryForwardingServlet extends HttpServlet
.emit();
}
}
private String makeUrl(String host, HttpServletRequest req)
{
String queryString = req.getQueryString();
if (queryString == null) {
return String.format("http://%s%s", host, req.getRequestURI());
}
return String.format("http://%s%s?%s", host, req.getRequestURI(), req.getQueryString());
}
}
......@@ -44,7 +44,7 @@ public class QueryIDProvider
return String.format(
"%s_%s_%s_%s_%s",
query.getDataSource(),
query.getDuration(),
query.getIntervals(),
host,
new DateTime(),
id.incrementAndGet()
......
......@@ -214,13 +214,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCaching() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
builder.build(),
......@@ -265,9 +265,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
......@@ -277,13 +277,13 @@ public class CachingClusteredClientTest
public void testTimeseriesCachingTimeZone() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(PT1H_TZ_GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
testQueryCaching(
builder.build(),
......@@ -305,9 +305,9 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
......@@ -316,18 +316,22 @@ public class CachingClusteredClientTest
public void testDisableUseCache() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS);
testQueryCaching(
1,
true,
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "true")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "false",
"populateCache", "true"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
......@@ -340,8 +344,12 @@ public class CachingClusteredClientTest
testQueryCaching(
1,
false,
builder.context(ImmutableMap.<String, Object>of("useCache", "false",
"populateCache", "false")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "false",
"populateCache", "false"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
......@@ -352,8 +360,12 @@ public class CachingClusteredClientTest
testQueryCaching(
1,
false,
builder.context(ImmutableMap.<String, Object>of("useCache", "true",
"populateCache", "false")).build(),
builder.context(
ImmutableMap.<String, Object>of(
"useCache", "true",
"populateCache", "false"
)
).build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000)
);
......@@ -422,10 +434,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
......@@ -467,10 +479,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-11-04/2011-11-08")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
......@@ -533,10 +545,10 @@ public class CachingClusteredClientTest
),
client.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
.metric("imps")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
.build()
)
);
}
......@@ -638,8 +650,8 @@ public class CachingClusteredClientTest
EasyMock.expect(serverView.getQueryRunner(server))
.andReturn(expectations.getQueryRunner())
.once();
.andReturn(expectations.getQueryRunner())
.once();
final Capture<? extends Query> capture = new Capture();
queryCaptures.add(capture);
......@@ -656,8 +668,8 @@ public class CachingClusteredClientTest
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
.once();
} else if (query instanceof TopNQuery) {
List<String> segmentIds = Lists.newArrayList();
......@@ -669,8 +681,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTopNResults(segmentIds, intervals, results))
.once();
} else if (query instanceof SearchQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
......@@ -681,8 +693,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableSearchResults(segmentIds, intervals, results))
.once();
} else if (query instanceof TimeBoundaryQuery) {
List<String> segmentIds = Lists.newArrayList();
List<Interval> intervals = Lists.newArrayList();
......@@ -693,8 +705,8 @@ public class CachingClusteredClientTest
results.add(expectation.getResults());
}
EasyMock.expect(queryable.run(EasyMock.capture(capture)))
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
.andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
.once();
} else {
throw new ISE("Unknown query type[%s]", query.getClass());
}
......@@ -762,11 +774,11 @@ public class CachingClusteredClientTest
for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue();
if (expectBySegment) {
Assert.assertEquals("true", capturedQuery.getContextValue("bySegment"));
Assert.assertEquals(true, capturedQuery.<Boolean>getContextValue("bySegment"));
} else {
Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null ||
capturedQuery.getContextValue("bySegment").equals("false")
capturedQuery.getContextValue("bySegment").equals(false)
);
}
}
......@@ -1160,13 +1172,13 @@ public class CachingClusteredClientTest
return new CachingClusteredClient(
new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
.put(
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(new QueryConfig())
)
.put(TopNQuery.class, new TopNQueryQueryToolChest(new TopNQueryConfig()))
.put(SearchQuery.class, new SearchQueryQueryToolChest(new SearchQueryConfig()))
.build()
),
new TimelineServerView()
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册