未验证 提交 0de89e29 编写于 作者: A Alexander Kuzmenkov 提交者: GitHub

Merge pull request #19401 from CurtizJ/aggregating-in-order

Try improve performance of aggregation in order of sorting key
......@@ -366,4 +366,20 @@ private:
}
};
template <typename TLeftColumns, typename TRightColumns>
bool less(const TLeftColumns & lhs, const TRightColumns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
}
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Core/SortCursor.h>
#include <ext/range.h>
namespace DB
{
FinishAggregatingInOrderAlgorithm::State::State(
const Chunk & chunk, const SortDescription & desc)
: num_rows(chunk.getNumRows())
, all_columns(chunk.getColumns())
{
if (!chunk)
return;
sorting_columns.reserve(desc.size());
for (const auto & column_desc : desc)
sorting_columns.emplace_back(all_columns[column_desc.column_number].get());
}
FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_)
: header(header_)
, num_inputs(num_inputs_)
, params(params_)
, description(std::move(description_))
{
/// Replace column names in description to positions.
for (auto & column_description : description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = header_.getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
}
void FinishAggregatingInOrderAlgorithm::initialize(Inputs inputs)
{
current_inputs = std::move(inputs);
states.reserve(num_inputs);
for (size_t i = 0; i < num_inputs; ++i)
states.emplace_back(current_inputs[i].chunk, description);
}
void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num)
{
states[source_num] = State{input.chunk, description};
}
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
/// Find the input with smallest last row.
std::optional<size_t> best_input;
for (size_t i = 0; i < num_inputs; ++i)
{
if (!states[i].isValid())
continue;
if (!best_input
|| less(states[i].sorting_columns, states[*best_input].sorting_columns,
states[i].num_rows - 1, states[*best_input].num_rows - 1, description))
{
best_input = i;
}
}
if (!best_input)
return Status{aggregate(), true};
/// Chunk at best_input will be aggregated entirely.
auto & best_state = states[*best_input];
best_state.to_row = states[*best_input].num_rows;
/// Find the positions up to which need to aggregate in other chunks.
for (size_t i = 0; i < num_inputs; ++i)
{
if (!states[i].isValid() || i == *best_input)
continue;
auto indices = ext::range(states[i].current_row, states[i].num_rows);
auto it = std::upper_bound(indices.begin(), indices.end(), best_state.num_rows - 1,
[&](size_t lhs_pos, size_t rhs_pos)
{
return less(best_state.sorting_columns, states[i].sorting_columns, lhs_pos, rhs_pos, description);
});
states[i].to_row = (it == indices.end() ? states[i].num_rows : *it);
}
Status status(*best_input);
status.chunk = aggregate();
return status;
}
Chunk FinishAggregatingInOrderAlgorithm::aggregate()
{
BlocksList blocks;
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & state = states[i];
if (!state.isValid() || state.current_row == state.to_row)
continue;
if (state.to_row - state.current_row == state.num_rows)
{
blocks.emplace_back(header.cloneWithColumns(states[i].all_columns));
}
else
{
Columns new_columns;
new_columns.reserve(state.all_columns.size());
for (const auto & column : state.all_columns)
new_columns.emplace_back(column->cut(state.current_row, state.to_row - state.current_row));
blocks.emplace_back(header.cloneWithColumns(new_columns));
}
states[i].current_row = states[i].to_row;
}
auto aggregated = params->aggregator.mergeBlocks(blocks, false);
return {aggregated.getColumns(), aggregated.rows()};
}
}
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/Algorithms/MergedData.h>
#include <Core/SortDescription.h>
#include <Core/Block.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/**
* The second step of aggregation in order of sorting key.
* The transform receives k inputs with partially aggregated data,
* sorted by group by key (prefix of sorting key).
* Then it merges aggregated data from inputs by the following algorithm:
* - At each step find the smallest value X of the sorting key among last rows of current blocks of inputs.
* Since the data is sorted in order of sorting key and has no duplicates in single input stream (because of aggregation),
* X will never appear later in any of input streams.
* - Aggregate all rows in current blocks of inputs up to the upper_bound of X using
* regular hash table algorithm (Aggregator::mergeBlock).
* The hash table at one step will contain all keys <= X from all blocks.
* There is another, simpler algorithm (AggregatingSortedAlgorithm), that merges
* and aggregates sorted data for one key at a time, using one aggregation state.
* It is a simple k-way merge algorithm and it makes O(n*log(k)) comparisons,
* where * n -- number of rows, k -- number of parts. In comparison, this algorithm
* makes about * O(n + k * log(n)) operations, n -- for hash table, k * log(n)
* -- for finding positions in blocks. It is better than O(n*log(k)), when k is not
* too big and not too small (about 100-1000).
*/
class FinishAggregatingInOrderAlgorithm final : public IMergingAlgorithm
{
public:
FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
Status merge() override;
private:
Chunk aggregate();
struct State
{
size_t num_rows;
Columns all_columns;
ColumnRawPtrs sorting_columns;
/// Number of row starting from which need to aggregate.
size_t current_row = 0;
/// Number of row up to which need to aggregate (not included).
size_t to_row = 0;
State(const Chunk & chunk, const SortDescription & description);
bool isValid() const { return current_row < num_rows; }
};
Block header;
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescription description;
Inputs current_inputs;
std::vector<State> states;
};
}
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
namespace DB
{
class ColumnAggregateFunction;
/// Implementation of IMergingTransform via FinishAggregatingInOrderAlgorithm.
class FinishAggregatingInOrderTransform final : public IMergingTransform<FinishAggregatingInOrderAlgorithm>
{
public:
FinishAggregatingInOrderTransform(
const Block & header,
size_t num_inputs,
AggregatingTransformParamsPtr params,
SortDescription description)
: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
params,
std::move(description))
{
}
String getName() const override { return "FinishAggregatingInOrderTransform"; }
};
}
......@@ -3,6 +3,7 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
namespace DB
{
......@@ -95,11 +96,11 @@ void AggregatingStep::transformPipeline(QueryPipeline & pipeline)
}
}
auto transform = std::make_shared<AggregatingSortedTransform>(
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
group_by_sort_description,
max_block_size);
transform_params,
group_by_sort_description);
pipeline.addTransform(std::move(transform));
aggregating_sorted = collector.detachProcessors(1);
......
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Core/SortCursor.h>
namespace DB
{
......@@ -46,21 +47,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void AggregatingInOrderTransform::consume(Chunk chunk)
{
size_t rows = chunk.getNumRows();
......
......@@ -37,20 +37,6 @@ FinishSortingTransform::FinishSortingTransform(
description_sorted.assign(description.begin(), description.begin() + prefix_size);
}
static bool less(const Columns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void FinishSortingTransform::consume(Chunk chunk)
{
generated_prefix = false;
......
......@@ -79,6 +79,7 @@ SRCS(
LimitTransform.cpp
Merges/Algorithms/AggregatingSortedAlgorithm.cpp
Merges/Algorithms/CollapsingSortedAlgorithm.cpp
Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp
Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
......
......@@ -9,12 +9,12 @@ INSERT INTO pk_order(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 1
-- Order after group by in order is determined
SELECT a, b FROM pk_order GROUP BY a, b;
SELECT a FROM pk_order GROUP BY a;
SELECT a, b FROM pk_order GROUP BY a, b ORDER BY a, b;
SELECT a FROM pk_order GROUP BY a ORDER BY a;
SELECT a, b, sum(c), avg(d) FROM pk_order GROUP BY a, b;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY a;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY -a;
SELECT a, b, sum(c), avg(d) FROM pk_order GROUP BY a, b ORDER BY a, b;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY a ORDER BY a;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY -a ORDER BY a;
DROP TABLE IF EXISTS pk_order;
......@@ -26,8 +26,8 @@ INSERT INTO pk_order
set max_block_size = 1;
SELECT d, max(b) FROM pk_order GROUP BY d, a LIMIT 5;
SELECT d, avg(a) FROM pk_order GROUP BY toString(d) LIMIT 5;
SELECT toStartOfHour(d) as d1, min(a), max(b) FROM pk_order GROUP BY d1 LIMIT 5;
SELECT d, max(b) FROM pk_order GROUP BY d, a ORDER BY d, a LIMIT 5;
SELECT d, avg(a) FROM pk_order GROUP BY toString(d) ORDER BY toString(d) LIMIT 5;
SELECT toStartOfHour(d) as d1, min(a), max(b) FROM pk_order GROUP BY d1 ORDER BY d1 LIMIT 5;
DROP TABLE pk_order;
......@@ -2,7 +2,7 @@
ExpressionTransform
(Aggregating)
FinalizingSimpleTransform
AggregatingSortedTransform 3 → 1
FinishAggregatingInOrderTransform 3 → 1
AggregatingInOrderTransform × 3
(Expression)
ExpressionTransform × 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册