提交 2311c69d 编写于 作者: A Alexey Milovidov

dbms: development of memory efficient distributed aggregation method [#METR-17536].

上级 8d25390c
......@@ -53,7 +53,7 @@ public:
size_t hash(const Key & x) const { return Hash::operator()(x); }
/// NOTE Плохо для хэш-таблиц больше чем на 2^32 ячеек.
size_t getBucketFromHash(size_t hash_value) const { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; }
static size_t getBucketFromHash(size_t hash_value) { return (hash_value >> (32 - BITS_FOR_BUCKET)) & MAX_BUCKET; }
protected:
typename Impl::iterator beginOfNextNonEmptyBucket(size_t & bucket)
......
......@@ -19,12 +19,10 @@ public:
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_)
final(final_),
inputs(inputs_.begin(), inputs_.end())
{
children = inputs_;
current_blocks.resize(children.size());
overflow_blocks.resize(children.size());
is_exhausted.resize(children.size());
}
String getName() const override { return "MergingAggregatedMemorySavvy"; }
......@@ -43,68 +41,196 @@ protected:
Block readImpl() override
{
/// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления.
if (current_bucket_num == -1)
if (!started)
{
started = true;
for (auto & child : children)
child->readPrefix();
}
/// Всё прочитали.
if (current_bucket_num > 255)
return {};
/** Имеем несколько источников.
* Из каждого из них могут приходить следующие данные:
*
* 1. Блок, с указанным bucket_num.
* Это значит, что на удалённом сервере, данные были разрезаны по корзинам.
* И данные для одного bucket_num с разных серверов можно независимо объединять.
* При этом, даннные для разных bucket_num будут идти по возрастанию.
*
* 2. Блок без указания bucket_num.
* Это значит, что на удалённом сервере, данные не были разрезаны по корзинам.
* В случае, когда со всех серверов прийдут такие данные, их можно всех объединить.
* А если с другой части серверов прийдут данные, разрезанные по корзинам,
* то данные, не разрезанные по корзинам, нужно сначала разрезать, а потом объединять.
*
* 3. Блоки с указанием is_overflows.
* Это дополнительные данные для строк, не прошедших через max_rows_to_group_by.
* Они должны объединяться друг с другом отдельно.
*/
constexpr size_t NUM_BUCKETS = 256;
/// Читаем следующие блоки для current_bucket_num
for (size_t i = 0, size = children.size(); i < size; ++i)
++current_bucket_num;
for (auto & input : inputs)
{
if (input.is_exhausted)
continue;
if (input.block.info.bucket_num >= current_bucket_num)
continue;
//std::cerr << "reading block\n";
Block block = input.stream->read();
if (!block)
{
//std::cerr << "input is exhausted\n";
input.is_exhausted = true;
continue;
}
if (block.info.bucket_num != -1)
{
//std::cerr << "block for bucket " << block.info.bucket_num << "\n";
has_two_level = true;
input.block = block;
}
else if (block.info.is_overflows)
{
//std::cerr << "block for overflows\n";
has_overflows = true;
input.overflow_block = block;
}
else
{
//std::cerr << "block without bucket\n";
input.block = block;
}
}
while (true)
{
while (!is_exhausted[i] && (!current_blocks[i] || current_blocks[i].info.bucket_num < current_bucket_num))
if (current_bucket_num == NUM_BUCKETS)
{
current_blocks[i] = children[i]->read();
//std::cerr << "at end\n";
if (!current_blocks[i])
if (has_overflows)
{
is_exhausted[i] = true;
//std::cerr << "merging overflows\n";
has_overflows = false;
BlocksList blocks_to_merge;
for (auto & input : inputs)
if (input.overflow_block)
blocks_to_merge.emplace_back(std::move(input.overflow_block));
return aggregator.mergeBlocks(blocks_to_merge, final);
}
else if (current_blocks[i].info.is_overflows)
else
return {};
}
else if (has_two_level)
{
//std::cerr << "has two level\n";
int min_bucket_num = NUM_BUCKETS;
for (auto & input : inputs)
{
overflow_blocks[i].swap(current_blocks[i]);
if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num)
min_bucket_num = input.block.info.bucket_num;
if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty())
{
//std::cerr << "having block without bucket; will split\n";
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
/// Нельзя уничтожать исходный блок.
}
if (!input.splitted_blocks.empty())
{
for (const auto & block : input.splitted_blocks)
{
if (block && block.info.bucket_num < min_bucket_num)
{
min_bucket_num = block.info.bucket_num;
break;
}
}
}
}
}
}
/// Может быть, нет блоков для current_bucket_num, а все блоки имеют больший bucket_num.
Int32 min_bucket_num = 256;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (!is_exhausted[i] && current_blocks[i].info.bucket_num < min_bucket_num)
min_bucket_num = current_blocks[i].info.bucket_num;
current_bucket_num = min_bucket_num;
//std::cerr << "current_bucket_num = " << current_bucket_num << "\n";
current_bucket_num = min_bucket_num;
if (current_bucket_num == NUM_BUCKETS)
continue;
/// Все потоки исчерпаны.
if (current_bucket_num > 255)
return {}; /// TODO overflow_blocks.
BlocksList blocks_to_merge;
/// TODO Если есть single_level и two_level блоки.
for (auto & input : inputs)
{
if (input.block.info.bucket_num == current_bucket_num)
{
//std::cerr << "having block for current_bucket_num\n";
blocks_to_merge.emplace_back(std::move(input.block));
input.block = Block();
}
else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
{
//std::cerr << "having splitted data for bucket\n";
blocks_to_merge.emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
input.splitted_blocks[min_bucket_num] = Block();
}
}
/// Объединяем все блоки с current_bucket_num.
return aggregator.mergeBlocks(blocks_to_merge, final);
}
else
{
//std::cerr << "don't have two level\n";
BlocksList blocks_to_merge;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (current_blocks[i].info.bucket_num == current_bucket_num)
blocks_to_merge.emplace_back(std::move(current_blocks[i]));
BlocksList blocks_to_merge;
Block res = aggregator.mergeBlocks(blocks_to_merge, final);
for (auto & input : inputs)
if (input.block)
blocks_to_merge.emplace_back(std::move(input.block));
++current_bucket_num;
return res;
current_bucket_num = NUM_BUCKETS;
return aggregator.mergeBlocks(blocks_to_merge, final);
}
}
}
private:
Aggregator aggregator;
bool final;
Int32 current_bucket_num = -1;
std::vector<Block> current_blocks;
std::vector<UInt8> is_exhausted;
bool started = false;
bool has_two_level = false;
bool has_overflows = false;
int current_bucket_num = -1;
struct Input
{
BlockInputStreamPtr stream;
Block block;
Block overflow_block;
std::vector<Block> splitted_blocks;
bool is_exhausted = false;
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
};
std::vector<Block> overflow_blocks;
std::vector<Input> inputs;
};
}
......@@ -731,6 +731,11 @@ public:
*/
Block mergeBlocks(BlocksList & blocks, bool final);
/** Преобразовать (разрезать) блок частично-агрегированных данных на много блоков, как если бы использовался двухуровневый метод агрегации.
* Это нужно, чтобы потом было проще объединить результат с другими результатами, уже являющимися двухуровневыми.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);
using CancellationHook = std::function<bool()>;
/** Установить функцию, которая проверяет, можно ли прервать текущую задачу.
......@@ -806,7 +811,7 @@ protected:
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата.
*/
void initialize(Block & block);
void initialize(const Block & block);
/** Выбрать способ агрегации на основе количества и типов ключей. */
AggregatedDataVariants::Type chooseAggregationMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes);
......@@ -961,6 +966,16 @@ protected:
Block & block,
AggregatedDataVariants & result) const;
template <typename Method>
void convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ConstColumnPlainPtrs & key_columns,
const Sizes & key_sizes,
StringRefs & keys,
const Block & source,
std::vector<Block> & destinations) const;
template <typename Method>
void destroyImpl(
Method & method) const;
......
......@@ -58,7 +58,7 @@ void AggregatedDataVariants::convertToTwoLevel()
}
void Aggregator::initialize(Block & block)
void Aggregator::initialize(const Block & block)
{
if (isCancelled())
return;
......@@ -1749,6 +1749,106 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
}
template <typename Method>
void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ConstColumnPlainPtrs & key_columns,
const Sizes & key_sizes,
StringRefs & keys,
const Block & source,
std::vector<Block> & destinations) const
{
typename Method::State state;
state.init(key_columns);
size_t rows = source.rowsInFirstColumn();
size_t columns = source.columns();
/// Для всех строчек.
for (size_t i = 0; i < rows; ++i)
{
/// Получаем ключ. Вычисляем на его основе номер корзины.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *pool);
auto hash = method.data.hash(key);
auto bucket = method.data.getBucketFromHash(hash);
/// Этот ключ нам больше не нужен.
method.onExistingKey(key, keys, *pool);
Block & dst = destinations[bucket];
if (unlikely(!dst))
{
dst = source.cloneEmpty();
dst.info.bucket_num = bucket;
}
for (size_t j = 0; j < columns; ++j)
dst.unsafeGetByPosition(j).column.get()->insertFrom(*source.unsafeGetByPosition(j).column.get(), i);
}
}
std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
{
if (!block)
return {};
initialize(block);
AggregatedDataVariants data;
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
Sizes key_sizes;
/// Запоминаем столбцы, с которыми будем работать
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByPosition(i).column;
AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
#define M(NAME) \
else if (type == AggregatedDataVariants::Type::NAME) \
type = AggregatedDataVariants::Type::NAME ## _two_level;
if (false) {}
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
data.init(type);
size_t num_buckets = 0;
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
num_buckets = data.NAME->data.NUM_BUCKETS;
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
std::vector<Block> splitted_blocks(num_buckets);
#define M(NAME) \
else if (data.type == AggregatedDataVariants::Type::NAME) \
convertBlockToTwoLevelImpl(*data.NAME, data.aggregates_pool, \
key_columns, data.key_sizes, key, block, splitted_blocks);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
return splitted_blocks;
}
template <typename Method>
void NO_INLINE Aggregator::destroyImpl(
Method & method) const
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册