AggregatingSortedAlgorithm.cpp 11.5 KB
Newer Older
1
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
2

N
Nikolai Kochetov 已提交
3 4
#include <Columns/ColumnAggregateFunction.h>
#include <Common/AlignedBuffer.h>
5 6 7 8 9 10 11
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>

namespace DB
{

N
Nikolai Kochetov 已提交
12 13 14 15 16
/// Error codes referenced in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown by AggregatingMergedData::addRow / pull when the start/finish group protocol is violated.
    extern const int LOGICAL_ERROR;
}

N
Nikolai Kochetov 已提交
17
/// Special member functions of ColumnsDefinition are defined out of line in this file.
/// Presumably this is because the description structs it stores are only defined below,
/// in this translation unit - confirm against the header.
AggregatingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition() = default;
AggregatingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition(ColumnsDefinition &&) noexcept = default;
AggregatingSortedAlgorithm::ColumnsDefinition::~ColumnsDefinition() = default;

21 22
/// Stores information for aggregation of AggregateFunction columns
struct AggregatingSortedAlgorithm::AggregateDescription
23
{
24
    ColumnAggregateFunction * column = nullptr;
N
Nikolai Kochetov 已提交
25
    const size_t column_number = 0; /// Position in header.
26

27 28 29
    AggregateDescription() = default;
    explicit AggregateDescription(size_t col_number) : column_number(col_number) {}
};
30

31 32 33 34 35 36
/// Stores information for aggregation of SimpleAggregateFunction columns.
/// Owns a single aggregation state (in `state`) which is created at the start of a group
/// and destroyed at its end; `created` tracks whether the state is currently alive.
struct AggregatingSortedAlgorithm::SimpleAggregateDescription
{
    /// An aggregate function 'anyLast', 'sum'...
    AggregateFunctionPtr function;
    /// Cached address of the function's `add` routine, taken once in the constructor.
    IAggregateFunction::AddFunc add_function = nullptr;

    /// Position of the column in the header.
    size_t column_number = 0;
    /// Result column the aggregation result is inserted into. Not owned.
    IColumn * column = nullptr;

    /// For LowCardinality, the column is converted to its nested type.
    /// nested_type is nullptr if no conversion is needed.
    const DataTypePtr nested_type; /// Nested type for LowCardinality, if it is.
    const DataTypePtr real_type; /// Type in header.

    /// Memory for the aggregation state, sized and aligned for `function`.
    AlignedBuffer state;
    /// True while `state` holds a constructed aggregation state.
    bool created = false;

    SimpleAggregateDescription(
            AggregateFunctionPtr function_, const size_t column_number_,
            DataTypePtr nested_type_, DataTypePtr real_type_)
            : function(std::move(function_)), column_number(column_number_)
            , nested_type(std::move(nested_type_)), real_type(std::move(real_type_))
    {
        add_function = function->getAddressOfAddFunction();
        state.reset(function->sizeOfData(), function->alignOfData());
    }

    /// Construct the aggregation state if it is not alive yet. Idempotent.
    void createState()
    {
        if (created)
            return;
        function->create(state.data());
        created = true;
    }

    /// Destroy the aggregation state if it is alive. Idempotent.
    void destroyState()
    {
        if (!created)
            return;
        function->destroy(state.data());
        created = false;
    }

    /// Explicitly destroy aggregation state if the stream is terminated
    ~SimpleAggregateDescription()
    {
        destroyState();
    }

    SimpleAggregateDescription() = default;

    /// Move transfers ownership of the aggregation state.
    /// A defaulted move would copy `created` while moving `function` away, so a moved-from object
    /// with a live state would call destroy() through a null function in its destructor
    /// (and the state would be destroyed twice). Clearing `other.created` prevents that.
    /// The const type members cannot be moved from, so they are copied.
    SimpleAggregateDescription(SimpleAggregateDescription && other) noexcept
        : function(std::move(other.function))
        , add_function(other.add_function)
        , column_number(other.column_number)
        , column(other.column)
        , nested_type(other.nested_type)
        , real_type(other.real_type)
        , state(std::move(other.state))
        , created(other.created)
    {
        other.created = false;
    }

