MergeTreeSequentialSource.cpp 4.4 KB
Newer Older
1
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
A
alesapin 已提交
2
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
A
alesapin 已提交
3 4 5 6 7 8 9 10

namespace DB
{
namespace ErrorCodes
{
    /// Declared `extern` here and defined elsewhere; generate() compares the current
    /// exception code against it to decide whether to report the part as broken.
    extern const int MEMORY_LIMIT_EXCEEDED;
}

11
MergeTreeSequentialSource::MergeTreeSequentialSource(
A
alesapin 已提交
12
    const MergeTreeData & storage_,
13
    MergeTreeData::DataPartPtr data_part_,
A
alesapin 已提交
14 15
    Names columns_to_read_,
    bool read_with_direct_io_,
A
alesapin 已提交
16
    bool take_column_types_from_storage,
A
alesapin 已提交
17
    bool quiet)
N
Nikolai Kochetov 已提交
18
    : SourceWithProgress(storage_.getSampleBlockForColumns(columns_to_read_))
19 20 21
    , storage(storage_)
    , data_part(std::move(data_part_))
    , columns_to_read(std::move(columns_to_read_))
A
alesapin 已提交
22
    , read_with_direct_io(read_with_direct_io_)
23
    , mark_cache(storage.global_context.getMarkCache())
A
alesapin 已提交
24 25
{
    if (!quiet)
A
Alexey Milovidov 已提交
26 27
    {
        std::stringstream message;
A
alesapin 已提交
28
        message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name
A
Alexey Milovidov 已提交
29
            << ", total " << data_part->rows_count
N
done  
Nikita Mikhaylov 已提交
30
            << " rows starting from the beginning of the part";
31 32
        if (columns_to_read.size() == 1)    /// Print column name but don't pollute logs in case of many columns.
            message << ", column " << columns_to_read.front();
A
Alexey Milovidov 已提交
33 34 35

        LOG_TRACE(log, message.rdbuf());
    }
A
alesapin 已提交
36 37 38

    addTotalRowsApprox(data_part->rows_count);

A
alesapin 已提交
39 40 41
    /// Add columns because we don't want to read empty blocks
    injectRequiredColumns(storage, data_part, columns_to_read);
    NamesAndTypesList columns_for_reader;
A
alesapin 已提交
42 43 44
    if (take_column_types_from_storage)
    {
        const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
A
alesapin 已提交
45
        columns_for_reader = physical_columns.addTypes(columns_to_read);
A
alesapin 已提交
46
    }
A
alesapin 已提交
47 48 49
    else
    {
        /// take columns from data_part
C
CurtizJ 已提交
50
        columns_for_reader = data_part->getColumns().addTypes(columns_to_read);
A
alesapin 已提交
51
    }
A
alesapin 已提交
52

C
CurtizJ 已提交
53
    MergeTreeReaderSettings reader_settings =
C
CurtizJ 已提交
54
    {
C
CurtizJ 已提交
55
        /// bytes to use AIO (this is hack)
C
CurtizJ 已提交
56 57 58 59 60
        .min_bytes_to_use_direct_io = read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max(),
        .max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
        .save_marks_in_cache = false
    };

C
CurtizJ 已提交
61
    reader = data_part->getReader(columns_for_reader,
A
alesapin 已提交
62
        MarkRanges{MarkRange(0, data_part->getMarksCount())},
C
CurtizJ 已提交
63
        /* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
A
alesapin 已提交
64 65
}

66
Chunk MergeTreeSequentialSource::generate()
A
alesapin 已提交
67 68
try
{
A
Alexey Milovidov 已提交
69
    const auto & header = getPort().getHeader();
70

A
alesapin 已提交
71 72
    if (!isCancelled() && current_row < data_part->rows_count)
    {
A
alesapin 已提交
73
        size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
A
alesapin 已提交
74 75
        bool continue_reading = (current_mark != 0);

A
Alexey Milovidov 已提交
76
        const auto & sample = reader->getColumns();
77
        Columns columns(sample.size());
78
        size_t rows_read = reader->readRows(current_mark, continue_reading, rows_to_read, columns);
A
alesapin 已提交
79

80
        if (rows_read)
81
        {
82 83
            current_row += rows_read;
            current_mark += (rows_to_read == rows_read);
A
alesapin 已提交
84

85
            bool should_evaluate_missing_defaults = false;
86
            reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
A
alesapin 已提交
87

A
alesapin 已提交
88
            if (should_evaluate_missing_defaults)
A
alesapin 已提交
89
            {
N
Nikolai Kochetov 已提交
90
                reader->evaluateMissingDefaults({}, columns);
A
alesapin 已提交
91
            }
A
alesapin 已提交
92

A
alesapin 已提交
93 94
            reader->performRequiredConversions(columns);

95 96
            /// Reorder columns and fill result block.
            size_t num_columns = sample.size();
97 98 99
            Columns res_columns;
            res_columns.reserve(num_columns);

100 101 102
            auto it = sample.begin();
            for (size_t i = 0; i < num_columns; ++i)
            {
103 104
                if (header.has(it->name))
                    res_columns.emplace_back(std::move(columns[i]));
105

106 107 108
                ++it;
            }

109
            return Chunk(std::move(res_columns), rows_read);
A
alesapin 已提交
110
        }
A
alesapin 已提交
111 112 113 114 115 116
    }
    else
    {
        finish();
    }

117
    return {};
A
alesapin 已提交
118 119 120 121 122 123 124 125 126
}
catch (...)
{
    /// Suspicion of the broken part. A part is added to the queue for verification.
    if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
        storage.reportBrokenPart(data_part->name);
    throw;
}

127
void MergeTreeSequentialSource::finish()
A
alesapin 已提交
128 129 130 131 132 133 134 135 136
{
    /** Close the files (before destroying the object).
     * When many sources are created, but simultaneously reading only a few of them,
     * buffers don't waste memory.
     */
    reader.reset();
    data_part.reset();
}

137
/// Defaulted out of line in this translation unit. NOTE(review): presumably so that member types which are
/// only forward-declared in the header are complete where the members are destroyed; confirm against the header.
MergeTreeSequentialSource::~MergeTreeSequentialSource() = default;
A
alesapin 已提交
138 139

}