提交 9ce0607d 编写于 作者: N Nikolai Kochetov

Remove header from AggregatingSortedAlgorithm.

上级 0544c1f2
......@@ -48,7 +48,7 @@ namespace
type = nullptr;
// simple aggregate function
AggregatingSortedAlgorithm::SimpleAggregateDescription desc(simple_aggr->getFunction(), i, type);
AggregatingSortedAlgorithm::SimpleAggregateDescription desc(simple_aggr->getFunction(), i, type, column.type);
if (desc.function->allocatesMemoryInArena())
def.allocates_memory_in_arena = true;
......@@ -84,10 +84,9 @@ namespace
}
AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
const Block & header_, size_t num_inputs,
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingAlgorithmWithDelayedChunk(num_inputs, std::move(description_))
, header(header_)
, columns_definition(defineColumns(header, description_))
, merged_data(getMergedColumns(header, columns_definition), max_block_size, columns_definition)
{
......@@ -102,7 +101,7 @@ void AggregatingSortedAlgorithm::prepareChunk(Chunk & chunk) const
column = column->convertToFullColumnIfConst();
for (auto & desc : columns_definition.columns_to_simple_aggregate)
if (desc.inner_type)
if (desc.nested_type)
columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]);
chunk.setColumns(std::move(columns), num_rows);
......@@ -159,7 +158,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
if (merged_data.hasEnoughRows())
{
last_key.reset();
Status(merged_data.pull(columns_definition, header));
Status(merged_data.pull(columns_definition));
}
/// We will write the data for the group. We copy the values of ordinary columns.
......@@ -200,7 +199,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
}
last_chunk_sort_columns.clear();
return Status(merged_data.pull(columns_definition, header), true);
return Status(merged_data.pull(columns_definition), true);
}
void AggregatingSortedAlgorithm::addRow(SortCursor & cursor)
......
......@@ -72,7 +72,7 @@ private:
/// TODO: sum_blocks_granularity += block_size;
}
Chunk pull(ColumnsDefinition & def, const Block & header_)
Chunk pull(ColumnsDefinition & def)
{
auto chunk = pull();
......@@ -81,10 +81,10 @@ private:
for (auto & desc : def.columns_to_simple_aggregate)
{
if (desc.inner_type)
if (desc.nested_type)
{
auto & from_type = desc.inner_type;
auto & to_type = header_.getByPosition(desc.column_number).type;
auto & from_type = desc.nested_type;
auto & to_type = desc.real_type;
columns_[desc.column_number] = recursiveTypeConversion(columns_[desc.column_number], from_type, to_type);
}
}
......@@ -111,8 +111,6 @@ private:
using MergedData::pull;
};
Block header;
ColumnsDefinition columns_definition;
AggregatingMergedData merged_data;
......@@ -134,13 +132,19 @@ public:
size_t column_number = 0;
IColumn * column = nullptr;
const DataTypePtr inner_type;
/// For LowCardinality, convert is converted to nested type. nested_type is nullptr if no conversion needed.
const DataTypePtr nested_type; /// Nested type for LowCardinality, if it is.
const DataTypePtr real_type; /// Type in header.
AlignedBuffer state;
bool created = false;
SimpleAggregateDescription(AggregateFunctionPtr function_, const size_t column_number_, DataTypePtr type)
: function(std::move(function_)), column_number(column_number_), inner_type(std::move(type))
SimpleAggregateDescription(
AggregateFunctionPtr function_, const size_t column_number_,
DataTypePtr nested_type_, DataTypePtr real_type_)
: function(std::move(function_)), column_number(column_number_)
, nested_type(std::move(nested_type_)), real_type(std::move(real_type_))
{
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册