MergeTreeSequentialSource.cpp 4.7 KB
Newer Older
1
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
A
alesapin 已提交
2
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
3
#include <Interpreters/Context.h>
A
alesapin 已提交
4 5 6 7 8 9 10 11

namespace DB
{
namespace ErrorCodes
{
    /// Declared here, defined in another translation unit (extern). generate() compares the code of
    /// the in-flight exception against it to decide whether the part should be reported as broken.
    extern const int MEMORY_LIMIT_EXCEEDED;
}

12
/// Builds a source that reads `data_part_` from its first mark up to its last one.
///
/// The output header is the sample block of `columns_to_read_` (plus the storage virtuals) taken from
/// `metadata_snapshot_`. If `take_column_types_from_storage` is set, the column types handed to the
/// reader come from the physical columns of the metadata snapshot; otherwise they come from the part.
/// `read_with_direct_io_` chooses the direct-IO threshold given to the reader, and `quiet` suppresses
/// the trace message describing the read.
MergeTreeSequentialSource::MergeTreeSequentialSource(
    const MergeTreeData & storage_,
    const StorageMetadataPtr & metadata_snapshot_,
    MergeTreeData::DataPartPtr data_part_,
    Names columns_to_read_,
    bool read_with_direct_io_,
    bool take_column_types_from_storage,
    bool quiet)
    : SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns_to_read_, storage_.getVirtuals()))
    , storage(storage_)
    , metadata_snapshot(metadata_snapshot_)
    , data_part(std::move(data_part_))
    , columns_to_read(std::move(columns_to_read_))
    , read_with_direct_io(read_with_direct_io_)
    , mark_cache(storage.global_context.getMarkCache())
{
    if (!quiet)
    {
        /// The column name is only mentioned for single-column reads, to keep the log short
        /// when many columns are requested.
        if (columns_to_read.size() != 1)
        {
            LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
                data_part->getMarksCount(), data_part->name, data_part->rows_count);
        }
        else
        {
            LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
                data_part->getMarksCount(), data_part->name, data_part->rows_count, columns_to_read.front());
        }
    }

    addTotalRowsApprox(data_part->rows_count);

    /// Extra columns are injected so that the reader never has to produce empty blocks.
    /// NOTE(review): presumably this may extend columns_to_read beyond the header built above;
    /// generate() filters the read columns by header membership.
    injectRequiredColumns(storage, metadata_snapshot, data_part, columns_to_read);

    /// Column types come either from the storage metadata or from the data part itself.
    NamesAndTypesList columns_for_reader = take_column_types_from_storage
        ? metadata_snapshot->getColumns().getAllPhysical().addTypes(columns_to_read)
        : data_part->getColumns().addTypes(columns_to_read);

    /// Hack: direct IO is toggled through the byte threshold - 1 byte enables it for any read,
    /// the maximum size_t value effectively disables it.
    const size_t direct_io_threshold = read_with_direct_io ? 1UL : std::numeric_limits<size_t>::max();

    MergeTreeReaderSettings reader_settings =
    {
        .min_bytes_to_use_direct_io = direct_io_threshold,
        .max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE,
        .save_marks_in_cache = false
    };

    /// A single range covering every mark of the part.
    const MarkRanges whole_part_ranges{MarkRange(0, data_part->getMarksCount())};

    reader = data_part->getReader(
        columns_for_reader,
        metadata_snapshot,
        whole_part_ranges,
        /* uncompressed_cache = */ nullptr,
        mark_cache.get(),
        reader_settings);
}

68
/// Produces the next chunk of the part, reading at most one mark's worth of rows per call.
///
/// Returns an empty chunk when the source is cancelled, when all rows of the part have been read
/// (in both cases finish() is called first), or when the reader returned no rows.
/// Any exception is rethrown; unless its code is MEMORY_LIMIT_EXCEEDED the part is first reported
/// to the storage as possibly broken.
Chunk MergeTreeSequentialSource::generate()
try
{
    const auto & header = getPort().getHeader();

    /// Nothing left to do: release the reader and the part right away.
    if (isCancelled() || current_row >= data_part->rows_count)
    {
        finish();
        return {};
    }

    const size_t rows_to_read = data_part->index_granularity.getMarkRows(current_mark);
    /// Only the very first read (mark 0) is not a continuation of a previous one.
    const bool continue_reading = (current_mark != 0);

    const auto & sample = reader->getColumns();
    Columns columns(sample.size());
    const size_t rows_read = reader->readRows(current_mark, continue_reading, rows_to_read, columns);

    if (!rows_read)
        return {};

    current_row += rows_read;
    /// Move on to the next mark only when the current one was read in full.
    if (rows_read == rows_to_read)
        ++current_mark;

    bool should_evaluate_missing_defaults = false;
    reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);

    if (should_evaluate_missing_defaults)
        reader->evaluateMissingDefaults({}, columns);

    reader->performRequiredConversions(columns);

    /// Fill the result with the read columns that are present in the output header,
    /// walking the reader's column list and the read columns in lockstep.
    Columns res_columns;
    res_columns.reserve(sample.size());

    size_t position = 0;
    for (const auto & sample_column : sample)
    {
        if (header.has(sample_column.name))
            res_columns.emplace_back(std::move(columns[position]));

        ++position;
    }

    return Chunk(std::move(res_columns), rows_read);
}
catch (...)
{
    /// A failure other than exceeding the memory limit makes the part suspect,
    /// so it is reported to the storage (the part is added to the queue for verification).
    const bool memory_limit_hit = (getCurrentExceptionCode() == ErrorCodes::MEMORY_LIMIT_EXCEEDED);
    if (!memory_limit_hit)
        storage.reportBrokenPart(data_part->name);
    throw;
}

129
void MergeTreeSequentialSource::finish()
A
alesapin 已提交
130 131 132 133 134 135 136 137 138
{
    /** Close the files (before destroying the object).
     * When many sources are created, but simultaneously reading only a few of them,
     * buffers don't waste memory.
     */
    reader.reset();
    data_part.reset();
}

139
/// Defaulted destructor, defined out of line in this translation unit; members clean up through
/// their own destructors.
MergeTreeSequentialSource::~MergeTreeSequentialSource() = default;
A
alesapin 已提交
140 141

}