提交 22e368e7 编写于 作者: X Xavier Léauté

add dimension/column selector to incremental index

上级 cde86d81
......@@ -46,6 +46,7 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
......@@ -53,6 +54,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
......@@ -186,7 +188,7 @@ public class IncrementalIndex implements Iterable<Row>
dims = newDims;
}
TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
final TimeAndDims key = new TimeAndDims(Math.max(gran.truncate(row.getTimestampFromEpoch()), minTimestamp), dims);
Aggregator[] aggs = facts.get(key);
if (aggs == null) {
......@@ -248,8 +250,34 @@ public class IncrementalIndex implements Iterable<Row>
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
final List<String> dimensionValues = in.getDimension(columnName);
if (dimensionValues != null) {
return new ObjectColumnSelector<Object>()
{
@Override
public Class classOfObject()
{
return Object.class;
}
@Override
public Object get()
{
final String[] dimVals = dimensionValues.toArray(new String[]{});
if (dimVals.length == 1) {
return dimVals[0];
}
else if (dimVals.length == 0) {
return null;
}
else {
return dimVals;
}
}
};
}
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
throw new ISE("Don't know how to handle type[%s]", typeName);
}
......@@ -273,12 +301,67 @@ public class IncrementalIndex implements Iterable<Row>
}
@Override
public DimensionSelector makeDimensionSelector(String dimension)
public DimensionSelector makeDimensionSelector(final String dimension)
{
// we should implement this, but this is going to be rewritten soon anyways
throw new UnsupportedOperationException(
"Incremental index aggregation does not support dimension selectors"
);
final String dimensionName = dimension.toLowerCase();
final List<String> dimensionValues = in.getDimension(dimensionName);
if (dimensionValues == null) {
return null;
}
final IncrementalIndex.DimDim dimValLookup = getDimension(dimensionName);
final int maxId = dimValLookup.size();
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
final ArrayList<Integer> vals = Lists.newArrayList();
for (String dimVal : dimensionValues) {
int id = dimValLookup.getId(dimVal);
vals.add(id);
}
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
}
@Override
public int get(int index)
{
return vals.get(index);
}
@Override
public Iterator<Integer> iterator()
{
return vals.iterator();
}
};
}
@Override
public int getValueCardinality()
{
return maxId;
}
@Override
public String lookupName(int id)
{
return dimValLookup.getValue(id);
}
@Override
public int lookupId(String name)
{
return dimValLookup.getId(name);
}
};
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册