提交 f3125490 编写于 作者: I Igal Levy

Merge branch 'master' into igalDruid

......@@ -30,4 +30,4 @@ echo "For examples, see: "
echo " "
ls -1 examples/*/*sh
echo " "
echo "See also http://druid.io/docs/0.6.86"
echo "See also http://druid.io/docs/latest"
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -81,7 +81,7 @@ druid.server.http.numThreads=50
druid.request.logging.type=emitter
druid.request.logging.feed=druid_requests
druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor", "io.druid.client.cache.CacheMonitor"]
druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
# Emit metrics over http
druid.emitter=http
......@@ -106,16 +106,16 @@ The broker module uses several of the default modules in [Configuration](Configu
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.cache.sizeInBytes`|Maximum size of the cache. If this is zero, cache is disabled.|10485760 (10MB)|
|`druid.broker.cache.initialSize`|The initial size of the cache in bytes.|500000|
|`druid.broker.cache.logEvictionCount`|If this is non-zero, there will be an eviction of entries.|0|
|`druid.broker.cache.sizeInBytes`|Maximum cache size in bytes. Zero disables caching.|0|
|`druid.broker.cache.initialSize`|Initial size of the hashtable backing the cache.|500000|
|`druid.broker.cache.logEvictionCount`|If non-zero, log cache eviction every `logEvictionCount` items.|0|
#### Memcache
|Property|Description|Default|
|--------|-----------|-------|
|`druid.broker.cache.expiration`|Memcache [expiration time ](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcache.|500|
|`druid.broker.cache.hosts`|Memcache hosts.|none|
|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcache object.|52428800 (50 MB)|
|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcache.|druid|
|`druid.broker.cache.expiration`|Memcached [expiration time](https://code.google.com/p/memcached/wiki/NewCommands#Standard_Protocol).|2592000 (30 days)|
|`druid.broker.cache.timeout`|Maximum time in milliseconds to wait for a response from Memcached.|500|
|`druid.broker.cache.hosts`|Command separated list of Memcached hosts `<host:port>`.|none|
|`druid.broker.cache.maxObjectSize`|Maximum object size in bytes for a Memcached object.|52428800 (50 MB)|
|`druid.broker.cache.memcachedPrefix`|Key prefix for all keys in Memcached.|druid|
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -64,7 +64,7 @@ public abstract class AbstractTask implements Task
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.taskResource = Preconditions.checkNotNull(taskResource, "resource");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.dataSource = Preconditions.checkNotNull(dataSource.toLowerCase(), "dataSource");
}
@JsonProperty
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
......@@ -313,17 +313,17 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.1.3.v20140225</version>
<version>9.1.4.v20140401</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.1.3.v20140225</version>
<version>9.1.4.v20140401</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
<version>9.1.3.v20140225</version>
<version>9.1.4.v20140401</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
public class AggregatorUtil
{
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
*
* @param postAggregatorList List of postAggregator, there is a restriction that the list should be in an order
* such that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
{
LinkedList<PostAggregator> rv = Lists.newLinkedList();
Set<String> deps = new HashSet<>();
deps.add(postAggName);
// Iterate backwards to find the last calculated aggregate and add dependent aggregator as we find dependencies in reverse order
for (PostAggregator agg : Lists.reverse(postAggregatorList)) {
if (deps.contains(agg.getName())) {
rv.addFirst(agg); // add to the beginning of List
deps.remove(agg.getName());
deps.addAll(agg.getDependentFields());
}
}
return rv;
}
public static Pair<List<AggregatorFactory>, List<PostAggregator>> condensedAggregators(
List<AggregatorFactory> aggList,
List<PostAggregator> postAggList,
String metric
)
{
List<PostAggregator> condensedPostAggs = AggregatorUtil.pruneDependentPostAgg(
postAggList,
metric
);
// calculate dependent aggregators for these postAgg
Set<String> dependencySet = new HashSet<>();
dependencySet.add(metric);
for (PostAggregator postAggregator : condensedPostAggs) {
dependencySet.addAll(postAggregator.getDependentFields());
}
List<AggregatorFactory> condensedAggs = Lists.newArrayList();
for (AggregatorFactory aggregatorSpec : aggList) {
if (dependencySet.contains(aggregatorSpec.getName())) {
condensedAggs.add(aggregatorSpec);
}
}
return new Pair(condensedAggs, condensedPostAggs);
}
}
......@@ -24,7 +24,6 @@ import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import java.util.LinkedHashMap;
import java.util.List;
......@@ -37,17 +36,14 @@ public class TimeseriesBinaryFn
{
private final QueryGranularity gran;
private final List<AggregatorFactory> aggregations;
private final List<PostAggregator> postAggregations;
public TimeseriesBinaryFn(
QueryGranularity granularity,
List<AggregatorFactory> aggregations,
List<PostAggregator> postAggregations
List<AggregatorFactory> aggregations
)
{
this.gran = granularity;
this.aggregations = aggregations;
this.postAggregations = postAggregations;
}
@Override
......@@ -71,11 +67,6 @@ public class TimeseriesBinaryFn
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
}
for (PostAggregator pf : postAggregations) {
final String metricName = pf.getName();
retVal.put(metricName, pf.compute(retVal));
}
return (gran instanceof AllGranularity) ?
new Result<TimeseriesResultValue>(
arg1.getTimestamp(),
......
......@@ -74,10 +74,6 @@ public class TimeseriesQueryEngine
bob.addMetric(aggregator);
}
for (PostAggregator postAgg : postAggregatorSpecs) {
bob.addMetric(postAgg);
}
Result<TimeseriesResultValue> retVal = bob.build();
// cleanup
......
......@@ -101,8 +101,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
TimeseriesQuery query = (TimeseriesQuery) input;
return new TimeseriesBinaryFn(
query.getGranularity(),
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
query.getAggregatorSpecs()
);
}
};
......@@ -147,7 +146,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), holder.getMetric(postAgg.getName()));
values.put(postAgg.getName(), postAgg.compute(values));
}
return new Result<TimeseriesResultValue>(
result.getTimestamp(),
......@@ -169,7 +168,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
return new CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override
public byte[] computeCacheKey(TimeseriesQuery query)
......@@ -238,10 +236,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
retVal.put(factory.getName(), factory.deserialize(resultIter.next()));
}
for (PostAggregator postAgg : postAggs) {
retVal.put(postAgg.getName(), postAgg.compute(retVal));
}
return new Result<TimeseriesResultValue>(
timestamp,
new TimeseriesResultValue(retVal)
......
......@@ -19,10 +19,11 @@
package io.druid.query.topn;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import io.druid.collections.StupidPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
......@@ -56,7 +57,6 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
this.bufferPool = bufferPool;
}
@Override
public TopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor
......@@ -65,65 +65,27 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
return new TopNParams(dimSelector, cursor, dimSelector.getValueCardinality(), Integer.MAX_VALUE);
}
@Override
public TopNResultBuilder makeResultBuilder(TopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
public void run(
TopNParams params, TopNResultBuilder resultBuilder, int[] ints
)
{
final TopNResultBuilder singleMetricResultBuilder = makeResultBuilder(params);
final String metric;
// ugly
TopNMetricSpec spec = query.getTopNMetricSpec();
if (spec instanceof InvertedTopNMetricSpec
&& ((InvertedTopNMetricSpec) spec).getDelegate() instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) ((InvertedTopNMetricSpec) spec).getDelegate()).getMetric();
} else if (spec instanceof NumericTopNMetricSpec) {
metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric();
} else {
throw new ISE("WTF?! We are in AggregateTopNMetricFirstAlgorithm with a [%s] spec", spec.getClass().getName());
}
// Find either the aggregator or post aggregator to do the topN over
List<AggregatorFactory> condensedAggs = Lists.newArrayList();
for (AggregatorFactory aggregatorSpec : query.getAggregatorSpecs()) {
if (aggregatorSpec.getName().equalsIgnoreCase(metric)) {
condensedAggs.add(aggregatorSpec);
break;
}
}
List<PostAggregator> condensedPostAggs = Lists.newArrayList();
if (condensedAggs.isEmpty()) {
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
if (postAggregator.getName().equalsIgnoreCase(metric)) {
condensedPostAggs.add(postAggregator);
final String metric = query.getTopNMetricSpec().getMetricName(query.getDimensionSpec());
Pair<List<AggregatorFactory>, List<PostAggregator>> condensedAggPostAggPair = AggregatorUtil.condensedAggregators(
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs(),
metric
);
// Add all dependent metrics
for (AggregatorFactory aggregatorSpec : query.getAggregatorSpecs()) {
if (postAggregator.getDependentFields().contains(aggregatorSpec.getName())) {
condensedAggs.add(aggregatorSpec);
}
}
break;
}
}
}
if (condensedAggs.isEmpty() && condensedPostAggs.isEmpty()) {
if (condensedAggPostAggPair.lhs.isEmpty() && condensedAggPostAggPair.rhs.isEmpty()) {
throw new ISE("WTF! Can't find the metric to do topN over?");
}
// Run topN for only a single metric
TopNQuery singleMetricQuery = new TopNQueryBuilder().copy(query)
.aggregators(condensedAggs)
.postAggregators(condensedPostAggs)
.aggregators(condensedAggPostAggPair.lhs)
.postAggregators(condensedAggPostAggPair.rhs)
.build();
final TopNResultBuilder singleMetricResultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, singleMetricQuery);
PooledTopNAlgorithm singleMetricAlgo = new PooledTopNAlgorithm(capabilities, singleMetricQuery, bufferPool);
PooledTopNAlgorithm.PooledTopNParams singleMetricParam = null;
......
......@@ -28,6 +28,7 @@ import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
......@@ -230,4 +231,18 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
return Pair.of(startIndex, endIndex);
}
}
public static TopNResultBuilder makeResultBuilder(TopNParams params, TopNQuery query)
{
Comparator comparator = query.getTopNMetricSpec()
.getComparator(query.getAggregatorSpecs(), query.getPostAggregatorSpecs());
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(),
query.getDimensionSpec(),
query.getThreshold(),
comparator,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
}
......@@ -56,14 +56,6 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
return new TopNParams(dimSelector, cursor, dimSelector.getValueCardinality(), Integer.MAX_VALUE);
}
@Override
public TopNResultBuilder makeResultBuilder(TopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed, int numToProcess)
{
......@@ -144,9 +136,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
resultBuilder.addEntry(
entry.getKey(),
entry.getKey(),
vals,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
vals
);
}
}
......
......@@ -36,7 +36,6 @@ import java.util.List;
public class InvertedTopNMetricSpec implements TopNMetricSpec
{
private static final byte CACHE_TYPE_ID = 0x3;
private final TopNMetricSpec delegate;
@JsonCreator
......@@ -76,10 +75,12 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator);
return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator, aggFactories, postAggs);
}
@Override
......@@ -102,15 +103,27 @@ public class InvertedTopNMetricSpec implements TopNMetricSpec
delegate.initTopNAlgorithmSelector(selector);
}
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return delegate.getMetricName(dimSpec);
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InvertedTopNMetricSpec that = (InvertedTopNMetricSpec) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) return false;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) {
return false;
}
return true;
}
......
......@@ -80,10 +80,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator);
return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator, aggFactories);
}
@Override
......@@ -111,6 +113,12 @@ public class LexicographicTopNMetricSpec implements TopNMetricSpec
selector.setAggregateAllMetrics(true);
}
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return dimSpec.getOutputName();
}
@Override
public String toString()
{
......
......@@ -121,10 +121,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator);
return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator, aggFactories, postAggs);
}
@Override
......@@ -150,6 +152,12 @@ public class NumericTopNMetricSpec implements TopNMetricSpec
selector.setAggregateTopNMetricFirst(true);
}
@Override
public String getMetricName(DimensionSpec dimSpec)
{
return metric;
}
@Override
public String toString()
{
......
......@@ -35,7 +35,8 @@ import java.util.Comparator;
/**
*/
public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{
private final Capabilities capabilities;
private final TopNQuery query;
......@@ -113,13 +114,7 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
.build();
}
@Override
public TopNResultBuilder makeResultBuilder(PooledTopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
protected int[] makeDimValSelector(PooledTopNParams params, int numProcessed, int numToProcess)
......@@ -217,9 +212,7 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
resultBuilder.addEntry(
dimSelector.lookupName(i),
i,
vals,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
vals
);
}
}
......@@ -228,7 +221,7 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
@Override
protected void closeAggregators(BufferAggregator[] bufferAggregators)
{
for(BufferAggregator agg : bufferAggregators) {
for (BufferAggregator agg : bufferAggregators) {
agg.close();
}
}
......@@ -246,11 +239,6 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
public static class PooledTopNParams extends TopNParams
{
public static Builder builder()
{
return new Builder();
}
private final ResourceHolder<ByteBuffer> resultsBufHolder;
private final ByteBuffer resultsBuf;
private final int[] aggregatorSizes;
......@@ -278,6 +266,11 @@ public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregat
this.arrayProvider = arrayProvider;
}
public static Builder builder()
{
return new Builder();
}
public ResourceHolder<ByteBuffer> getResultsBufHolder()
{
return resultsBufHolder;
......
......@@ -33,8 +33,6 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
public TopNParams makeInitParams(DimensionSelector dimSelector, Cursor cursor);
public TopNResultBuilder makeResultBuilder(Parameters params);
public void run(
Parameters params,
TopNResultBuilder resultBuilder,
......
......@@ -24,6 +24,7 @@ import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
......@@ -40,7 +41,7 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
private final TopNResultMerger merger;
private final DimensionSpec dimSpec;
private final QueryGranularity gran;
private final String dimension;
private final DimensionSpec dimensionSpec;
private final TopNMetricSpec topNMetricSpec;
private final int threshold;
private final List<AggregatorFactory> aggregations;
......@@ -63,9 +64,13 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold;
this.aggregations = aggregatorSpecs;
this.postAggregations = postAggregatorSpecs;
this.dimensionSpec = dimSpec;
this.postAggregations = AggregatorUtil.pruneDependentPostAgg(
postAggregatorSpecs,
this.topNMetricSpec.getMetricName(this.dimensionSpec)
);
this.dimension = dimSpec.getOutputName();
this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs);
}
......@@ -79,11 +84,13 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
return merger.getResult(arg1, comparator);
}
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
TopNResultValue arg1Vals = arg1.getValue();
TopNResultValue arg2Vals = arg2.getValue();
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
String dimension = dimensionSpec.getOutputName();
String topNMetricName = topNMetricSpec.getMetricName(dimensionSpec);
for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
}
......@@ -92,16 +99,16 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue);
if (arg1Val != null) {
Map<String, Object> retVal = new LinkedHashMap<String, Object>();
// size of map = aggregator + topNDim + postAgg (If sorting is done on post agg field)
Map<String, Object> retVal = new LinkedHashMap<String, Object>(aggregations.size() + 2);
retVal.put(dimension, dimensionValue);
for (AggregatorFactory factory : aggregations) {
final String metricName = factory.getName();
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
}
for (PostAggregator pf : postAggregations) {
retVal.put(pf.getName(), pf.compute(retVal));
for (PostAggregator postAgg : postAggregations) {
retVal.put(postAgg.getName(), postAgg.compute(retVal));
}
retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
......@@ -117,7 +124,14 @@ public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<To
timestamp = gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
}
TopNResultBuilder bob = topNMetricSpec.getResultBuilder(timestamp, dimSpec, threshold, comparator);
TopNResultBuilder bob = topNMetricSpec.getResultBuilder(
timestamp,
dimSpec,
threshold,
comparator,
aggregations,
postAggregations
);
for (DimensionAndMetricValueExtractor extractor : retVals.values()) {
bob.addEntry(extractor);
}
......
......@@ -40,7 +40,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
private final DateTime timestamp;
private final DimensionSpec dimSpec;
private final String previousStop;
private final List<AggregatorFactory> aggFactories;
private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNLexicographicResultBuilder(
......@@ -48,12 +48,14 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
DimensionSpec dimSpec,
int threshold,
String previousStop,
final Comparator comparator
final Comparator comparator,
List<AggregatorFactory> aggFactories
)
{
this.timestamp = timestamp;
this.dimSpec = dimSpec;
this.previousStop = previousStop;
this.aggFactories = aggFactories;
instantiatePQueue(threshold, comparator);
}
......@@ -62,9 +64,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
Object[] metricVals
)
{
Map<String, Object> metricValues = Maps.newLinkedHashMap();
......@@ -75,9 +75,6 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) {
metricValues.put(aggsIter.next().getName(), metricVal);
}
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build());
}
......
......@@ -24,6 +24,8 @@ import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.util.Comparator;
public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
{
private final TopNQuery query;
......@@ -52,7 +54,7 @@ public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
try {
params = topNAlgorithm.makeInitParams(dimSelector, cursor);
TopNResultBuilder resultBuilder = topNAlgorithm.makeResultBuilder(params);
TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query);
topNAlgorithm.run(params, resultBuilder, null);
......
......@@ -47,7 +47,9 @@ public interface TopNMetricSpec
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
);
public byte[] getCacheKey();
......@@ -55,4 +57,6 @@ public interface TopNMetricSpec
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
public String getMetricName(DimensionSpec dimSpec);
}
......@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
......@@ -40,7 +41,8 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
private final DateTime timestamp;
private final DimensionSpec dimSpec;
private final String metricName;
private final List<AggregatorFactory> aggFactories;
private final List<PostAggregator> postAggs;
private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNNumericResultBuilder(
......@@ -48,12 +50,16 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
DimensionSpec dimSpec,
String metricName,
int threshold,
final Comparator comparator
final Comparator comparator,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
this.timestamp = timestamp;
this.dimSpec = dimSpec;
this.metricName = metricName;
this.aggFactories = aggFactories;
this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
instantiatePQueue(threshold, comparator);
}
......@@ -62,9 +68,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
Object[] metricVals
)
{
Map<String, Object> metricValues = Maps.newLinkedHashMap();
......@@ -75,6 +79,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
for (Object metricVal : metricVals) {
metricValues.put(aggFactoryIter.next().getName(), metricVal);
}
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
......
......@@ -64,11 +64,13 @@ import java.util.Map;
public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
{
private static final byte TOPN_QUERY = 0x1;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>(){};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>()
{
};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>()
{
};
private final TopNQueryConfig config;
@Inject
......@@ -161,7 +163,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName()));
Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg);
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
}
values.put(dimension, input.getDimensionValue(dimension));
......@@ -281,10 +288,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
vals.put(factory.getName(), factory.deserialize(resultIter.next()));
}
for (PostAggregator postAgg : postAggs) {
vals.put(postAgg.getName(), postAgg.compute(vals));
}
retVal.add(vals);
}
......@@ -313,6 +316,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold());
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
......@@ -397,9 +405,4 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
);
}
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
}
......@@ -33,9 +33,7 @@ public interface TopNResultBuilder
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
Object[] metricVals
);
public TopNResultBuilder addEntry(
......
......@@ -60,6 +60,7 @@ public class QueryRunnerTestHelper
public static final String indexMetric = "index";
public static final String uniqueMetric = "uniques";
public static final String addRowsIndexConstantMetric = "addRowsIndexConstant";
public static String dependentPostAggMetric = "dependentPostAgg";
public static final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
public static final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index");
......@@ -72,8 +73,20 @@ public class QueryRunnerTestHelper
public static final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
public static final ArithmeticPostAggregator addRowsIndexConstant =
new ArithmeticPostAggregator(
"addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
addRowsIndexConstantMetric, "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
// dependent on AddRowsIndexContact postAgg
public static final ArithmeticPostAggregator dependentPostAgg = new ArithmeticPostAggregator(
dependentPostAggMetric,
"+",
Lists.newArrayList(
constant,
new FieldAccessPostAggregator(addRowsIndexConstantMetric, addRowsIndexConstantMetric),
new FieldAccessPostAggregator("rows", "rows")
)
);
public static final List<AggregatorFactory> commonAggregators = Arrays.asList(
rowsCount,
indexDoubleSum,
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static io.druid.query.QueryRunnerTestHelper.dependentPostAggMetric;
public class AggregatorUtilTest
{
@Test
public void testPruneDependentPostAgg()
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");
PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2");
PostAggregator aggregator = new ArithmeticPostAggregator(
"finalAgg",
"+",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("dep1", "dep1"),
new FieldAccessPostAggregator("dep2", "dep2")
)
);
List<PostAggregator> prunedAgg = AggregatorUtil.pruneDependentPostAgg(
Lists.newArrayList(
agg1,
dependency1,
agg2,
dependency2,
aggregator
), aggregator.getName()
);
Assert.assertEquals(Lists.newArrayList(dependency1, dependency2, aggregator), prunedAgg);
}
@Test
public void testOutOfOrderPruneDependentPostAgg()
{
PostAggregator agg1 = new ArithmeticPostAggregator(
"abc", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("2", 2L, 2L)
)
);
PostAggregator dependency1 = new ArithmeticPostAggregator(
"dep1", "+", Lists.<PostAggregator>newArrayList(
new ConstantPostAggregator("1", 1L, 1L), new ConstantPostAggregator("4", 4L, 4L)
)
);
PostAggregator agg2 = new FieldAccessPostAggregator("def", "def");
PostAggregator dependency2 = new FieldAccessPostAggregator("dep2", "dep2");
PostAggregator aggregator = new ArithmeticPostAggregator(
"finalAgg",
"+",
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("dep1", "dep1"),
new FieldAccessPostAggregator("dep2", "dep2")
)
);
List<PostAggregator> prunedAgg = AggregatorUtil.pruneDependentPostAgg(
Lists.newArrayList(
agg1,
dependency1,
aggregator, // dependency is added later than the aggregator
agg2,
dependency2
), aggregator.getName()
);
Assert.assertEquals(Lists.newArrayList(dependency1, aggregator), prunedAgg);
}
@Test
public void testCondenseAggregators()
{
ArrayList<AggregatorFactory> aggregatorFactories = Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new MaxAggregatorFactory("maxIndex", "index"),
new MinAggregatorFactory("minIndex", "index")
)
)
);
List<PostAggregator> postAggregatorList = Arrays.<PostAggregator>asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
);
Pair<List<AggregatorFactory>, List<PostAggregator>> aggregatorsPair = AggregatorUtil.condensedAggregators(
aggregatorFactories,
postAggregatorList,
dependentPostAggMetric
);
// verify aggregators
Assert.assertEquals(
Lists.newArrayList(QueryRunnerTestHelper.rowsCount, QueryRunnerTestHelper.indexDoubleSum),
aggregatorsPair.lhs
);
Assert.assertEquals(
Lists.newArrayList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
), aggregatorsPair.rhs
);
}
}
......@@ -20,16 +20,11 @@
package io.druid.query.timeseries;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
......@@ -43,21 +38,10 @@ public class TimeseriesBinaryFnTest
{
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addRowsIndexConstant = new ArithmeticPostAggregator(
"addRowsIndexConstant",
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
final List<AggregatorFactory> aggregatorFactories = Arrays.asList(
rowsCount,
indexLongSum
);
final List<PostAggregator> postAggregators = Arrays.<PostAggregator>asList(
addRowsIndexConstant
);
final DateTime currTime = new DateTime();
@Test
......@@ -87,16 +71,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
......@@ -131,16 +113,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.DAY,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
......@@ -166,8 +146,7 @@ public class TimeseriesBinaryFnTest
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
......@@ -202,16 +181,14 @@ public class TimeseriesBinaryFnTest
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"rows", 3L,
"index", 5L,
"addRowsIndexConstant", 9.0
"index", 5L
)
)
);
Result<TimeseriesResultValue> actual = new TimeseriesBinaryFn(
QueryGranularity.ALL,
aggregatorFactories,
postAggregators
aggregatorFactories
).apply(
result1,
result2
......
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopNBinaryFnBenchmark extends SimpleBenchmark
{
@Param({"1", "5", "10", "15"})
int aggCount;
@Param({"1", "5", "10", "15"})
int postAggCount;
@Param({"1000", "10000"})
int threshold;
Result<TopNResultValue> result1;
Result<TopNResultValue> result2;
TopNBinaryFn fn;
public static void main(String[] args) throws Exception
{
Runner.main(TopNBinaryFnBenchmark.class, args);
}
@Override
protected void setUp() throws Exception
{
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L, null);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>();
aggregatorFactories.add(new CountAggregatorFactory("rows"));
aggregatorFactories.add(new LongSumAggregatorFactory("index", "index"));
for (int i = 1; i < aggCount; i++) {
aggregatorFactories.add(new CountAggregatorFactory("rows" + i));
}
final List<PostAggregator> postAggregators = new ArrayList<>();
for (int i = 0; i < postAggCount; i++) {
postAggregators.add(
new ArithmeticPostAggregator(
"addrowsindexconstant" + i,
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
)
);
}
final DateTime currTime = new DateTime();
List<Map<String, Object>> list = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 1L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 1L);
}
res.put("index", 1L);
list.add(res);
}
result1 = new Result<>(
currTime,
new TopNResultValue(list)
);
List<Map<String, Object>> list2 = new ArrayList<>();
for (int i = 0; i < threshold; i++) {
Map<String, Object> res = new HashMap<>();
res.put("testdim", "" + i);
res.put("rows", 2L);
for (int j = 0; j < aggCount; j++) {
res.put("rows" + j, 2L);
}
res.put("index", 2L);
list2.add(res);
}
result2 = new Result<>(
currTime,
new TopNResultValue(list2)
);
fn = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
100,
aggregatorFactories,
postAggregators
);
}
public void timeMerge(int nReps)
{
for (int i = 0; i < nReps; i++) {
fn.apply(result1, result2);
}
}
}
......@@ -129,15 +129,13 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
"index", 5L
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
"index", 4L
)
)
)
......@@ -214,14 +212,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
"index", 5L
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
"index", 4L
)
)
)
......@@ -427,15 +423,12 @@ public class TopNBinaryFnTest
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
"index", 5L
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
)
"index", 4L )
)
)
);
......
......@@ -1176,4 +1176,75 @@ public class TopNQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
@Test
public void testTopNDependentPostAgg() {
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(providerDimension)
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new MaxAggregatorFactory("maxIndex", "index"),
new MinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(
Arrays.<PostAggregator>asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(providerDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 216053.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 192420.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.build(),
ImmutableMap.<String, Object>builder()
.put(providerDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put(QueryRunnerTestHelper.dependentPostAggMetric, 97282.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.build()
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
}
......@@ -9,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -28,8 +28,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Client;
import io.druid.query.Query;
import io.druid.server.router.Router;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import javax.inject.Inject;
......@@ -52,7 +53,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
@Inject
public RoutingDruidClient(
ObjectMapper objectMapper,
@Global HttpClient httpClient
@Router HttpClient httpClient
)
{
this.objectMapper = objectMapper;
......@@ -67,7 +68,7 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return openConnections.get();
}
public ListenableFuture<FinalType> run(
public ListenableFuture<FinalType> post(
String url,
Query query,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
......@@ -109,4 +110,19 @@ public class RoutingDruidClient<IntermediateType, FinalType>
return future;
}
public ListenableFuture<FinalType> get(
String url,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
)
{
try {
return httpClient
.get(new URL(url))
.go(responseHandler);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
......@@ -29,7 +29,7 @@ public class LocalCacheProvider implements CacheProvider
{
@JsonProperty
@Min(0)
private long sizeInBytes = 10485760;
private long sizeInBytes = 0;
@JsonProperty
@Min(0)
......
......@@ -68,7 +68,6 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
......@@ -84,16 +83,15 @@ public class RealtimePlumber implements Plumber
private final SegmentPublisher segmentPublisher;
private final ServerView serverView;
private final int maxPendingPersists;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<String, Sink>(
String.CASE_INSENSITIVE_ORDER
);
private volatile boolean shuttingDown = false;
private volatile boolean stopped = false;
private volatile ExecutorService persistExecutor = null;
private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber(
......@@ -300,13 +298,13 @@ public class RealtimePlumber implements Plumber
);
}
// Submits persist-n-merge task for a Sink to the persistExecutor
// Submits persist-n-merge task for a Sink to the mergeExecutor
private void persistAndMerge(final long truncatedTime, final Sink sink)
{
final String threadName = String.format(
"%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(truncatedTime)
);
persistExecutor.execute(
mergeExecutor.execute(
new ThreadRenamingRunnable(threadName)
{
@Override
......@@ -431,6 +429,13 @@ public class RealtimePlumber implements Plumber
"plumber_persist_%d", maxPendingPersists
);
}
if (mergeExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
mergeExecutor = Execs.newBlockingSingleThreaded(
"plumber_merge_%d", 1
);
}
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newScheduledThreadPool(
1,
......@@ -592,7 +597,11 @@ public class RealtimePlumber implements Plumber
log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry);
} else {
log.warn("[%s] < [%s] Skipping persist and merge.", new DateTime(intervalStart), minTimestampAsDate);
log.warn(
"[%s] < [%s] Skipping persist and merge.",
new DateTime(intervalStart),
minTimestampAsDate
);
}
}
......@@ -660,39 +669,46 @@ public class RealtimePlumber implements Plumber
*/
protected int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
{
if (indexToPersist.hasSwapped()) {
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
);
return 0;
}
log.info(
"DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
schema.getDataSource(), interval, indexToPersist
"DataSource[%s], Interval[%s], persisting Hydrant[%s]",
schema.getDataSource(),
interval,
indexToPersist
);
return 0;
}
log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
try {
int numRows = indexToPersist.getIndex().size();
try {
int numRows = indexToPersist.getIndex().size();
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
File persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("interval", interval)
.addData("count", indexToPersist.getCount())
.emit();
throw Throwables.propagate(e);
throw Throwables.propagate(e);
}
}
}
......
......@@ -85,6 +85,116 @@ public class AsyncQueryForwardingServlet extends HttpServlet
this.requestLogger = requestLogger;
}
@Override
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp)
throws ServletException, IOException
{
OutputStream out = null;
AsyncContext ctx = null;
try {
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
out = resp.getOutputStream();
final OutputStream outputStream = out;
final String host = hostFinder.getDefaultHost();
final HttpResponseHandler<OutputStream, OutputStream> responseHandler = new HttpResponseHandler<OutputStream, OutputStream>()
{
@Override
public ClientResponse<OutputStream> handleResponse(HttpResponse response)
{
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/json");
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
@Override
public ClientResponse<OutputStream> handleChunk(
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
@Override
public ClientResponse<OutputStream> done(ClientResponse<OutputStream> clientResponse)
{
final OutputStream obj = clientResponse.getObj();
try {
resp.flushBuffer();
outputStream.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
};
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.get(makeUrl(host, req), responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
resp.setStatus(500);
resp.resetBuffer();
if (out == null) {
out = resp.getOutputStream();
}
if (ctx != null) {
ctx.complete();
}
out.write((e.getMessage() == null) ? "Exception null".getBytes(UTF8) : e.getMessage().getBytes(UTF8));
out.write("\n".getBytes(UTF8));
}
resp.flushBuffer();
}
}
@Override
protected void doPost(
final HttpServletRequest req, final HttpServletResponse resp
......@@ -99,16 +209,16 @@ public class AsyncQueryForwardingServlet extends HttpServlet
final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
OutputStream out = null;
AsyncContext ctx = null;
try {
final AsyncContext ctx = req.startAsync(req, resp);
ctx = req.startAsync(req, resp);
final AsyncContext asyncContext = ctx;
if (req.getAttribute(DISPATCHED) != null) {
return;
}
req.setAttribute(DISPATCHED, true);
query = objectMapper.readValue(req.getInputStream(), Query.class);
queryId = query.getId();
if (queryId == null) {
......@@ -136,14 +246,13 @@ public class AsyncQueryForwardingServlet extends HttpServlet
resp.setStatus(response.getStatus().getCode());
resp.setContentType("application/x-javascript");
byte[] bytes = getContentBytes(response.getContent());
if (bytes.length > 0) {
try {
outputStream.write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
try {
ChannelBuffer buf = response.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return ClientResponse.finished(outputStream);
}
......@@ -153,14 +262,13 @@ public class AsyncQueryForwardingServlet extends HttpServlet
ClientResponse<OutputStream> clientResponse, HttpChunk chunk
)
{
byte[] bytes = getContentBytes(chunk.getContent());
if (bytes.length > 0) {
try {
clientResponse.getObj().write(bytes);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
try {
ChannelBuffer buf = chunk.getContent();
buf.readBytes(outputStream, buf.readableBytes());
}
catch (Exception e) {
asyncContext.complete();
throw Throwables.propagate(e);
}
return clientResponse;
}
......@@ -202,30 +310,26 @@ public class AsyncQueryForwardingServlet extends HttpServlet
throw Throwables.propagate(e);
}
finally {
ctx.dispatch();
asyncContext.complete();
}
return ClientResponse.finished(obj);
}
private byte[] getContentBytes(ChannelBuffer content)
{
byte[] contentBytes = new byte[content.readableBytes()];
content.readBytes(contentBytes);
return contentBytes;
}
};
ctx.start(
asyncContext.start(
new Runnable()
{
@Override
public void run()
{
routingDruidClient.run(makeUrl(host, req), theQuery, responseHandler);
routingDruidClient.post(makeUrl(host, req), theQuery, responseHandler);
}
}
);
asyncContext.dispatch();
req.setAttribute(DISPATCHED, true);
}
catch (Exception e) {
if (!resp.isCommitted()) {
......@@ -242,6 +346,10 @@ public class AsyncQueryForwardingServlet extends HttpServlet
resp.flushBuffer();
if (ctx != null) {
ctx.complete();
}
try {
requestLogger.log(
new RequestLogLine(
......
......@@ -20,6 +20,7 @@
package io.druid.server.router;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.selector.Server;
......@@ -49,11 +50,57 @@ public class QueryHostFinder<T>
public Server findServer(Query<T> query)
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query);
return findServerInner(selected);
}
public Server findDefaultServer()
{
final Pair<String, ServerDiscoverySelector> selected = hostSelector.getDefaultLookup();
return findServerInner(selected);
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
final String serviceName = selected.lhs;
final ServerDiscoverySelector selector = selected.rhs;
throw new ISE("No server found for query[%s]", query);
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
public String getDefaultHost()
{
Server server = findDefaultServer();
Server server = selector.pick();
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
throw new ISE("No default server found!");
}
return server.getHost();
}
private Server findServerInner(final Pair<String, ServerDiscoverySelector> selected)
{
if (selected == null) {
log.error("Danger, Will Robinson! Unable to find any brokers!");
}
final String serviceName = selected == null ? hostSelector.getDefaultServiceName() : selected.lhs;
final ServerDiscoverySelector selector = selected == null ? null : selected.rhs;
Server server = selector == null ? null : selector.pick();
if (server == null) {
log.error(
"WTF?! No server found for serviceName[%s]. Using backup",
......@@ -78,21 +125,4 @@ public class QueryHostFinder<T>
return server;
}
public String getHost(Query<T> query)
{
Server server = findServer(query);
if (server == null) {
log.makeAlert(
"Catastrophic failure! No servers found at all! Failing request!"
).emit();
return null;
}
log.debug("Selected [%s]", server.getHost());
return server.getHost();
}
}
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.router;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Router
{
}
......@@ -122,7 +122,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
{
synchronized (lock) {
if (!ruleManager.isStarted() || !started) {
return null;
return getDefaultLookup();
}
}
......@@ -157,7 +157,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
}
if (baseRule == null) {
return null;
return getDefaultLookup();
}
// in the baseRule, find the broker of highest priority
......@@ -192,4 +192,11 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
return new Pair<>(brokerServiceName, retVal);
}
public Pair<String, ServerDiscoverySelector> getDefaultLookup()
{
final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName);
return new Pair<>(brokerServiceName, retVal);
}
}
......@@ -47,6 +47,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DataSource;
import io.druid.query.Druids;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.MapQueryToolChestWarehouse;
import io.druid.query.Query;
import io.druid.query.QueryConfig;
......@@ -115,17 +116,21 @@ import java.util.concurrent.Executor;
@RunWith(Parameterized.class)
public class CachingClusteredClientTest
{
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
/**
* We want a deterministic test, but we'd also like a bit of randomness for the distribution of segments
* across servers. Thus, we loop multiple times and each time use a deterministically created Random instance.
* Increase this value to increase exposure to random situations at the expense of test run time.
*/
private static final int RANDOMNESS = 10;
public static final ImmutableMap<String, Object> CONTEXT = ImmutableMap.of();
public static final MultipleIntervalSegmentSpec SEG_SPEC = new MultipleIntervalSegmentSpec(ImmutableList.<Interval>of());
public static final String DATA_SOURCE = "test";
private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("imps", "imps"),
......@@ -152,6 +157,17 @@ public class CachingClusteredClientTest
private static final DateTimeZone TIMEZONE = DateTimeZone.forID("America/Los_Angeles");
private static final QueryGranularity PT1H_TZ_GRANULARITY = new PeriodGranularity(new Period("PT1H"), null, TIMEZONE);
private static final String TOP_DIM = "a_dim";
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
public CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
......@@ -169,28 +185,6 @@ public class CachingClusteredClientTest
);
}
protected static final DefaultObjectMapper jsonMapper = new DefaultObjectMapper(new SmileFactory());
static {
jsonMapper.getFactory().setCodec(jsonMapper);
}
private final Random random;
protected VersionedIntervalTimeline<String, ServerSelector> timeline;
protected TimelineServerView serverView;
protected Cache cache;
CachingClusteredClient client;
DruidServer[] servers;
public CachingClusteredClientTest(int randomSeed)
{
this.random = new Random(randomSeed);
}
@Before
public void setUp() throws Exception
{
......@@ -222,7 +216,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"), makeTimeResults(new DateTime("2011-01-01"), 50, 5000),
new Interval("2011-01-02/2011-01-03"), makeTimeResults(new DateTime("2011-01-02"), 30, 6000),
......@@ -263,7 +260,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), 18, 521,
new DateTime("2011-01-09T01"), 181, 52
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
......@@ -285,7 +282,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TimeseriesQueryQueryToolChest(new QueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-11-04/2011-11-08"),
makeTimeResults(
......@@ -303,7 +303,7 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), 23, 85312,
new DateTime("2011-11-07", TIMEZONE), 85, 102
),
client.run(
runner.run(
builder.intervals("2011-11-04/2011-11-08")
.aggregators(RENAMED_AGGS)
.postAggregators(RENAMED_POST_AGGS)
......@@ -324,6 +324,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS);
testQueryCaching(
client,
1,
true,
builder.context(
......@@ -342,6 +343,7 @@ public class CachingClusteredClientTest
cache.close("0_0");
testQueryCaching(
client,
1,
false,
builder.context(
......@@ -358,6 +360,7 @@ public class CachingClusteredClientTest
Assert.assertEquals(0, cache.getStats().getNumMisses());
testQueryCaching(
client,
1,
false,
builder.context(
......@@ -390,7 +393,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeTopNResults(new DateTime("2011-01-01"), "a", 50, 5000, "b", 50, 4999, "c", 50, 4998),
......@@ -432,7 +438,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
......@@ -458,7 +464,10 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-11-04/2011-11-08"),
makeTopNResults(
......@@ -477,7 +486,7 @@ public class CachingClusteredClientTest
new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-11-07", TIMEZONE), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986
),
client.run(
runner.run(
builder.intervals("2011-11-04/2011-11-08")
.metric("imps")
.aggregators(RENAMED_AGGS)
......@@ -503,7 +512,9 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(client, new TopNQueryQueryToolChest(new TopNQueryConfig()));
testQueryCaching(
runner,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeTopNResults(),
......@@ -530,6 +541,7 @@ public class CachingClusteredClientTest
)
);
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
new DateTime("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
......@@ -543,7 +555,7 @@ public class CachingClusteredClientTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.run(
runner.run(
builder.intervals("2011-01-01/2011-01-10")
.metric("imps")
.aggregators(RENAMED_AGGS)
......@@ -557,6 +569,7 @@ public class CachingClusteredClientTest
public void testSearchCaching() throws Exception
{
testQueryCaching(
client,
new SearchQuery(
new TableDataSource(DATA_SOURCE),
DIM_FILTER,
......@@ -594,13 +607,14 @@ public class CachingClusteredClientTest
);
}
public void testQueryCaching(final Query query, Object... args)
public void testQueryCaching(QueryRunner runner, final Query query, Object... args)
{
testQueryCaching(3, true, query, args);
testQueryCaching(runner, 3, true, query, args);
}
@SuppressWarnings("unchecked")
public void testQueryCaching(
final QueryRunner runner,
final int numTimesToQuery,
boolean expectBySegment,
final Query query, Object... args // does this assume query intervals must be ordered?
......@@ -754,7 +768,7 @@ public class CachingClusteredClientTest
}
)
),
client.run(
runner.run(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
Arrays.asList(
......@@ -774,7 +788,7 @@ public class CachingClusteredClientTest
for (Capture queryCapture : queryCaptures) {
Query capturedQuery = (Query) queryCapture.getValue();
if (expectBySegment) {
Assert.assertEquals(true, capturedQuery.<Boolean>getContextValue("bySegment"));
Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
} else {
Assert.assertTrue(
capturedQuery.getContextValue("bySegment") == null ||
......@@ -1253,6 +1267,8 @@ public class CachingClusteredClientTest
private class MyDataSegment extends DataSegment
{
private final DataSegment baseSegment = segment;
private MyDataSegment()
{
super(
......@@ -1268,8 +1284,6 @@ public class CachingClusteredClientTest
);
}
private final DataSegment baseSegment = segment;
@Override
@JsonProperty
public String getDataSource()
......@@ -1370,7 +1384,6 @@ public class CachingClusteredClientTest
{
private final DruidServer server;
private final QueryRunner queryRunner;
private final List<ServerExpectation> expectations = Lists.newArrayList();
public ServerExpectations(
......
......@@ -142,19 +142,18 @@ public class TieredBrokerHostSelectorTest
@Test
public void testSelectMatchesNothing() throws Exception
{
Pair retVal = brokerSelector.select(
String brokerName = (String) brokerSelector.select(
Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(new Interval("2010-08-31/2010-09-01")))
.build()
);
).lhs;
Assert.assertEquals(null, retVal);
Assert.assertEquals("hotBroker", brokerName);
}
@Test
public void testSelectMultiInterval() throws Exception
{
......
......@@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.90-SNAPSHOT</version>
<version>0.6.91-SNAPSHOT</version>
</parent>
<dependencies>
......
......@@ -55,7 +55,7 @@ import java.util.List;
*/
@Command(
name = "broker",
description = "Runs a broker node, see http://druid.io/docs/0.6.86/Broker.html for a description"
description = "Runs a broker node, see http://druid.io/docs/latest/Broker.html for a description"
)
public class CliBroker extends ServerRunnable
{
......
......@@ -66,7 +66,7 @@ import java.util.List;
*/
@Command(
name = "coordinator",
description = "Runs the Coordinator, see http://druid.io/docs/0.6.86/Coordinator.html for a description."
description = "Runs the Coordinator, see http://druid.io/docs/latest/Coordinator.html for a description."
)
public class CliCoordinator extends ServerRunnable
{
......
......@@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.86/Batch-ingestion.html for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/latest/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{
......
......@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheMonitor;
import io.druid.client.cache.CacheProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
......@@ -38,6 +39,7 @@ import io.druid.server.QueryResource;
import io.druid.server.coordination.ServerManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import org.eclipse.jetty.server.Server;
import java.util.List;
......@@ -46,7 +48,7 @@ import java.util.List;
*/
@Command(
name = "historical",
description = "Runs a Historical node, see http://druid.io/docs/0.6.86/Historical.html for a description"
description = "Runs a Historical node, see http://druid.io/docs/latest/Historical.html for a description"
)
public class CliHistorical extends ServerRunnable
{
......@@ -77,10 +79,11 @@ public class CliHistorical extends ServerRunnable
LifecycleModule.register(binder, ZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
binder.bind(Cache.class).toProvider(CacheProvider.class).in(ManageLifecycle.class);
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheProvider.class);
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
MetricsModule.register(binder, CacheMonitor.class);
}
}
);
......
......@@ -93,7 +93,7 @@ import java.util.List;
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see http://druid.io/docs/0.6.86/Indexing-Service.html for a description"
description = "Runs an Overlord node, see http://druid.io/docs/latest/Indexing-Service.html for a description"
)
public class CliOverlord extends ServerRunnable
{
......
......@@ -30,7 +30,7 @@ import java.util.List;
*/
@Command(
name = "realtime",
description = "Runs a realtime node, see http://druid.io/docs/0.6.86/Realtime.html for a description"
description = "Runs a realtime node, see http://druid.io/docs/latest/Realtime.html for a description"
)
public class CliRealtime extends ServerRunnable
{
......
......@@ -29,14 +29,17 @@ import io.druid.client.RoutingDruidClient;
import io.druid.curator.discovery.DiscoveryModule;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.HttpClientModule;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Self;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.router.CoordinatorRuleManager;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.server.router.TieredBrokerHostSelector;
import org.eclipse.jetty.server.Server;
......@@ -62,6 +65,7 @@ public class CliRouter extends ServerRunnable
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new HttpClientModule("druid.router.http", Router.class),
new Module()
{
@Override
......
......@@ -86,6 +86,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer
), "/druid/v2/*"
);
queries.addFilter(GzipFilter.class, "/druid/v2/*", null);
queries.addFilter(GuiceFilter.class, "/status/*", null);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册