Unverified commit 0080e333, authored by Abhishek Agarwal and committed by GitHub

Fix cardinality estimation (#10762)

* Fix cardinality estimation

* Add unit test

* code coverage

* fix typo
Parent 2a1e47af
@@ -44,12 +44,12 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitioner;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -146,11 +146,6 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
-List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
-if (partitionDimensions == null) {
-partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
-}
InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser()
);
@@ -179,8 +174,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
) {
Map<Interval, byte[]> cardinalities = determineCardinalities(
inputRowIterator,
-granularitySpec,
-partitionDimensions
+granularitySpec
);
sendReport(
@@ -194,8 +188,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
private Map<Interval, byte[]> determineCardinalities(
CloseableIterator<InputRow> inputRowIterator,
-GranularitySpec granularitySpec,
-List<String> partitionDimensions
+GranularitySpec granularitySpec
)
{
Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
@@ -218,8 +211,10 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
interval,
(intervalKey) -> DimensionCardinalityReport.createHllSketchForReport()
);
+// For cardinality estimation, we want to consider unique rows instead of unique hash buckets and therefore
+// we do not use partition dimensions in computing the group key
List<Object> groupKey = HashPartitioner.extractKeys(
-partitionDimensions,
+Collections.emptyList(),
queryGranularity.bucketStart(timestamp).getMillis(),
inputRow
);
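The comment added in the hunk above is the heart of the fix: keying the per-interval HLL update by the partition dimensions counts hash buckets, not rows. The sketch below is not part of the commit; the class name and the string-join key encoding are illustrative, and it only assumes the datasketches-java HllSketch API that the cardinality report already relies on. It replays the four rows fed in by the new unit test:

```java
import org.apache.datasketches.hll.HllSketch;

import java.util.Arrays;
import java.util.List;

public class CardinalityKeyDemo
{
  public static void main(String[] args)
  {
    // Same four rows as the new unit test: dim1 takes 2 distinct values, dim2 takes 4.
    List<List<String>> rows = Arrays.asList(
        Arrays.asList("a", "1"),
        Arrays.asList("a", "2"),
        Arrays.asList("b", "3"),
        Arrays.asList("b", "4")
    );

    HllSketch keyedByPartitionDim = new HllSketch(); // old behaviour: key built from dim1 only
    HllSketch keyedByWholeRow = new HllSketch();     // new behaviour: key built from every dimension

    for (List<String> row : rows) {
      keyedByPartitionDim.update(row.get(0));
      keyedByWholeRow.update(String.join("|", row));
    }

    // Prints 2: distinct partition-dimension values, i.e. hash buckets, not rows.
    System.out.println(Math.round(keyedByPartitionDim.getEstimate()));
    // Prints 4: distinct rows, which is what the report should estimate.
    System.out.println(Math.round(keyedByWholeRow.getEstimate()));
  }
}
```

With so few distinct inputs the HLL estimate is effectively exact, which is why the new test below can assert a cardinality of exactly 4.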
@@ -53,6 +53,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -277,6 +278,18 @@ class ParallelIndexTestingFactory
}
}
static String createRowFromMap(long timestamp, Map<String, Object> fields)
{
HashMap<String, Object> row = new HashMap<>(fields);
row.put(SCHEMA_TIME, timestamp);
try {
return NESTED_OBJECT_MAPPER.writeValueAsString(row);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
static InputFormat getInputFormat()
{
return new JsonInputFormat(null, null, null);
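For reference, the createRowFromMap helper added above just merges a timestamp column into the field map and serializes it as one JSON line, which InlineInputSource and JsonInputFormat can read back in the new test. A standalone equivalent is sketched below; the class name and the "ts" column name are made up for illustration (the real helper uses the factory's SCHEMA_TIME constant and NESTED_OBJECT_MAPPER):

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class RowJsonDemo
{
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Mirrors createRowFromMap: copy the fields, add the timestamp, emit one JSON object.
  static String rowToJson(long timestamp, Map<String, Object> fields)
  {
    Map<String, Object> row = new HashMap<>(fields);
    row.put("ts", timestamp);
    try {
      return MAPPER.writeValueAsString(row);
    }
    catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    Map<String, Object> fields = Map.of("dim1", "a", "dim2", "1");
    // Prints something like {"dim1":"a","dim2":"1","ts":0}
    System.out.println(rowToJson(0, fields));
  }
}
```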
@@ -21,12 +21,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -264,6 +266,38 @@ public class PartialDimensionCardinalityTaskTest
Assert.assertEquals(1L, (long) hllSketch.getEstimate());
}
@Test
public void sendsCorrectReportWhenNonEmptyPartitionDimension()
{
InputSource inlineInputSource = new InlineInputSource(
ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "1")) + "\n" +
ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "a", "dim2", "2")) + "\n" +
ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "3")) + "\n" +
ParallelIndexTestingFactory.createRowFromMap(0, ImmutableMap.of("dim1", "b", "dim2", "4"))
);
HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, null,
Collections.singletonList("dim1")
);
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
.partitionsSpec(partitionsSpec)
.build();
PartialDimensionCardinalityTaskBuilder taskBuilder = new PartialDimensionCardinalityTaskBuilder()
.inputSource(inlineInputSource)
.tuningConfig(tuningConfig)
.withDimensions(Arrays.asList("dim1", "dim2"));
DimensionCardinalityReport report = runTask(taskBuilder);
Assert.assertEquals(ParallelIndexTestingFactory.ID, report.getTaskId());
Map<Interval, byte[]> intervalToCardinalities = report.getIntervalToCardinalities();
byte[] hllSketchBytes = Iterables.getOnlyElement(intervalToCardinalities.values());
HllSketch hllSketch = HllSketch.wrap(Memory.wrap(hllSketchBytes));
Assert.assertNotNull(hllSketch);
Assert.assertEquals(4L, (long) hllSketch.getEstimate());
}
@Test
public void sendsCorrectReportWithMultipleIntervalsInData()
{
@@ -368,6 +402,12 @@ public class PartialDimensionCardinalityTaskTest
return this;
}
PartialDimensionCardinalityTaskBuilder withDimensions(List<String> dims)
{
this.dataSchema = dataSchema.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims)));
return this;
}
PartialDimensionCardinalityTask build()
{
@@ -300,6 +300,20 @@ public class DataSchema
);
}
public DataSchema withDimensionsSpec(DimensionsSpec dimensionsSpec)
{
return new DataSchema(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
parserMap,
objectMapper
);
}
@Override
public String toString()
{
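DataSchema#withDimensionsSpec, added above, is a copy-style setter: only the dimensions change, and every other field is carried over unchanged (which the new DataSchemaTest below verifies). A minimal usage sketch follows; the wrapper class and method are hypothetical, but the DimensionsSpec construction is the same one the withDimensions() test builder above uses:

```java
import java.util.List;

import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.segment.indexing.DataSchema;

public class WithDimensionsSpecDemo
{
  // Return a copy of the given schema that ingests only the named dimensions.
  static DataSchema withOnlyDimensions(DataSchema schema, List<String> dimensionNames)
  {
    return schema.withDimensionsSpec(
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dimensionNames))
    );
  }
}
```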
@@ -44,6 +44,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -52,6 +53,8 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -60,6 +63,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DataSchemaTest extends InitializedNullHandlingTest
{
@@ -546,4 +550,31 @@ public class DataSchemaTest extends InitializedNullHandlingTest
Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
}
@Test
public void testWithDimensionSpec()
{
TimestampSpec tsSpec = Mockito.mock(TimestampSpec.class);
GranularitySpec gSpec = Mockito.mock(GranularitySpec.class);
DimensionsSpec oldDimSpec = Mockito.mock(DimensionsSpec.class);
DimensionsSpec newDimSpec = Mockito.mock(DimensionsSpec.class);
AggregatorFactory aggFactory = Mockito.mock(AggregatorFactory.class);
TransformSpec transSpec = Mockito.mock(TransformSpec.class);
Map<String, Object> parserMap = Mockito.mock(Map.class);
Mockito.when(newDimSpec.withDimensionExclusions(ArgumentMatchers.any(Set.class))).thenReturn(newDimSpec);
DataSchema oldSchema = new DataSchema("dataSource", tsSpec, oldDimSpec,
new AggregatorFactory[]{aggFactory}, gSpec,
transSpec, parserMap, jsonMapper
);
DataSchema newSchema = oldSchema.withDimensionsSpec(newDimSpec);
Assert.assertSame(oldSchema.getDataSource(), newSchema.getDataSource());
Assert.assertSame(oldSchema.getTimestampSpec(), newSchema.getTimestampSpec());
Assert.assertSame(newDimSpec, newSchema.getDimensionsSpec());
Assert.assertSame(oldSchema.getAggregators(), newSchema.getAggregators());
Assert.assertSame(oldSchema.getGranularitySpec(), newSchema.getGranularitySpec());
Assert.assertSame(oldSchema.getTransformSpec(), newSchema.getTransformSpec());
Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
}
}
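A note on the mock setup in testWithDimensionSpec above: withDimensionExclusions is stubbed to return the mock itself because, in this version of the code, the DataSchema constructor passes the incoming DimensionsSpec through an exclusion step derived from the aggregator and timestamp names, so an unstubbed mock would hand back null there. That self-returning stub could be isolated into a small helper like the hypothetical one below:

```java
import java.util.Set;

import org.apache.druid.data.input.impl.DimensionsSpec;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class DimensionsSpecMocks
{
  // A DimensionsSpec mock that returns itself no matter which exclusions are requested,
  // so DataSchema's constructor-side processing leaves the mock in place.
  @SuppressWarnings("unchecked")
  static DimensionsSpec selfReturningMock()
  {
    DimensionsSpec spec = Mockito.mock(DimensionsSpec.class);
    Mockito.when(spec.withDimensionExclusions(ArgumentMatchers.any(Set.class))).thenReturn(spec);
    return spec;
  }
}
```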