未验证 提交 4437c6af 编写于 作者: K kaijianding 提交者: GitHub

use actual dataInterval in cache key (#10714)

* use actual dataInterval in cache key

* fix ut fail

* fix segmentMaxTime exclusive
上级 b3325c16
......@@ -37,6 +37,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Collections;
......@@ -47,6 +48,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
{
private final String cacheId;
private final SegmentDescriptor segmentDescriptor;
private final Interval actualDataInterval;
private final Optional<byte[]> cacheKeyPrefix;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
......@@ -59,6 +61,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
String cacheId,
Optional<byte[]> cacheKeyPrefix,
SegmentDescriptor segmentDescriptor,
Interval actualDataInterval,
ObjectMapper mapper,
Cache cache,
QueryToolChest toolchest,
......@@ -71,6 +74,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
this.base = base;
this.cacheId = cacheId;
this.segmentDescriptor = segmentDescriptor;
this.actualDataInterval = actualDataInterval;
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
......@@ -90,7 +94,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
if (useCache || populateCache) {
key = CacheUtil.computeSegmentCacheKey(
cacheId,
segmentDescriptor,
alignToActualDataInterval(segmentDescriptor),
Bytes.concat(cacheKeyPrefix.get(), strategy.computeCacheKey(query))
);
} else {
......@@ -172,4 +176,14 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
) && cacheKeyPrefix.isPresent();
}
private SegmentDescriptor alignToActualDataInterval(SegmentDescriptor in)
{
Interval interval = in.getInterval();
return new SegmentDescriptor(
interval.overlaps(actualDataInterval) ? interval.overlap(actualDataInterval) : interval,
in.getVersion(),
in.getPartitionNumber()
);
}
}
......@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
......@@ -57,6 +58,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.segment.realtime.FireHydrant;
......@@ -230,10 +232,15 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
StorageAdapter storageAdapter = segmentAndCloseable.lhs.asStorageAdapter();
long segmentMinTime = storageAdapter.getMinTime().getMillis();
long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
runner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
cacheKeyPrefix,
descriptor,
actualDataInterval,
objectMapper,
cache,
toolChest,
......
......@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
......@@ -57,6 +58,7 @@ import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.SegmentManager;
......@@ -300,10 +302,15 @@ public class ServerManager implements QuerySegmentWalker
queryMetrics -> queryMetrics.segment(segmentIdString)
);
StorageAdapter storageAdapter = segment.asStorageAdapter();
long segmentMaxTime = storageAdapter.getMaxTime().getMillis();
long segmentMinTime = storageAdapter.getMinTime().getMillis();
Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1);
CachingQueryRunner<T> cachingQueryRunner = new CachingQueryRunner<>(
segmentIdString,
cacheKeyPrefix,
segmentDescriptor,
actualDataInterval,
objectMapper,
cache,
toolChest,
......
......@@ -413,6 +413,7 @@ public class CachingQueryRunnerTest
CACHE_ID,
Optional.ofNullable(cacheKeyPrefix),
SEGMENT_DESCRIPTOR,
SEGMENT_DESCRIPTOR.getInterval(),
objectMapper,
cache,
toolchest,
......
......@@ -63,16 +63,22 @@ import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContext.Key;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
......@@ -85,6 +91,7 @@ import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
......@@ -92,6 +99,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
......@@ -837,7 +845,7 @@ public class ServerManagerTest
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
return makeFakeStorageAdapter(interval, 0);
}
@Override
......@@ -847,6 +855,112 @@ public class ServerManagerTest
closed = true;
}
}
private StorageAdapter makeFakeStorageAdapter(Interval interval, int cardinality)
{
StorageAdapter adapter = new StorageAdapter()
{
@Override
public Interval getInterval()
{
return interval;
}
@Override
public int getDimensionCardinality(String column)
{
return cardinality;
}
@Override
public DateTime getMinTime()
{
return interval.getStart();
}
@Override
public DateTime getMaxTime()
{
return interval.getEnd();
}
// stubs below this line not important for tests
@Override
public Indexed<String> getAvailableDimensions()
{
return null;
}
@Override
public Iterable<String> getAvailableMetrics()
{
return null;
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
return null;
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
return null;
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return null;
}
@Nullable
@Override
public String getColumnTypeName(String column)
{
return null;
}
@Override
public int getNumRows()
{
return 0;
}
@Override
public DateTime getMaxIngestedEventTime()
{
return null;
}
@Override
public Metadata getMetadata()
{
return null;
}
@Override
public Sequence<Cursor> makeCursors(
@Nullable Filter filter,
Interval interval,
VirtualColumns virtualColumns,
Granularity gran,
boolean descending,
@Nullable QueryMetrics<?> queryMetrics
)
{
return null;
}
};
return adapter;
}
}
private static class BlockingQueryRunner<T> implements QueryRunner<T>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册