MergeJoin.h 5.3 KB
Newer Older
C
chertus 已提交
1 2
#pragma once

C
chertus 已提交
3
#include <shared_mutex>
C
chertus 已提交
4

5
#include <Common/LRUCache.h>
C
chertus 已提交
6
#include <Core/Block.h>
C
chertus 已提交
7
#include <Core/SortDescription.h>
C
chertus 已提交
8
#include <Interpreters/IJoin.h>
9
#include <Interpreters/SortedBlocksWriter.h>
10
#include <DataStreams/SizeLimits.h>
C
chertus 已提交
11 12 13 14

namespace DB
{

15
/// Forward declarations; the full definitions are not needed by this header.
class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class RowBitmaps;
C
chertus 已提交
19

20

C
chertus 已提交
21 22 23
class MergeJoin : public IJoin
{
public:
24
    MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block);
C
chertus 已提交
25

26
    bool addJoinedBlock(const Block & block, bool check_limits) override;
C
chertus 已提交
27
    void joinBlock(Block &, ExtraBlockPtr & not_processed) override;
C
chertus 已提交
28
    void joinTotals(Block &) const override;
C
chertus 已提交
29
    void setTotals(const Block &) override;
C
chertus 已提交
30
    bool hasTotals() const override { return totals; }
31 32
    size_t getTotalRowCount() const override { return right_blocks.row_count; }
    size_t getTotalByteCount() const override { return right_blocks.bytes; }
C
chertus 已提交
33

34 35
    BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;

C
chertus 已提交
36
private:
37 38
    friend class NonMergeJoinedBlockInputStream;

C
chertus 已提交
39 40
    struct NotProcessed : public ExtraBlock
    {
C
chertus 已提交
41 42 43
        size_t left_position;
        size_t right_position;
        size_t right_block;
C
chertus 已提交
44 45
    };

46 47 48 49 50 51 52 53 54 55 56 57 58
    struct RightBlockInfo
    {
        std::shared_ptr<Block> block;
        size_t block_number;
        size_t & skip;
        RowBitmaps * bitmaps;
        std::unique_ptr<std::vector<bool>> used_bitmap;

        RightBlockInfo(std::shared_ptr<Block> block_, size_t block_number, size_t & skip_, RowBitmaps * bitmaps);
        ~RightBlockInfo(); /// apply used bitmap
        void setUsed(size_t start, size_t length);
    };

59
    /// There're two size limits for right-hand table: max_rows_in_join, max_bytes_in_join.
A
Alexey Milovidov 已提交
60
    /// max_bytes is preferred. If it isn't set we approximate it as (max_rows * bytes/row).
61 62 63 64 65 66 67
    struct BlockByteWeight
    {
        size_t operator()(const Block & block) const { return block.bytes(); }
    };

    using Cache = LRUCache<size_t, Block, std::hash<size_t>, BlockByteWeight>;

C
chertus 已提交
68
    mutable std::shared_mutex rwlock;
69
    std::shared_ptr<TableJoin> table_join;
70
    SizeLimits size_limits;
C
chertus 已提交
71
    SortDescription left_sort_description;
C
chertus 已提交
72 73 74
    SortDescription right_sort_description;
    SortDescription left_merge_description;
    SortDescription right_merge_description;
75
    Block right_sample_block;
C
chertus 已提交
76
    Block right_table_keys;
C
chertus 已提交
77
    Block right_columns_to_add;
78
    SortedBlocksWriter::Blocks right_blocks;
79 80

    /// Each block stores first and last row from corresponding sorted block on disk
81
    Blocks min_max_right_blocks;
82
    std::shared_ptr<SortedBlocksBuffer> left_blocks_buffer;
83 84
    std::shared_ptr<RowBitmaps> used_rows_bitmap;
    mutable std::unique_ptr<Cache> cached_right_blocks;
85
    std::vector<std::shared_ptr<Block>> loaded_right_blocks;
86
    std::unique_ptr<SortedBlocksWriter> disk_writer;
87
    /// Set of files with sorted blocks
88
    SortedBlocksWriter::SortedFiles flushed_right_blocks;
C
chertus 已提交
89
    Block totals;
90
    std::atomic<bool> is_in_memory{true};
91
    const bool nullable_right_side;
92
    const bool nullable_left_side;
93
    const bool is_any_join;
94
    const bool is_all_join;
95
    const bool is_semi_join;
96 97
    const bool is_inner;
    const bool is_left;
98 99
    const bool is_right;
    const bool is_full;
100
    static constexpr const bool skip_not_intersected = true; /// skip index for right blocks
C
chertus 已提交
101
    const size_t max_joined_block_rows;
102
    const size_t max_rows_in_right_block;
103
    const size_t max_files_to_merge;
C
chertus 已提交
104

A
Alexey Milovidov 已提交
105
    void changeLeftColumns(Block & block, MutableColumns && columns) const;
106
    void addRightColumns(Block & block, MutableColumns && columns);
C
chertus 已提交
107 108 109

    template <bool is_all>
    ExtraBlockPtr extraBlock(Block & processed, MutableColumns && left_columns, MutableColumns && right_columns,
C
chertus 已提交
110
                             size_t left_position, size_t right_position, size_t right_block_number);
C
chertus 已提交
111

C
chertus 已提交
112
    void mergeRightBlocks();
113 114

    template <bool in_memory>
115
    size_t rightBlocksCount() const;
116
    template <bool in_memory, bool is_all>
C
chertus 已提交
117
    void joinSortedBlock(Block & block, ExtraBlockPtr & not_processed);
118
    template <bool in_memory>
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
    std::shared_ptr<Block> loadRightBlock(size_t pos) const;

    std::shared_ptr<Block> getRightBlock(size_t pos) const
    {
        if (is_in_memory)
            return loadRightBlock<true>(pos);
        return loadRightBlock<false>(pos);
    }

    size_t getRightBlocksCount() const
    {
        if (is_in_memory)
            return rightBlocksCount<true>();
        return rightBlocksCount<false>();
    }
134

135
    template <bool is_all> /// ALL or ANY
136 137 138
    bool leftJoin(MergeJoinCursor & left_cursor, const Block & left_block, RightBlockInfo & right_block_info,
                  MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
    bool semiLeftJoin(MergeJoinCursor & left_cursor, const Block & left_block, const RightBlockInfo & right_block_info,
139
                  MutableColumns & left_columns, MutableColumns & right_columns);
140 141
    bool allInnerJoin(MergeJoinCursor & left_cursor, const Block & left_block, RightBlockInfo & right_block_info,
                  MutableColumns & left_columns, MutableColumns & right_columns, size_t & left_key_tail);
142

143
    Block modifyRightBlock(const Block & src_block) const;
144 145 146 147
    bool saveRightBlock(Block && block);

    void mergeInMemoryRightBlocks();
    void mergeFlushedRightBlocks();
148 149

    void initRightTableWriter();
C
chertus 已提交
150 151 152
};

}