提交 5fae4d9a 编写于 作者: F fjy

Merge pull request #457 from metamx/stream-cache

Populate cache without materializing results first
......@@ -20,6 +20,8 @@
package io.druid.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
......@@ -33,6 +35,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;
import javax.annotation.Nullable;
import java.util.ArrayList;
public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
......@@ -72,26 +75,33 @@ public class CachePopulatingQueryRunner<T> implements QueryRunner<T>
final boolean populateCache = Boolean.parseBoolean(query.getContextValue(CacheConfig.POPULATE_CACHE, "true"))
&& strategy != null
&& cacheConfig.isPopulateCache()
// historical only populates distributed cache since the cache lookups are done at broker.
&& !(cache instanceof MapCache);
&& cacheConfig.isPopulateCache();
final Sequence<T> results = base.run(query);
if (populateCache) {
Sequence<T> results = base.run(query);
Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);
ArrayList<T> resultAsList = Sequences.toList(results, new ArrayList<T>());
CacheUtil.populate(
cache,
mapper,
key,
Lists.transform(resultAsList, strategy.prepareForCache())
final Function cacheFn = strategy.prepareForCache();
return Sequences.map(
results,
new Function<T, T>()
{
@Nullable
@Override
public T apply(@Nullable T input)
{
CacheUtil.populate(cache, mapper, key, ImmutableList.of(cacheFn.apply(input)));
return input;
}
}
);
return Sequences.simple(resultAsList);
} else {
return base.run(query);
return results;
}
}
......
......@@ -123,7 +123,7 @@ public class CachePopulatingQueryRunnerTest
Sequence res = runner.run(builder.build());
// base sequence is not closed yet
Assert.assertTrue(closable.isClosed());
Assert.assertFalse("sequence must not be closed", closable.isClosed());
ArrayList results = Sequences.toList(res, new ArrayList());
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes, results);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册