提交 98499b11 编写于 作者: N Nikolai Kochetov

LowCardinality optimization for single nullable column.

上级 cec49357
......@@ -36,6 +36,7 @@ public:
const ColumnPtr & getNestedColumn() const override;
const ColumnPtr & getNestedNotNullableColumn() const override { return column_holder; }
bool nestedColumnIsNullable() const override { return is_nullable; }
size_t uniqueInsert(const Field & x) override;
size_t uniqueInsertFrom(const IColumn & src, size_t n) override;
......
......@@ -18,6 +18,8 @@ public:
/// The same as getNestedColumn, but removes null map if nested column is nullable.
virtual const ColumnPtr & getNestedNotNullableColumn() const = 0;
virtual bool nestedColumnIsNullable() const = 0;
/// Returns array with StringRefHash calculated for each row of getNestedNotNullableColumn() column.
/// Returns nullptr if the nested column doesn't contain strings. Otherwise calculates the hashes (if they have not been calculated yet).
/// Uses thread-safe cache.
......
......@@ -10,16 +10,17 @@ template
typename Cell,
typename Hash = DefaultHash<Key>,
typename Grower = TwoLevelHashTableGrower<>,
typename Allocator = HashTableAllocator
typename Allocator = HashTableAllocator,
template <typename, typename, typename, typename ,typename> typename ImplTable = HashMapTable
>
class TwoLevelHashMapTable : public TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, HashMapTable<Key, Cell, Hash, Grower, Allocator>>
class TwoLevelHashMapTable : public TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>
{
public:
using key_type = Key;
using mapped_type = typename Cell::Mapped;
using value_type = typename Cell::value_type;
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, HashMapTable<Key, Cell, Hash, Grower, Allocator>>::TwoLevelHashTable;
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>::TwoLevelHashTable;
mapped_type & ALWAYS_INLINE operator[](Key x)
{
......@@ -41,9 +42,10 @@ template
typename Mapped,
typename Hash = DefaultHash<Key>,
typename Grower = TwoLevelHashTableGrower<>,
typename Allocator = HashTableAllocator
typename Allocator = HashTableAllocator,
template <typename, typename, typename, typename ,typename> typename ImplTable = HashMapTable
>
using TwoLevelHashMap = TwoLevelHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator>;
using TwoLevelHashMap = TwoLevelHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator, ImplTable>;
template
......@@ -52,6 +54,7 @@ template
typename Mapped,
typename Hash = DefaultHash<Key>,
typename Grower = TwoLevelHashTableGrower<>,
typename Allocator = HashTableAllocator
typename Allocator = HashTableAllocator,
template <typename, typename, typename, typename ,typename> typename ImplTable = HashMapTable
>
using TwoLevelHashMapWithSavedHash = TwoLevelHashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower, Allocator>;
using TwoLevelHashMapWithSavedHash = TwoLevelHashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower, Allocator, ImplTable>;
......@@ -453,6 +453,27 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod()
return AggregatedDataVariants::Type::nullable_keys256;
}
if (has_low_cardinality && params.keys_size == 1)
{
if (types_removed_nullable[0]->isValueRepresentedByNumber())
{
size_t size_of_field = types_removed_nullable[0]->getSizeOfValueInMemory();
if (size_of_field == 1)
return AggregatedDataVariants::Type::low_cardinality_key8;
if (size_of_field == 2)
return AggregatedDataVariants::Type::low_cardinality_key16;
if (size_of_field == 4)
return AggregatedDataVariants::Type::low_cardinality_key32;
if (size_of_field == 8)
return AggregatedDataVariants::Type::low_cardinality_key64;
}
else if (isString(types_removed_nullable[0]))
return AggregatedDataVariants::Type::low_cardinality_key_string;
else if (isFixedString(types_removed_nullable[0]))
return AggregatedDataVariants::Type::low_cardinality_key_fixed_string;
}
/// Fallback case.
return AggregatedDataVariants::Type::serialized;
}
......@@ -1139,12 +1160,10 @@ void Aggregator::convertToBlockImpl(
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
else
convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns);
/// In order to release memory early.
data.clearAndShrink();
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
......@@ -1152,6 +1171,19 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
MutableColumns & key_columns,
MutableColumns & final_aggregate_columns) const
{
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insert(Field()); /// Null
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
data.getNullKeyData() + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}
for (const auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, key_sizes);
......@@ -1172,6 +1204,17 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns) const
{
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insert(Field()); /// Null
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
}
}
for (auto & value : data)
{
method.insertKeyIntoColumns(value, key_columns, key_sizes);
......@@ -2341,6 +2384,15 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
/// For every row.
for (size_t i = 0; i < rows; ++i)
{
if constexpr (Method::low_cardinality_optimization)
{
if (state.isNullAt(i))
{
selector[i] = 0;
continue;
}
}
/// Obtain a key. Calculate bucket number from it.
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool);
......
......@@ -88,6 +88,56 @@ using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, Aggreg
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
/// Adds to the hash table type `Base` a separate slot for the aggregate state of the NULL key.
/// The slot is stored in this wrapper itself, next to a flag telling whether it is in use.
template <typename Base>
struct AggregationDataWithNullKey : public Base
{
    /// Reuse every constructor of the wrapped table.
    using Base::Base;

    /// Read-only access: is the NULL key slot in use, and the aggregate state pointer stored in it.
    bool hasNullKeyData() const { return this->has_null_key_data; }
    const AggregateDataPtr & getNullKeyData() const { return this->null_key_data; }

    /// Mutable access: the returned references let the caller set the flag and the pointer directly.
    bool & hasNullKeyData() { return this->has_null_key_data; }
    AggregateDataPtr & getNullKeyData() { return this->null_key_data; }

private:
    bool has_null_key_data = false;
    AggregateDataPtr null_key_data = nullptr;
};
/// Two-level counterpart of the NULL key wrapper: it has no storage of its own and
/// forwards every NULL key accessor to the first sub-table, impls[0].
template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
    using Base::Base;
    using Base::impls;

    /// Builds the table from `other` via the Base constructor, then copies the NULL key
    /// flag and aggregate state pointer of `other` into impls[0].
    template <typename Other>
    explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
    {
        auto & null_holder = impls[0];
        null_holder.hasNullKeyData() = other.hasNullKeyData();
        null_holder.getNullKeyData() = other.getNullKeyData();
    }

    /// Read-only access to the NULL key flag and state kept in impls[0].
    bool hasNullKeyData() const { return impls[0].hasNullKeyData(); }
    const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); }

    /// Mutable access to the NULL key flag and state kept in impls[0].
    bool & hasNullKeyData() { return impls[0].hasNullKeyData(); }
    AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); }
};
/// HashMapTable wrapped into AggregationDataWithNullKey. It is passed below as the
/// `ImplTable` template argument of the two-level maps.
template <typename ... Types>
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
/// Single-level aggregation tables wrapped into AggregationDataWithNullKey.
using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;
/// Two-level tables: the sub-tables are HashTableWithNullKey, and AggregationDataWithNullKeyTwoLevel
/// forwards the NULL key accessors to impls[0].
using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
/// Cache which can be used by aggregations method's states. Object is shared in all threads.
struct AggregationStateCache
{
......@@ -403,8 +453,10 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
ColumnPtr dictionary_holder;
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
PaddedPODArray<AggregateDataPtr> aggregate_data;
PaddedPODArray<AggregateDataPtr> * aggregate_data_cache;
PaddedPODArray<AggregateDataPtr> aggregate_data_cache;
/// If initialized column is nullable.
bool is_nullable = false;
void init(ColumnRawPtrs &)
{
......@@ -429,7 +481,8 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
}
auto * dict = column->getDictionary().getNestedColumn().get();
auto * dict = column->getDictionary().getNestedNotNullableColumn().get();
is_nullable = column->getDictionary().nestedColumnIsNullable();
key = {dict};
bool is_shared_dict = column->isSharedDictionary();
......@@ -463,8 +516,7 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
}
AggregateDataPtr default_data = nullptr;
aggregate_data.assign(key[0]->size(), default_data);
aggregate_data_cache = &aggregate_data;
aggregate_data_cache.assign(key[0]->size(), default_data);
size_of_index_type = column->getSizeOfIndexType();
positions = column->getIndexesPtr().get();
......@@ -507,10 +559,18 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
Arena & pool)
{
size_t row = getIndexAt(i);
if ((*aggregate_data_cache)[row])
if (is_nullable && row == 0)
{
inserted = !data.hasNullKeyData();
data.hasNullKeyData() = true;
return &data.getNullKeyData();
}
if (aggregate_data_cache[row])
{
inserted = false;
return &(*aggregate_data_cache)[row];
return &aggregate_data_cache[row];
}
else
{
......@@ -527,23 +587,35 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
if (inserted)
Base::onNewKey(*it, keys_size, keys, pool);
else
(*aggregate_data_cache)[row] = Base::getAggregateData(it->second);
aggregate_data_cache[row] = Base::getAggregateData(it->second);
return &Base::getAggregateData(it->second);
}
}
/// Returns true when the column is nullable and row `i` maps to index 0, which this
/// method treats as the NULL key. For a non-nullable column the index is not read at all.
ALWAYS_INLINE bool isNullAt(size_t i)
{
    return is_nullable && getIndexAt(i) == 0;
}
ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
{
size_t row = getIndexAt(i);
(*aggregate_data_cache)[row] = data;
aggregate_data_cache[row] = data;
}
template <typename D>
ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
{
size_t row = getIndexAt(i);
if (!(*aggregate_data_cache)[row])
if (is_nullable && row == 0)
return data.hasNullKeyData() ? &data.getNullKeyData() : nullptr;
if (!aggregate_data_cache[row])
{
ColumnRawPtrs key_columns;
Sizes key_sizes;
......@@ -558,9 +630,9 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
it = data.find(key);
if (it != data.end())
(*aggregate_data_cache)[row] = Base::getAggregateData(it->second);
aggregate_data_cache[row] = Base::getAggregateData(it->second);
}
return &(*aggregate_data_cache)[row];
return &aggregate_data_cache[row];
}
};
......@@ -971,17 +1043,17 @@ struct AggregatedDataVariants : private boost::noncopyable
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
/// Support for low cardinality.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKey>>> low_cardinality_key_fixed_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册