提交 609da018 编写于 作者: J Jonathan Wei 提交者: Roman Leventov

Fix dictionary ID race condition in IncrementalIndexStorageAdapter (#6340)

Possibly related to https://github.com/apache/incubator-druid/issues/4937

--------

There is currently a race condition in IncrementalIndexStorageAdapter that can lead to exceptions like the following, when running queries with filters on String dimensions that hit realtime tasks: 

```
org.apache.druid.java.util.common.ISE: id[5] >= maxId[5]
	at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector.lookupName(StringDimensionIndexer.java:591)
	at org.apache.druid.segment.StringDimensionIndexer$1IndexerDimensionSelector$2.matches(StringDimensionIndexer.java:562)
	at org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter$IncrementalIndexCursor.advance(IncrementalIndexStorageAdapter.java:284)
```

When the `filterMatcher` is created in the constructor of `IncrementalIndexStorageAdapter.IncrementalIndexCursor`, `StringDimensionIndexer.makeDimensionSelector` gets called eventually, which calls:

```
final int maxId = getCardinality();
...

 @Override
  public int getCardinality()
  {
    return dimLookup.size();
  }
```

So `maxId` is set to the size of the dictionary at the time that the `filterMatcher` is created.

However, the `maxRowIndex` which is meant to prevent the Cursor from returning rows that were added after the Cursor was created (see https://github.com/apache/incubator-druid/pull/4049) is set after the `filterMatcher` is created.

If rows with new dictionary values are added after the `filterMatcher` is created but before `maxRowIndex` is set, then it is possible for the Cursor to return rows that contain the new values, which will have `id >= maxId`.

This PR sets `maxRowIndex` before creating the `filterMatcher` to prevent rows with unknown dictionary IDs from being passed to the `filterMatcher`.

-----------

The included test triggers the error with a custom Filter + DruidPredicateFactory.

The DimensionSelector for predicate-based filter matching is created here in `Filters.makeValueMatcher`:

```
  public static ValueMatcher makeValueMatcher(
      final ColumnSelectorFactory columnSelectorFactory,
      final String columnName,
      final DruidPredicateFactory predicateFactory
  )
  {
    final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName);

    // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns.
    if (capabilities != null && capabilities.getType() == ValueType.LONG) {
      return getLongPredicateMatcher(
          columnSelectorFactory.makeColumnValueSelector(columnName),
          predicateFactory.makeLongPredicate()
      );
    }

    final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> selector =
        DimensionHandlerUtils.createColumnSelectorPlus(
            ValueMatcherColumnSelectorStrategyFactory.instance(),
            DefaultDimensionSpec.of(columnName),
            columnSelectorFactory
        );

    return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), predicateFactory);
  }
```

The test Filter adds a row to the IncrementalIndex in the test when the predicateFactory creates a new String predicate, after `DimensionHandlerUtils.createColumnSelectorPlus` is called.
上级 edf0c138
......@@ -236,9 +236,10 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
currEntry = new IncrementalIndexRowHolder();
columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(index, virtualColumns, descending, currEntry);
// Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/incubator-druid/pull/6340
maxRowIndex = index.getLastRowIndex();
filterMatcher = filter == null ? BooleanValueMatcher.of(true) : filter.makeMatcher(columnSelectorFactory);
numAdvanced = -1;
maxRowIndex = index.getLastRowIndex();
final long timeStart = Math.max(interval.getStartMillis(), actualInterval.getStartMillis());
cursorIterable = index.getFacts().timeRangeIterable(
descending,
......
......@@ -19,6 +19,8 @@
package org.apache.druid.segment.incremental;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
......@@ -33,23 +35,34 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.BitmapIndexSelector;
import org.apache.druid.query.filter.DimFilters;
import org.apache.druid.query.filter.DruidDoublePredicate;
import org.apache.druid.query.filter.DruidFloatPredicate;
import org.apache.druid.query.filter.DruidLongPredicate;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryEngine;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
......@@ -478,6 +491,59 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(1, assertCursorsNotEmpty.get());
}
@Test
public void testCursorDictionaryRaceConditionFix() throws Exception
{
// Tests the dictionary ID race condition bug described at https://github.com/apache/incubator-druid/pull/6340
final IncrementalIndex index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
index.add(
new MapBasedInputRow(
timestamp,
Collections.singletonList("billy"),
ImmutableMap.of("billy", "v1" + i)
)
);
}
final StorageAdapter sa = new IncrementalIndexStorageAdapter(index);
Sequence<Cursor> cursors = sa.makeCursors(
new DictionaryRaceTestFilter(index, timestamp),
Intervals.utc(timestamp - 60_000, timestamp + 60_000),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0);
cursors
.map(cursor -> {
DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec("billy", "billy"));
int cardinality = dimSelector.getValueCardinality();
int rowNumInCursor = 0;
while (!cursor.isDone()) {
IndexedInts row = dimSelector.getRow();
row.forEach(i -> Assert.assertTrue(i < cardinality));
cursor.advance();
rowNumInCursor++;
}
Assert.assertEquals(5, rowNumInCursor);
assertCursorsNotEmpty.incrementAndGet();
return null;
})
.toList();
Assert.assertEquals(1, assertCursorsNotEmpty.get());
}
@Test
public void testCursoringAndSnapshot() throws Exception
{
......@@ -584,4 +650,97 @@ public class IncrementalIndexStorageAdapterTest
.toList();
Assert.assertEquals(1, assertCursorsNotEmpty.get());
}
private class DictionaryRaceTestFilter implements Filter
{
private final IncrementalIndex index;
private final long timestamp;
private DictionaryRaceTestFilter(
IncrementalIndex index,
long timestamp
)
{
this.index = index;
this.timestamp = timestamp;
}
@Override
public <T> T getBitmapResult(
BitmapIndexSelector selector, BitmapResultFactory<T> bitmapResultFactory
)
{
return bitmapResultFactory.wrapAllTrue(Filters.allTrue(selector));
}
@Override
public double estimateSelectivity(BitmapIndexSelector indexSelector)
{
return 1;
}
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
return Filters.makeValueMatcher(
factory,
"billy",
new DictionaryRaceTestFilterDruidPredicateFactory()
);
}
@Override
public boolean supportsBitmapIndex(BitmapIndexSelector selector)
{
return true;
}
@Override
public boolean supportsSelectivityEstimation(
ColumnSelector columnSelector, BitmapIndexSelector indexSelector
)
{
return true;
}
private class DictionaryRaceTestFilterDruidPredicateFactory implements DruidPredicateFactory
{
@Override
public Predicate<String> makeStringPredicate()
{
try {
index.add(
new MapBasedInputRow(
timestamp,
Collections.singletonList("billy"),
ImmutableMap.of("billy", "v31234")
)
);
}
catch (IndexSizeExceededException isee) {
throw new RuntimeException(isee);
}
return Predicates.alwaysTrue();
}
@Override
public DruidLongPredicate makeLongPredicate()
{
throw new UnsupportedOperationException();
}
@Override
public DruidFloatPredicate makeFloatPredicate()
{
throw new UnsupportedOperationException();
}
@Override
public DruidDoublePredicate makeDoublePredicate()
{
throw new UnsupportedOperationException();
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册