    SimpleAggregateDescription(const SimpleAggregateDescription &) = delete;
};

/// Classifies every header column into one of three groups:
///  - columns copied as is (not an aggregate state, or part of the sort description),
///  - SimpleAggregateFunction columns,
///  - AggregateFunction columns.
static AggregatingSortedAlgorithm::ColumnsDefinition defineColumns(
    const Block & header, const SortDescription & description)
{
    AggregatingSortedAlgorithm::ColumnsDefinition def = {};
    const size_t num_columns = header.columns();

    for (size_t position = 0; position < num_columns; ++position)
    {
        const ColumnWithTypeAndName & column = header.safeGetByPosition(position);

        const bool is_aggregate_state = dynamic_cast<const DataTypeAggregateFunction *>(column.type.get()) != nullptr;
        const auto * simple = dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName());

        /// Only states of aggregate functions are merged; everything else is copied.
        bool copy_as_is = !is_aggregate_state && !simple;

        /// Columns mentioned in the sort description (by name, or by number when the name is empty)
        /// are part of the key and are copied as well.
        if (!copy_as_is)
        {
            for (const auto & sort_column : description)
            {
                const bool same_name = sort_column.column_name == column.name;
                const bool same_number = sort_column.column_name.empty() && sort_column.column_number == position;
                if (same_name || same_number)
                {
                    copy_as_is = true;
                    break;
                }
            }
        }

        if (copy_as_is)
        {
            def.column_numbers_not_to_aggregate.push_back(position);
            continue;
        }

        if (!simple)
        {
            /// Standard aggregate function.
            def.columns_to_aggregate.emplace_back(position);
            continue;
        }

        /// Simple aggregate function. Remember the LowCardinality-free type only if it
        /// actually differs from the header type.
        auto nested_type = recursiveRemoveLowCardinality(column.type);
        if (nested_type.get() == column.type.get())
            nested_type = nullptr;

        AggregatingSortedAlgorithm::SimpleAggregateDescription desc(simple->getFunction(), position, nested_type, column.type);
        if (desc.function->allocatesMemoryInArena())
            def.allocates_memory_in_arena = true;

        def.columns_to_simple_aggregate.emplace_back(std::move(desc));
    }

    return def;
}

