提交 974402ab 编写于 作者: A Azat Khuzhin 提交者: Nikolai Kochetov

Access aggregate keys by names over indexes

Relax this to allow accepting aggregate keys in different order (typical
use case is distributed queries, where the initiator server will do
final merge).
上级 eb71fe4c
......@@ -125,11 +125,11 @@ Block Aggregator::getHeader(bool final) const
if (final)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
for (const auto & aggregate : params.aggregates)
{
auto & elem = res.getByPosition(params.keys_size + i);
auto & elem = res.getByName(aggregate.column_name);
elem.type = params.aggregates[i].function->getReturnType();
elem.type = aggregate.function->getReturnType();
elem.column = elem.type->createColumn();
}
}
......@@ -1030,7 +1030,8 @@ Block Aggregator::prepareBlockAndFill(
{
if (!final)
{
aggregate_columns[i] = header.safeGetByPosition(i + params.keys_size).type->createColumn();
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = header.getByName(aggregate_column_name).type->createColumn();
/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
......@@ -1066,10 +1067,11 @@ Block Aggregator::prepareBlockAndFill(
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
if (final)
res.getByPosition(i + params.keys_size).column = std::move(final_aggregate_columns[i]);
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByPosition(i + params.keys_size).column = std::move(aggregate_columns[i]);
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
}
/// Change the size of the columns-constants in the block.
......@@ -1785,7 +1787,10 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
key_columns[i] = block.safeGetByPosition(i).column.get();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
}
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
......@@ -1861,7 +1866,10 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
/// Remember the columns we will work with
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.safeGetByPosition(params.keys_size + i).column).getData();
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
}
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册