// SummingSortedAlgorithm.h (3.2 KB), extracted from a repository blame view ("Newer"/"Older" navigation).
// Blame: Nikolai Kochetov committed lines 1-2.
#pragma once

// Blame: lines 3-4.
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergedData.h>
// Blame: Nikolai Kochetov committed lines 5-9.
#include <Core/Row.h>

namespace DB
{

// Blame: Nikolai Kochetov committed lines 10-15.
/** Merges several sorted inputs into one.
  * For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
  *  collapses them into one row, summing all the numeric columns except the primary key.
  * If in all numeric columns, except for the primary key, the result is zero, it deletes the row.
  */
class SummingSortedAlgorithm final : public IMergingAlgorithmWithDelayedChunk
N
Nikolai Kochetov 已提交
16 17 18 19 20 21 22 23 24 25
{
public:
    SummingSortedAlgorithm(
        const Block & header, size_t num_inputs,
        SortDescription description_,
        /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
        const Names & column_names_to_sum,
        size_t max_block_size);

    void initialize(Chunks chunks) override;
N
Nikolai Kochetov 已提交
26
    void consume(Chunk & chunk, size_t source_num) override;
N
Nikolai Kochetov 已提交
27 28 29
    Status merge() override;

    struct AggregateDescription;
30
    struct MapDescription;
N
Nikolai Kochetov 已提交
31 32 33 34 35 36 37

    /// This structure define columns into one of three types:
    /// * columns which values not needed to be aggregated
    /// * aggregate functions and columns which needed to be summed
    /// * mapping for nested columns
    struct ColumnsDefinition
    {
38 39 40 41
        ColumnsDefinition(); /// Is needed because destructor is defined.
        ColumnsDefinition(ColumnsDefinition &&) noexcept; /// Is needed because destructor is defined.
        ~ColumnsDefinition(); /// Is needed because otherwise std::vector's destructor uses incomplete types.

N
Nikolai Kochetov 已提交
42 43 44 45 46 47 48
        /// Columns with which values should not be aggregated.
        ColumnNumbers column_numbers_not_to_aggregate;
        /// Columns which should be aggregated.
        std::vector<AggregateDescription> columns_to_aggregate;
        /// Mapping for nested columns.
        std::vector<MapDescription> maps_to_sum;

49 50
        /// Names of columns from header.
        Names column_names;
N
Nikolai Kochetov 已提交
51 52 53 54 55 56 57
    };

    /// Specialization for SummingSortedTransform. Inserts only data for non-aggregated columns.
    class SummingMergedData : public MergedData
    {
    private:
        using MergedData::pull;
58
        using MergedData::insertRow;
N
Nikolai Kochetov 已提交
59 60

    public:
61
        SummingMergedData(MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_);
N
Nikolai Kochetov 已提交
62

63 64 65 66 67
        void startGroup(ColumnRawPtrs & raw_columns, size_t row);
        void finishGroup();

        bool isGroupStarted() const { return is_group_started; }
        void addRow(ColumnRawPtrs & raw_columns, size_t row); /// Possible only when group was started.
N
Nikolai Kochetov 已提交
68

69
        Chunk pull();
N
Nikolai Kochetov 已提交
70 71 72

    private:
        ColumnsDefinition & def;
N
Nikolai Kochetov 已提交
73

74
        bool is_group_started = false;
N
Nikolai Kochetov 已提交
75

76 77
        Row current_row;
        bool current_row_is_zero = true;    /// Are all summed columns zero (or empty)? It is updated incrementally.
N
Nikolai Kochetov 已提交
78

79
        void addRowImpl(ColumnRawPtrs & raw_columns, size_t row);
N
Nikolai Kochetov 已提交
80

81 82
        /// Initialize aggregate descriptions with columns.
        void initAggregateDescription();
N
Nikolai Kochetov 已提交
83
    };
84 85 86 87 88

private:
    /// Order between members is important because merged_data has reference to columns_definition.
    ColumnsDefinition columns_definition;
    SummingMergedData merged_data;
N
Nikolai Kochetov 已提交
89 90 91
};

}