139
/// Creates empty result columns for the header.
/// SimpleAggregateFunction columns use the nested (LowCardinality-free) type when one was recorded;
/// all other columns use the type from the header.
static MutableColumns getMergedColumns(const Block & header, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
{
    MutableColumns columns;
    columns.resize(header.columns());

    for (const auto & desc : def.columns_to_simple_aggregate)
    {
        if (desc.nested_type)
            columns[desc.column_number] = desc.nested_type->createColumn();
        else
            columns[desc.column_number] = desc.real_type->createColumn();
    }

    /// Fill every slot that was not created above.
    size_t position = 0;
    for (auto & column : columns)
    {
        if (!column)
            column = header.getByPosition(position).type->createColumn();
        ++position;
    }

    return columns;
}

N
Nikolai Kochetov 已提交
158 159
/// Prepares an input chunk for merging: materializes constant columns and strips
/// LowCardinality from SimpleAggregateFunction columns that have a nested type.
static void preprocessChunk(Chunk & chunk, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
{
    /// The row count is read before the columns are detached from the chunk.
    const auto num_rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();

    for (size_t i = 0; i < columns.size(); ++i)
        columns[i] = columns[i]->convertToFullColumnIfConst();

    for (const auto & desc : def.columns_to_simple_aggregate)
    {
        if (!desc.nested_type)
            continue;

        columns[desc.column_number] = recursiveRemoveLowCardinality(columns[desc.column_number]);
    }

    chunk.setColumns(std::move(columns), num_rows);
}

N
Nikolai Kochetov 已提交
174 175 176 177
/// Restores the header type (e.g. LowCardinality) of SimpleAggregateFunction columns
/// that were merged in their nested type.
static void postprocessChunk(Chunk & chunk, const AggregatingSortedAlgorithm::ColumnsDefinition & def)
{
    /// The row count is read before the columns are detached from the chunk.
    const size_t num_rows = chunk.getNumRows();
    auto columns = chunk.detachColumns();

    for (const auto & desc : def.columns_to_simple_aggregate)
    {
        /// No nested type means the column was never converted.
        if (!desc.nested_type)
            continue;

        columns[desc.column_number] = recursiveTypeConversion(columns[desc.column_number], desc.nested_type, desc.real_type);
    }

    chunk.setColumns(std::move(columns), num_rows);
}

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256

/// Takes ownership of the result columns and keeps a reference to the shared columns definition.
AggregatingSortedAlgorithm::AggregatingMergedData::AggregatingMergedData(
    MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
    : MergedData(std::move(columns_), false, max_block_size_)
    , def(def_)
{
    /// Point the column descriptions at the freshly stored result columns.
    initAggregateDescription();
}

/// Opens a new group of rows, seeded from row `row` of `raw_columns`.
void AggregatingSortedAlgorithm::AggregatingMergedData::startGroup(const ColumnRawPtrs & raw_columns, size_t row)
{
    /// Ordinary (non-aggregated) columns take their value from the first row of the group.
    for (const auto position : def.column_numbers_not_to_aggregate)
        columns[position]->insertFrom(*raw_columns[position], row);

    /// AggregateFunction columns get an empty state appended; `addRow` merges into it.
    for (auto & aggregate : def.columns_to_aggregate)
        aggregate.column->insertDefault();

    /// SimpleAggregateFunction columns get a fresh state for this group.
    for (auto & simple : def.columns_to_simple_aggregate)
        simple.createState();

    /// A new arena is made for each group when any simple aggregate function allocates in it.
    if (def.allocates_memory_in_arena)
        arena = std::make_unique<Arena>();

    is_group_started = true;
}

/// Closes the current group: emits the simple aggregation results and counts one merged row.
void AggregatingSortedAlgorithm::AggregatingMergedData::finishGroup()
{
    for (auto & simple : def.columns_to_simple_aggregate)
    {
        /// Append the accumulated result to the output column, then drop the state.
        auto * state_place = simple.state.data();
        simple.function->insertResultInto(state_place, *simple.column);
        simple.destroyState();
    }

    is_group_started = false;

    ++total_merged_rows;
    ++merged_rows;
    /// TODO: sum_blocks_granularity += block_size;
}

/// Merges the row the cursor currently points at into the open group.
/// Throws LOGICAL_ERROR if no group has been started.
void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & cursor)
{
    if (!is_group_started)
        throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);

    /// AggregateFunction columns: merge the source state into the last state of the result column.
    for (auto & aggregate : def.columns_to_aggregate)
        aggregate.column->insertMergeFrom(*cursor->all_columns[aggregate.column_number], cursor->pos);

    /// SimpleAggregateFunction columns: feed the source value into the per-group state.
    for (auto & simple : def.columns_to_simple_aggregate)
    {
        auto & source_column = cursor->all_columns[simple.column_number];
        simple.add_function(simple.function.get(), simple.state.data(), &source_column, cursor->pos, arena.get());
    }
}

/// Returns the accumulated rows as a chunk with header types restored.
/// Throws LOGICAL_ERROR if a group is still open.
Chunk AggregatingSortedAlgorithm::AggregatingMergedData::pull()
{
    if (is_group_started)
        throw Exception("Can't pull chunk because group was not finished.", ErrorCodes::LOGICAL_ERROR);

    auto result = MergedData::pull();
    postprocessChunk(result, def);

    /// Refresh the cached column pointers after the pull.
    /// NOTE(review): presumably MergedData::pull replaces `columns` - confirm.
    initAggregateDescription();

    return result;
}

/// Caches raw pointers to the current result columns inside the column descriptions.
void AggregatingSortedAlgorithm::AggregatingMergedData::initAggregateDescription()
{
    for (auto & simple : def.columns_to_simple_aggregate)
    {
        auto * result_column = columns[simple.column_number].get();
        simple.column = result_column;
    }

    for (auto & aggregate : def.columns_to_aggregate)
    {
        auto * result_column = columns[aggregate.column_number].get();
        aggregate.column = typeid_cast<ColumnAggregateFunction *>(result_column);
    }
}


/// Builds the merging algorithm for `num_inputs` sorted inputs with the given header.
///
/// `description_` is passed to the base class by copy, not moved: the base is initialized first,
/// and `defineColumns` below still has to read the sort description to find key columns.
/// Moving it would leave `description_` in a moved-from (typically empty) state, so aggregate-typed
/// columns that are part of the sort key would no longer be recognized as key columns.
///
/// NOTE(review): `merged_data` is built from `columns_definition`, so this relies on
/// `columns_definition` being declared before `merged_data` in the class - confirm in the header.
AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
    const Block & header, size_t num_inputs,
    SortDescription description_, size_t max_block_size)
    : IMergingAlgorithmWithDelayedChunk(num_inputs, description_)
    , columns_definition(defineColumns(header, description_))
    , merged_data(getMergedColumns(header, columns_definition), max_block_size, columns_definition)
{
}

