Commit 4674beb9 authored by Nikolai Kochetov

Shared cache for low cardinality single aggregation column. [#CLICKHOUSE-3903]

Parent df71c478
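Context for the change below: when the GROUP BY key is a single low-cardinality (dictionary-encoded) column, the per-key hashes computed for a dictionary can be reused by every thread that sees the same dictionary. This commit introduces a process-wide LRU cache keyed by the dictionary's content hash and size, holding the saved hashes plus a pointer that keeps the dictionary alive. Below is a minimal self-contained sketch of that keying and eviction scheme; the types are simplified stand-ins (the real code uses UInt128, ColumnPtr and ClickHouse's LRUCache, which is weighted and thread-safe; this sketch is not).

```cpp
#include <cstddef>
#include <cstdint>
#include <list>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

// Simplified stand-ins for the real ClickHouse types.
struct DictionaryKey
{
    uint64_t hash_low = 0, hash_high = 0; // SipHash of the dictionary contents
    uint64_t size = 0;                    // number of keys, guards against hash collisions
    bool operator==(const DictionaryKey & rhs) const
    {
        return hash_low == rhs.hash_low && hash_high == rhs.hash_high && size == rhs.size;
    }
};

struct DictionaryKeyHash
{
    size_t operator()(const DictionaryKey & k) const
    {
        return std::hash<uint64_t>()(k.hash_low ^ (k.hash_high * 0x9e3779b97f4a7c15ULL) ^ k.size);
    }
};

struct CachedValues
{
    std::shared_ptr<std::vector<uint64_t>> saved_hash; // per-key hashes, reusable across threads
};
using CachedValuesPtr = std::shared_ptr<CachedValues>;

// Minimal (non-thread-safe) LRU keyed by dictionary identity.
class DictionaryLRU
{
public:
    explicit DictionaryLRU(size_t max_size_) : max_size(max_size_) {}

    CachedValuesPtr get(const DictionaryKey & key)
    {
        auto it = map.find(key);
        if (it == map.end())
            return nullptr;
        order.splice(order.begin(), order, it->second.second); // mark as most recently used
        return it->second.first;
    }

    void set(const DictionaryKey & key, CachedValuesPtr value)
    {
        if (auto it = map.find(key); it != map.end())
        {
            it->second.first = std::move(value);
            order.splice(order.begin(), order, it->second.second);
            return;
        }
        order.push_front(key);
        map.emplace(key, std::make_pair(std::move(value), order.begin()));
        if (map.size() > max_size) // evict the least recently used entry
        {
            map.erase(order.back());
            order.pop_back();
        }
    }

private:
    size_t max_size;
    std::list<DictionaryKey> order; // front = most recently used
    std::unordered_map<DictionaryKey,
                       std::pair<CachedValuesPtr, std::list<DictionaryKey>::iterator>,
                       DictionaryKeyHash> map;
};
```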
@@ -532,8 +532,7 @@ template <typename ColumnType>
UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & column)
{
size_t column_size = column.size();
-UInt128 cur_hash = hash;
+UInt128 cur_hash;
if (column_size != num_added_rows.load())
{
@@ -542,12 +541,14 @@ UInt128 ColumnUnique<ColumnType>::IncrementalHash::getHash(const ColumnType & co
column.updateHashWithValue(i, sip_hash);
+std::lock_guard lock(mutex);
+if (column_size != num_added_rows.load())
+{
+sip_hash.get128(hash.low, hash.high);
+cur_hash = hash;
+num_added_rows.store(column_size);
+}
-sip_hash.get128(hash.low, hash.high);
-cur_hash = hash;
-num_added_rows.store(column_size);
}
+else
+{
+std::lock_guard lock(mutex);
+cur_hash = hash;
+}
return cur_hash;
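The rewrite above makes getHash() safe to call from many threads: the expensive SipHash pass stays outside the mutex, and the version check is repeated under the lock so the result is published exactly once. A generic sketch of the same double-checked pattern, with hypothetical names (not the ClickHouse class):

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>

// Hypothetical compute-once-per-version cache, mirroring IncrementalHash::getHash():
// the slow computation happens without the lock; the check is repeated under it.
template <typename Value>
class VersionedValue
{
public:
    template <typename Compute>
    Value get(uint64_t version, Compute && compute)
    {
        Value result;
        if (version != cached_version.load())
        {
            Value fresh = compute(); // slow part, done outside the mutex
            std::lock_guard<std::mutex> lock(mutex);
            if (version != cached_version.load()) // another thread may have published already
            {
                value = fresh;
                cached_version.store(version);
            }
            result = value; // read whatever is published, still under the lock
        }
        else
        {
            std::lock_guard<std::mutex> lock(mutex); // Value itself is not atomic
            result = value;
        }
        return result;
    }

private:
    std::mutex mutex;
    std::atomic<uint64_t> cached_version{0};
    Value value{};
};
```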
@@ -530,8 +530,6 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
int min_bucket_num = NUM_BUCKETS;
-AggregationStateCachePtr cache;
for (auto & input : inputs)
{
/// Blocks for already partitioned (two-level) data.
@@ -543,7 +541,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
{
LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
-input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block, cache);
+input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
input.block = Block();
}
@@ -109,7 +109,7 @@ ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(co
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
-parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num], parent.threads_data[thread_num].cache,
+parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns, parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].key, parent.no_more_keys);
@@ -125,7 +125,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinishThread(size_t thread_
auto & data = *parent.many_data[thread_num];
if (data.isConvertibleToTwoLevel())
-data.convertToTwoLevel(parent.threads_data[thread_num].cache);
+data.convertToTwoLevel();
if (data.size())
parent.aggregator.writeToTemporaryFile(data);
@@ -138,12 +138,10 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
{
/// It may happen that some data has not yet been flushed,
/// because at the time of the `onFinishThread` call no data had been flushed to disk yet, and some appeared afterwards.
-for (size_t thread_num = 0; thread_num < parent.many_data.size(); ++thread_num)
+for (auto & data : parent.many_data)
{
-auto & data = parent.many_data[thread_num];
if (data->isConvertibleToTwoLevel())
-data->convertToTwoLevel(parent.threads_data[thread_num].cache);
+data->convertToTwoLevel();
if (data->size())
parent.aggregator.writeToTemporaryFile(*data);
@@ -206,7 +204,7 @@ void ParallelAggregatingBlockInputStream::execute()
/// If there was no data, and we aggregate without keys, we must return a single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (total_src_rows == 0 && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
-aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0], threads_data[0].cache,
+aggregator.executeOnBlock(children.at(0)->getHeader(), *many_data[0],
threads_data[0].key_columns, threads_data[0].aggregate_columns,
threads_data[0].key, no_more_keys);
}
@@ -84,8 +84,6 @@ private:
ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
-AggregationStateCachePtr cache;
ThreadData(size_t keys_size, size_t aggregates_size)
{
key.resize(keys_size);
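With the per-thread ThreadData::cache member gone, the cache lives on the Aggregator and is shared by every worker thread, which is why it must be internally synchronized. A toy illustration of the new ownership, with hypothetical, reduced types:

```cpp
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Toy illustration (hypothetical types): the cache moves from per-thread
// state into the Aggregator itself, so all workers share one instance.
struct AggregationStateCache { virtual ~AggregationStateCache() = default; };
using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;

struct Aggregator
{
    // Created once per query; must be safe for concurrent use.
    AggregationStateCachePtr aggregation_state_cache = std::make_shared<AggregationStateCache>();

    void executeOnBlock(size_t /*thread_num*/)
    {
        // every worker consults the same aggregation_state_cache here
    }
};

int main()
{
    Aggregator aggregator;
    std::vector<std::thread> workers;
    for (size_t i = 0; i < 4; ++i)
        workers.emplace_back([&aggregator, i] { aggregator.executeOnBlock(i); });
    for (auto & w : workers)
        w.join();
}
```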
@@ -26,6 +26,7 @@
#include <Interpreters/config_compile.h>
#include <Columns/ColumnWithDictionary.h>
#include <DataTypes/DataTypeWithDictionary.h>
+#include "Settings.h"
#endif
@@ -70,10 +71,8 @@ AggregatedDataVariants::~AggregatedDataVariants()
}
-void AggregatedDataVariants::convertToTwoLevel(AggregationStateCachePtr & cache)
+void AggregatedDataVariants::convertToTwoLevel()
{
-cache = nullptr;
if (aggregator)
LOG_TRACE(aggregator->log, "Converting aggregation data to two-level.");
@@ -176,6 +175,9 @@ Aggregator::Aggregator(const Params & params_)
}
method = chooseAggregationMethod();
+AggregationStateCache::Settings cache_settings;
+cache_settings.max_threads = params.max_threads;
+aggregation_state_cache = AggregatedDataVariants::createCache(method, cache_settings);
}
@@ -561,7 +563,6 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
template <typename Method>
void NO_INLINE Aggregator::executeImpl(
Method & method,
-AggregationStateCachePtr & cache,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
@@ -573,7 +574,7 @@ void NO_INLINE Aggregator::executeImpl(
{
typename Method::State state;
if constexpr (Method::low_cardinality_optimization)
-state.init(key_columns, cache, aggregates_pool);
+state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
@@ -722,7 +723,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
}
-bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result, AggregationStateCachePtr & cache,
+bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & result,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, StringRefs & key,
bool & no_more_keys)
{
@@ -877,7 +878,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
-executeImpl(*result.NAME, cache, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \
+executeImpl(*result.NAME, result.aggregates_pool, rows, key_columns, &aggregate_functions_instructions[0], \
result.key_sizes, key, no_more_keys, overflow_row_ptr);
if (false) {}
@@ -901,7 +902,7 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
* It allows us to perform an efficient merge afterwards - either memory-efficient or parallel.
*/
if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
-result.convertToTwoLevel(cache);
+result.convertToTwoLevel();
/// Checking the constraints.
if (!checkLimits(result_size, no_more_keys))
@@ -1102,8 +1103,6 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
size_t src_rows = 0;
size_t src_bytes = 0;
-AggregationStateCachePtr cache;
/// Read all the data
while (Block block = stream->read())
{
@@ -1113,14 +1112,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();
-if (!executeOnBlock(block, result, cache, key_columns, aggregate_columns, key, no_more_keys))
+if (!executeOnBlock(block, result, key_columns, aggregate_columns, key, no_more_keys))
break;
}
/// If there was no data, and we aggregate without keys, we must return a single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
-executeOnBlock(stream->getHeader(), result, cache, key_columns, aggregate_columns, key, no_more_keys);
+executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, key, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.sizeWithoutOverflowRow();
@@ -1898,11 +1897,10 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
}
}
-AggregationStateCachePtr cache;
if (has_at_least_one_two_level)
for (auto & variant : non_empty_data)
if (!variant->isTwoLevel())
-variant->convertToTwoLevel(cache);
+variant->convertToTwoLevel();
AggregatedDataVariantsPtr & first = non_empty_data[0];
@@ -1929,7 +1927,6 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
-AggregationStateCachePtr & cache,
Table & data,
AggregateDataPtr overflow_row) const
{
@@ -1945,7 +1942,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
typename Method::State state;
if constexpr (Method::low_cardinality_optimization)
-state.init(key_columns, cache, aggregates_pool);
+state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
@@ -2033,15 +2030,14 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
-AggregationStateCachePtr & cache,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const
{
if (!no_more_keys)
-mergeStreamsImplCase<false>(block, key_sizes, aggregates_pool, method, cache, data, overflow_row);
+mergeStreamsImplCase<false>(block, key_sizes, aggregates_pool, method, data, overflow_row);
else
-mergeStreamsImplCase<true>(block, key_sizes, aggregates_pool, method, cache, data, overflow_row);
+mergeStreamsImplCase<true>(block, key_sizes, aggregates_pool, method, data, overflow_row);
}
@@ -2149,8 +2145,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
{
CurrentThread::attachToIfDetached(thread_group);
-AggregationStateCachePtr cache;
for (Block & block : bucket_to_blocks[bucket])
{
if (isCancelled())
@@ -2158,7 +2152,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
-mergeStreamsImpl(block, result.key_sizes, aggregates_pool, *result.NAME, cache, result.NAME->data.impls[bucket], nullptr, false);
+mergeStreamsImpl(block, result.key_sizes, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -2208,8 +2202,6 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
bool no_more_keys = false;
-AggregationStateCachePtr cache;
BlocksList & blocks = bucket_to_blocks[-1];
for (Block & block : blocks)
{
@@ -2227,7 +2219,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
-mergeStreamsImpl(block, result.key_sizes, result.aggregates_pool, *result.NAME, cache, result.NAME->data, result.without_key, no_more_keys);
+mergeStreamsImpl(block, result.key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@@ -2285,8 +2277,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
-AggregationStateCachePtr cache;
for (Block & block : blocks)
{
if (isCancelled())
@@ -2300,7 +2290,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
-mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, cache, result.NAME->data, nullptr, false);
+mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@@ -2344,7 +2334,6 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method,
-AggregationStateCachePtr & cache,
Arena * pool,
ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
@@ -2354,7 +2343,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
{
typename Method::State state;
if constexpr (Method::low_cardinality_optimization)
-state.init(key_columns, cache, pool);
+state.init(key_columns, aggregation_state_cache);
else
state.init(key_columns);
@@ -2403,7 +2392,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
}
-std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block, AggregationStateCachePtr & cache)
+std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
{
if (!block)
return {};
@@ -2449,7 +2438,7 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block, Aggre
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
-convertBlockToTwoLevelImpl(*data.NAME, cache, data.aggregates_pool, \
+convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
key_columns, data.key_sizes, key, block, splitted_blocks);
if (false) {}
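The two constructor hunks above pair with the createCache() factories added in the header hunks below: the Aggregator builds the cache once, right after choosing the aggregation method, and only the single-low-cardinality-column method returns a non-null cache. A condensed reconstruction of that factory pattern (the real dispatch goes through the APPLY_FOR_AGGREGATED_VARIANTS macro; types here are reduced):

```cpp
#include <cstddef>
#include <memory>

// Condensed reconstruction of the per-method factory introduced in this commit.
struct AggregationStateCache
{
    virtual ~AggregationStateCache() = default;
    struct Settings { size_t max_threads = 0; };
};
using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;

struct LowCardinalityDictionaryCache : AggregationStateCache
{
    explicit LowCardinalityDictionaryCache(const Settings & /*settings*/) {}
};

struct AggregationMethodOneNumber // stands in for every "ordinary" method
{
    static AggregationStateCachePtr createCache(const AggregationStateCache::Settings &) { return nullptr; }
};

struct AggregationMethodSingleLowCardinalityColumn // the one method that benefits
{
    static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
    {
        return std::make_shared<LowCardinalityDictionaryCache>(settings);
    }
};
```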
@@ -14,6 +14,7 @@
#include <Common/HashTable/TwoLevelHashMap.h>
#include <common/ThreadPool.h>
#include <Common/UInt128.h>
+#include <Common/LRUCache.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SizeLimits.h>
@@ -89,10 +90,15 @@ using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, Aggreg
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
+/// Cache which can be used by aggregation methods' states. The object is shared across all threads.
struct AggregationStateCache
{
virtual ~AggregationStateCache() = default;
+struct Settings
+{
+size_t max_threads;
+};
};
using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;
@@ -171,6 +177,8 @@ struct AggregationMethodOneNumber
{
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
@@ -240,6 +248,8 @@ struct AggregationMethodString
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
@@ -307,8 +317,54 @@ struct AggregationMethodFixedString
{
key_columns[0]->insertData(value.first.data, value.first.size);
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
+class LowCardinalityDictionaryCache : public AggregationStateCache
+{
+public:
+/// We assume that dictionaries with the same hash have the same keys.
+/// Just in case, also check that they have the same size.
+struct DictionaryKey
+{
+UInt128 hash;
+UInt64 size;
+bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
+};
+struct DictionaryKeyHash
+{
+size_t operator()(const DictionaryKey & key) const
+{
+SipHash hash;
+hash.update(key.hash.low);
+hash.update(key.hash.high);
+hash.update(key.size);
+return hash.get64();
+}
+};
+struct CachedValues
+{
+/// Store a pointer to the dictionary to be sure it won't be deleted.
+ColumnPtr dictionary_holder;
+/// Hashes for the dictionary keys.
+const UInt64 * saved_hash = nullptr;
+};
+using CachedValuesPtr = std::shared_ptr<CachedValues>;
+LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {}
+CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
+void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
+private:
+using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
+Cache cache;
+};
/// Single low cardinality column.
template <typename SingleColumnMethod>
@@ -325,6 +381,11 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
using Base::data;
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
+{
+return std::make_shared<LowCardinalityDictionaryCache>(settings);
+}
AggregationMethodSingleLowCardinalityColumn() = default;
template <typename Other>
@@ -337,25 +398,17 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
size_t size_of_index_type = 0;
const UInt64 * saved_hash = nullptr;
+ColumnPtr dictionary_holder;
PaddedPODArray<AggregateDataPtr> aggregate_data;
PaddedPODArray<AggregateDataPtr> * aggregate_data_cache;
-struct Cache : public AggregationStateCache
-{
-const UInt64 * saved_hash = nullptr;
-PaddedPODArray<AggregateDataPtr> aggregate_data;
-Arena * pool = nullptr;
-const IColumn * dict = nullptr;
-ColumnPtr column_unique_holder;
-UInt128 dict_hash;
-};
void init(ColumnRawPtrs &)
{
throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR);
}
-void init(ColumnRawPtrs & key_columns, AggregationStateCachePtr & cache_ptr, Arena * pool)
+void init(ColumnRawPtrs & key_columns, const AggregationStateCachePtr & cache_ptr)
{
auto column = typeid_cast<const ColumnWithDictionary *>(key_columns[0]);
if (!column)
@@ -363,55 +416,61 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
"Expected LowCardinality, got " + key_columns[0]->getName(), ErrorCodes::LOGICAL_ERROR);
if (!cache_ptr)
-cache_ptr = std::make_shared<Cache>();
-auto cache = static_cast<Cache *>(cache_ptr.get());
+throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR);
+auto cache = typeid_cast<LowCardinalityDictionaryCache *>(cache_ptr.get());
+if (!cache)
+{
+const auto & cached_val = *cache_ptr;
+throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: "
++ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
+}
auto * dict = column->getDictionary().getNestedColumn().get();
key = {dict};
-UInt128 dict_hash = column->getDictionary().getHash();
-// bool dict_in_cache = cache->dict && dict == cache->dict;
-bool dict_in_cache = cache->dict && dict->size() == cache->dict->size() && dict_hash == cache->dict_hash;
bool is_shared_dict = column->isSharedDictionary();
-saved_hash = cache->saved_hash;
-aggregate_data_cache = &cache->aggregate_data;
+typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
+typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
-if (pool == nullptr || pool != cache->pool || !dict_in_cache)
+if (is_shared_dict)
{
-AggregateDataPtr default_data = nullptr;
-aggregate_data.assign(key[0]->size(), default_data);
-aggregate_data_cache = &aggregate_data;
-if (is_shared_dict)
-{
-cache->pool = pool;
-cache->aggregate_data.swap(aggregate_data);
-aggregate_data_cache = &cache->aggregate_data;
-}
+dictionary_key = {column->getDictionary().getHash(), dict->size()};
+cached_values = cache->get(dictionary_key);
}
-if (!dict_in_cache)
+if (cached_values)
{
+saved_hash = cached_values->saved_hash;
+dictionary_holder = cached_values->dictionary_holder;
+}
+else
+{
saved_hash = column->getDictionary().tryGetSavedHash();
+dictionary_holder = column->getDictionaryPtr();
if (is_shared_dict)
{
-cache->dict = dict;
-cache->saved_hash = saved_hash;
-cache->dict_hash = dict_hash;
-cache->column_unique_holder = column->getDictionaryPtr();
+cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
+cached_values->saved_hash = saved_hash;
+cached_values->dictionary_holder = dictionary_holder;
+cache->set(dictionary_key, cached_values);
}
}
+AggregateDataPtr default_data = nullptr;
+aggregate_data.assign(key[0]->size(), default_data);
+aggregate_data_cache = &aggregate_data;
size_of_index_type = column->getSizeOfIndexType();
positions = column->getIndexesPtr().get();
BaseState::init(key);
}
-size_t getIndexAt(size_t row) const
+ALWAYS_INLINE size_t getIndexAt(size_t row) const
{
switch (size_of_index_type)
{
@@ -424,7 +483,7 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
}
/// Get the key from the key columns for insertion into the hash table.
-Key getKey(
+ALWAYS_INLINE Key getKey(
const ColumnRawPtrs & /*key_columns*/,
size_t /*keys_size*/,
size_t i,
@@ -437,7 +496,7 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
}
template <typename D>
-AggregateDataPtr * emplaceKeyFromRow(
+ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow(
D & data,
size_t i,
bool & inserted,
@@ -472,14 +531,14 @@ struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
}
}
-void cacheAggregateData(size_t i, AggregateDataPtr data)
+ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
{
size_t row = getIndexAt(i);
(*aggregate_data_cache)[row] = data;
}
template <typename D>
-AggregateDataPtr * findFromRow(D & data, size_t i)
+ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
{
size_t row = getIndexAt(i);
if (!(*aggregate_data_cache)[row])
@@ -731,6 +790,8 @@ struct AggregationMethodKeysFixed
}
}
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
@@ -790,6 +851,8 @@ struct AggregationMethodConcat
insertKeyIntoColumnsImpl(value, key_columns, keys_size, key_sizes);
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
private:
/// Insert the values of the specified keys into the corresponding columns.
static void insertKeyIntoColumnsImpl(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes &)
@@ -877,6 +940,8 @@ struct AggregationMethodSerialized
for (size_t i = 0; i < keys_size; ++i)
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
@@ -933,6 +998,8 @@ struct AggregationMethodHashed
for (size_t i = 0; i < keys_size; ++i)
key_columns[i]->insertDataWithTerminatingZero(value.second.first[i].data, value.second.first[i].size);
}
+static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
};
@@ -1219,7 +1286,7 @@ struct AggregatedDataVariants : private boost::noncopyable
}
}
-void convertToTwoLevel(AggregationStateCachePtr & cache);
+void convertToTwoLevel();
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
M(key32_two_level) \
@@ -1263,6 +1330,25 @@ struct AggregatedDataVariants : private boost::noncopyable
return false;
}
}
+static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings)
+{
+switch (type)
+{
+#define M(NAME, IS_TWO_LEVEL) \
+case Type::NAME: \
+{ \
+using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \
+using T ## NAME = typename TPtr ## NAME ::element_type; \
+return T ## NAME ::createCache(settings); \
+}
+APPLY_FOR_AGGREGATED_VARIANTS(M)
+#undef M
+default:
+throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
+}
+}
};
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
@@ -1324,6 +1410,9 @@ public:
const std::string tmp_path;
+/// Only used to determine the cache size. No threads are created.
+size_t max_threads;
Params(
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
@@ -1332,7 +1421,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
-const std::string & tmp_path_)
+const std::string & tmp_path_, size_t max_threads_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
@@ -1340,14 +1429,14 @@ public:
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
-tmp_path(tmp_path_)
+tmp_path(tmp_path_), max_threads(max_threads_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
-const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_)
-: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "")
+const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
+: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "", max_threads_)
{
intermediate_header = intermediate_header_;
}
@@ -1368,7 +1457,6 @@ public:
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
-AggregationStateCachePtr & cache,
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
StringRefs & keys, /// - pass the corresponding objects that are initially empty.
bool & no_more_keys);
@@ -1400,7 +1488,7 @@ public:
/** Split a block of partially-aggregated data into many blocks, as if the two-level aggregation method had been used.
* This simplifies merging that data with other results that are already two-level.
*/
-std::vector<Block> convertBlockToTwoLevel(const Block & block, AggregationStateCachePtr & cache);
+std::vector<Block> convertBlockToTwoLevel(const Block & block);
using CancellationHook = std::function<bool()>;
@@ -1441,6 +1529,8 @@ protected:
AggregatedDataVariants::Type method;
Sizes key_sizes;
+AggregationStateCachePtr aggregation_state_cache;
AggregateFunctionsPlainPtrs aggregate_functions;
/** This array serves two purposes.
@@ -1516,7 +1606,6 @@ protected:
template <typename Method>
void executeImpl(
Method & method,
-AggregationStateCachePtr & cache,
Arena * aggregates_pool,
size_t rows,
ColumnRawPtrs & key_columns,
@@ -1673,7 +1762,6 @@ protected:
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
-AggregationStateCachePtr & cache,
Table & data,
AggregateDataPtr overflow_row) const;
@@ -1683,7 +1771,6 @@ protected:
const Sizes & key_sizes,
Arena * aggregates_pool,
Method & method,
-AggregationStateCachePtr & cache,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
@@ -1699,7 +1786,6 @@ protected:
template <typename Method>
void convertBlockToTwoLevelImpl(
Method & method,
-AggregationStateCachePtr & cache,
Arena * pool,
ColumnRawPtrs & key_columns,
const Sizes & key_sizes,
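Several hot-path accessors above gain ALWAYS_INLINE; getIndexAt() in particular dispatches on the width of the dictionary index column, since low-cardinality positions are stored with the narrowest unsigned type that fits the dictionary size. An illustrative sketch of that dispatch (hypothetical struct, not the ClickHouse state):

```cpp
#include <cstddef>
#include <cstdint>

// Sketch of the getIndexAt() width dispatch: the reader switches on the
// stored index width (1, 2, 4 or 8 bytes) and widens the position to size_t.
struct Positions
{
    const void * data = nullptr;
    size_t size_of_index_type = sizeof(uint8_t);

    size_t getIndexAt(size_t row) const
    {
        switch (size_of_index_type)
        {
            case sizeof(uint8_t):  return static_cast<const uint8_t *>(data)[row];
            case sizeof(uint16_t): return static_cast<const uint16_t *>(data)[row];
            case sizeof(uint32_t): return static_cast<const uint32_t *>(data)[row];
            case sizeof(uint64_t): return static_cast<const uint64_t *>(data)[row];
            default: return 0; // unreachable for a valid column
        }
    }
};
```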
@@ -861,7 +861,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-context.getTemporaryPath());
+context.getTemporaryPath(), settings.max_threads);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@@ -921,10 +921,10 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool ov
* but it can work more slowly.
*/
-Aggregator::Params params(header, keys, aggregates, overflow_row);
const Settings & settings = context.getSettingsRef();
+Aggregator::Params params(header, keys, aggregates, overflow_row, settings.max_threads);
if (!settings.distributed_aggregation_memory_efficient)
{
/// We union several sources into one, parallelizing the work.
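Finally, the caller-visible change: Aggregator::Params grows a trailing max_threads taken from the query settings. It is used only to size the shared dictionary LRU (roughly one live dictionary per worker thread); the Aggregator spawns no threads for it. A reduced sketch of the parameter plumbing, with a hypothetical, heavily abridged Params:

```cpp
#include <cstddef>
#include <string>

// Reduced sketch: max_threads rides along after tmp_path and is only
// consumed when the shared dictionary cache is constructed.
struct Params
{
    std::string tmp_path;
    size_t max_threads = 0;

    Params(const std::string & tmp_path_, size_t max_threads_)
        : tmp_path(tmp_path_), max_threads(max_threads_) {}
};

int main()
{
    size_t settings_max_threads = 8; // would come from Settings::max_threads
    Params params("/tmp/clickhouse", settings_max_threads);
    return params.max_threads == 8 ? 0 : 1;
}
```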