diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 53cba0fcda35b65299ec149d4520cd6fed427e55..94c3cf2749cc263573f2cdbf7c884dabb508dd8d 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -236,9 +236,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter { currEntry = new IncrementalIndexRowHolder(); columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index, virtualColumns, descending, currEntry); + // Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/incubator-druid/pull/6340 + maxRowIndex = index.getLastRowIndex(); filterMatcher = filter == null ? BooleanValueMatcher.of(true) : filter.makeMatcher(columnSelectorFactory); numAdvanced = -1; - maxRowIndex = index.getLastRowIndex(); final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis()); cursorIterable = index.getFacts().timeRangeIterable( descending, diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 00e718435f7be78583a1a10a9313ba648b4e7879..de7a6fc4192c3d2c0ef3ec620523b41a3ed86cc4 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -19,6 +19,8 @@ package org.apache.druid.segment.incremental; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -33,23 +35,34 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.js.JavaScriptConfig; +import org.apache.druid.query.BitmapResultFactory; import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.BitmapIndexSelector; import org.apache.druid.query.filter.DimFilters; +import org.apache.druid.query.filter.DruidDoublePredicate; +import org.apache.druid.query.filter.DruidFloatPredicate; +import org.apache.druid.query.filter.DruidLongPredicate; +import org.apache.druid.query.filter.DruidPredicateFactory; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.ValueMatcher; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryEngine; import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.query.topn.TopNQueryEngine; import org.apache.druid.query.topn.TopNResultValue; +import org.apache.druid.segment.ColumnSelector; +import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.filter.SelectorFilter; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -478,6 +491,59 @@ public class IncrementalIndexStorageAdapterTest Assert.assertEquals(1, assertCursorsNotEmpty.get()); } + @Test + public void testCursorDictionaryRaceConditionFix() throws Exception + { + // Tests the dictionary ID race condition bug described at https://github.com/apache/incubator-druid/pull/6340 + + final IncrementalIndex index = indexCreator.createIndex(); + final long timestamp = System.currentTimeMillis(); + + for (int i = 0; i < 5; i++) { + index.add( + new MapBasedInputRow( + timestamp, + Collections.singletonList("billy"), + ImmutableMap.of("billy", "v1" + i) + ) + ); + } + + final StorageAdapter sa = new IncrementalIndexStorageAdapter(index); + + Sequence cursors = sa.makeCursors( + new DictionaryRaceTestFilter(index, timestamp), + Intervals.utc(timestamp - 60_000, timestamp + 60_000), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0); + + cursors + .map(cursor -> { + DimensionSelector dimSelector = cursor + .getColumnSelectorFactory() + .makeDimensionSelector(new DefaultDimensionSpec("billy", "billy")); + int cardinality = dimSelector.getValueCardinality(); + + int rowNumInCursor = 0; + while (!cursor.isDone()) { + IndexedInts row = dimSelector.getRow(); + row.forEach(i -> Assert.assertTrue(i < cardinality)); + cursor.advance(); + rowNumInCursor++; + } + Assert.assertEquals(5, rowNumInCursor); + assertCursorsNotEmpty.incrementAndGet(); + + return null; + }) + .toList(); + Assert.assertEquals(1, assertCursorsNotEmpty.get()); + } + @Test public void testCursoringAndSnapshot() throws Exception { @@ -584,4 +650,97 @@ public class IncrementalIndexStorageAdapterTest .toList(); Assert.assertEquals(1, assertCursorsNotEmpty.get()); } + + private class DictionaryRaceTestFilter implements Filter + { + private final IncrementalIndex index; + private final long timestamp; + + private DictionaryRaceTestFilter( + IncrementalIndex index, + long timestamp + ) + { + this.index = index; + this.timestamp = timestamp; + } + + @Override + public T getBitmapResult( + BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory + ) + { + return bitmapResultFactory.wrapAllTrue(Filters.allTrue(selector)); + } + + @Override + public double estimateSelectivity(BitmapIndexSelector indexSelector) + { + return 1; + } + + @Override + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) + { + return Filters.makeValueMatcher( + factory, + "billy", + new DictionaryRaceTestFilterDruidPredicateFactory() + ); + } + + @Override + public boolean supportsBitmapIndex(BitmapIndexSelector selector) + { + return true; + } + + @Override + public boolean supportsSelectivityEstimation( + ColumnSelector columnSelector, BitmapIndexSelector indexSelector + ) + { + return true; + } + + private class DictionaryRaceTestFilterDruidPredicateFactory implements DruidPredicateFactory + { + @Override + public Predicate makeStringPredicate() + { + try { + index.add( + new MapBasedInputRow( + timestamp, + Collections.singletonList("billy"), + ImmutableMap.of("billy", "v31234") + ) + ); + } + catch (IndexSizeExceededException isee) { + throw new RuntimeException(isee); + } + + return Predicates.alwaysTrue(); + } + + @Override + public DruidLongPredicate makeLongPredicate() + { + throw new UnsupportedOperationException(); + } + + @Override + public DruidFloatPredicate makeFloatPredicate() + { + throw new UnsupportedOperationException(); + } + + @Override + public DruidDoublePredicate makeDoublePredicate() + { + throw new UnsupportedOperationException(); + } + } + } }