283
/// Preprocesses the non-empty initial chunks and hands all of them over to the merge queue.
void AggregatingSortedAlgorithm::initialize(Chunks chunks)
{
    for (auto & chunk : chunks)
    {
        if (!chunk)
            continue;

        preprocessChunk(chunk, columns_definition);
    }

    initializeQueue(std::move(chunks));
}
291

N
Nikolai Kochetov 已提交
292
/// Accepts the next chunk from input `source_num`: preprocesses it the same way as the initial
/// chunks, then updates the cursor of that source with it.
void AggregatingSortedAlgorithm::consume(Chunk & chunk, size_t source_num)
{
    preprocessChunk(chunk, columns_definition);
    updateCursor(chunk, source_num);
}

298
/// Runs the merge until one of the following happens:
///  - enough rows are accumulated: returns the merged chunk;
///  - the current source chunk is exhausted: returns the order of the source to read next;
///  - the queue is empty: returns the last chunk with the finished flag set.
IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
{
    /// Rows are taken in sorted order and aggregated into `merged_data`, at most `max_block_size` rows per chunk.
    while (queue.isValid())
    {
        SortCursor current = queue.current();

        /// Compare the sort key of the current row with the key of the previous one.
        bool is_new_key = true;
        {
            detail::RowRef row_key;
            row_key.set(current);

            if (!last_key.empty())
                is_new_key = !last_key.hasEqualSortColumnsWith(row_key);

            last_key = row_key;
            last_chunk_sort_columns.clear();
        }

        if (is_new_key)
        {
            if (merged_data.isGroupStarted())
                merged_data.finishGroup();

            /// If there are enough rows accumulated and the last one is calculated completely.
            /// The current row is not consumed here; `last_key` is reset so that it starts
            /// a new group on the next call.
            if (merged_data.hasEnoughRows())
            {
                last_key.reset();
                return Status(merged_data.pull());
            }

            merged_data.startGroup(current->all_columns, current->pos);
        }

        merged_data.addRow(current);

        if (current->isLast())
        {
            /// We get the next block from the corresponding source, if there is one.
            queue.removeTop();
            return Status(current.impl->order);
        }

        queue.next();
    }

    /// Write the simple aggregation result for the previous group.
    if (merged_data.isGroupStarted())
        merged_data.finishGroup();

    last_chunk_sort_columns.clear();
    return Status(merged_data.pull(), true);
}

}