// CollapsingSortedAlgorithm.h
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergedData.h>
#include <DataStreams/ColumnGathererStream.h>

/// Forward declaration of the Poco logger: this header only refers to loggers through pointers,
/// so the full Poco definition does not need to be included here.
namespace Poco { class Logger; }

namespace DB
{

N
Nikolai Kochetov 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26
/** Merges several sorted inputs to one.
  * For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
  *  keeps no more than one row with the value of the column `sign_column = -1` ("negative row")
  *  and no more than a row with the value of the column `sign_column = 1` ("positive row").
  * That is, it collapses the records from the change log.
  *
  * If the number of positive and negative rows is the same, and the last row is positive, then the first negative and last positive rows are written.
  * If the number of positive and negative rows is the same, and the last line is negative, it writes nothing.
  * If the positive by 1 is greater than the negative rows, then only the last positive row is written.
  * If negative by 1 is greater than positive rows, then only the first negative row is written.
  * Otherwise, a logical error.
  */
class CollapsingSortedAlgorithm final : public IMergingAlgorithmWithSharedChunks
27 28 29 30 31 32 33
{
public:
    CollapsingSortedAlgorithm(
        const Block & header,
        size_t num_inputs,
        SortDescription description_,
        const String & sign_column,
N
Nikolai Kochetov 已提交
34
        bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
35 36 37 38 39 40 41 42 43 44 45
        size_t max_block_size,
        WriteBuffer * out_row_sources_buf_,
        bool use_average_block_sizes,
        Logger * log_);

    Status merge() override;

private:
    MergedData merged_data;

    const size_t sign_column_number;
N
Nikolai Kochetov 已提交
46
    const bool only_positive_sign;
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

    static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
    RowRef first_negative_row;
    RowRef last_positive_row;
    RowRef last_row;

    size_t count_positive = 0;    /// The number of positive rows for the current primary key.
    size_t count_negative = 0;    /// The number of negative rows for the current primary key.
    bool last_is_positive = false;  /// true if the last row for the current primary key is positive.

    /// Fields specific for VERTICAL merge algorithm.
    /// Row numbers are relative to the start of current primary key.
    size_t current_pos = 0;                        /// Current row number
    size_t first_negative_pos = 0;                 /// Row number of first_negative
    size_t last_positive_pos = 0;                  /// Row number of last_positive
    PODArray<RowSourcePart> current_row_sources;   /// Sources of rows with the current primary key

    size_t count_incorrect_data = 0;    /// To prevent too many error messages from writing to the log.
    Logger * log;

    void reportIncorrectData();
    void insertRow(RowRef & row);
    void insertRows();
};

}