#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Interpreters/Context.h>

#include <Poco/File.h>


namespace DB
{

/// Error codes used by this translation unit; definitions live in a shared TU.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}


MergedBlockOutputStream::MergedBlockOutputStream(
C
CurtizJ 已提交
17
    const MergeTreeDataPartPtr & data_part,
18
    const StorageMetadataPtr & metadata_snapshot_,
19
    const NamesAndTypesList & columns_list_,
20
    const MergeTreeIndices & skip_indices,
C
CurtizJ 已提交
21 22
    CompressionCodecPtr default_codec,
    bool blocks_are_granules_size)
23
    : MergedBlockOutputStream(
24 25 26 27 28 29
        data_part,
        metadata_snapshot_,
        columns_list_,
        skip_indices,
        default_codec,
        {},
30 31
        data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
        blocks_are_granules_size)
C
CurtizJ 已提交
32
{
33 34 35
}

/// Full constructor: prepares the part directory on disk and creates the
/// part writer with primary-key and skip-index serialization initialized.
///
/// `merged_column_to_size` (byte sizes of columns being merged) is only used
/// to pre-estimate the write size when direct I/O may kick in (aio_threshold > 0).
MergedBlockOutputStream::MergedBlockOutputStream(
    const MergeTreeDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const NamesAndTypesList & columns_list_,
    const MergeTreeIndices & skip_indices,
    CompressionCodecPtr default_codec,
    const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
    size_t aio_threshold,
    bool blocks_are_granules_size)
    : IMergedBlockOutputStream(data_part, metadata_snapshot_)
    , columns_list(columns_list_)
{
    MergeTreeWriterSettings writer_settings(data_part->storage.global_context.getSettings(),
        data_part->storage.canUseAdaptiveGranularity(), aio_threshold, blocks_are_granules_size);

    if (aio_threshold > 0 && !merged_column_to_size.empty())
    {
        /// Sum known on-disk sizes of the columns we are going to write, so the
        /// writer can decide up front whether direct I/O is worth using.
        for (const auto & column : columns_list)
        {
            auto size_it = merged_column_to_size.find(column.name);
            if (size_it != merged_column_to_size.end())
                writer_settings.estimated_size += size_it->second;
        }
    }

    volume->getDisk()->createDirectories(part_path);

    writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings);
    writer->initPrimaryIndex();
    writer->initSkipIndices();
}

67
/// If data is pre-sorted.
68 69
void MergedBlockOutputStream::write(const Block & block)
{
70
    writeImpl(block, nullptr);
71 72
}

F
f1yegor 已提交
73
/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
74 75
    * This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
    */
76 77
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
{
78
    writeImpl(block, permutation);
79 80 81 82
}

void MergedBlockOutputStream::writeSuffix()
{
83
    throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
84 85
}

86 87
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
        MergeTreeData::MutableDataPartPtr & new_part,
A
Alexey Milovidov 已提交
88
        const NamesAndTypesList * total_columns_list,
89
        MergeTreeData::DataPart::Checksums * additional_column_checksums)
90
{
C
CurtizJ 已提交
91 92 93
    /// Finish write and get checksums.
    MergeTreeData::DataPart::Checksums checksums;

C
CurtizJ 已提交
94 95 96
    if (additional_column_checksums)
        checksums = std::move(*additional_column_checksums);

97
    /// Finish columns serialization.
C
CurtizJ 已提交
98 99
    writer->finishDataSerialization(checksums);
    writer->finishPrimaryIndexSerialization(checksums);
C
CurtizJ 已提交
100
    writer->finishSkipIndicesSerialization(checksums);
101

102
    NamesAndTypesList part_columns;
A
Alexey Milovidov 已提交
103
    if (!total_columns_list)
104 105 106
        part_columns = columns_list;
    else
        part_columns = *total_columns_list;
107

C
CurtizJ 已提交
108
    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
109
    {
110
        new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
111
        if (new_part->minmax_idx.initialized)
112
            new_part->minmax_idx.store(storage, volume->getDisk(), part_path, checksums);
113
        else if (rows_count)
114
            throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
115
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
116

117
        auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
118
        HashingWriteBuffer count_out_hashing(*count_out);
119 120 121 122
        writeIntText(rows_count, count_out_hashing);
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
123 124
    }

V
Vladimir Chebotarev 已提交
125
    if (!new_part->ttl_infos.empty())
A
Anton Popov 已提交
126 127
    {
        /// Write a file with ttl infos in json format.
128
        auto out = volume->getDisk()->writeFile(part_path + "ttl.txt", 4096);
129
        HashingWriteBuffer out_hashing(*out);
A
Anton Popov 已提交
130 131 132 133
        new_part->ttl_infos.write(out_hashing);
        checksums.files["ttl.txt"].file_size = out_hashing.count();
        checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
    }
134

135 136
    removeEmptyColumnsFromPart(new_part, part_columns, checksums);

137 138
    {
        /// Write a file with a description of columns.
139
        auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
140
        part_columns.writeText(*out);
141 142 143 144
    }

    {
        /// Write file with checksums.
145
        auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
146
        checksums.write(*out);
147 148
    }

149
    new_part->setColumns(part_columns);
150
    new_part->rows_count = rows_count;
151
    new_part->modification_time = time(nullptr);
C
CurtizJ 已提交
152
    new_part->index = writer->releaseIndexColumns();
153
    new_part->checksums = checksums;
A
alesapin 已提交
154
    new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
C
CurtizJ 已提交
155
    new_part->index_granularity = writer->getIndexGranularity();
A
alesapin 已提交
156
    new_part->calculateColumnsSizesOnDisk();
157 158 159 160
}

/// Common implementation for write() and writeWithPermutation(): extracts the
/// primary-key and skip-index sub-blocks (permuted if a permutation is given),
/// hands everything to the part writer and advances the row counter.
/// Empty blocks are a no-op.
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    block.checkNumberOfRows();
    size_t rows = block.rows();
    if (!rows)
        return;

    /// Collect the deduplicated set of columns referenced by any secondary (skip) index.
    std::unordered_set<String> skip_indexes_column_names_set;
    for (const auto & index : metadata_snapshot->getSecondaryIndices())
        std::copy(index.column_names.cbegin(), index.column_names.cend(),
                std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
    Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());

    Block primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);
    Block skip_indexes_block = getBlockAndPermute(block, skip_indexes_column_names, permutation);

    writer->write(block, permutation, primary_key_block, skip_indexes_block);
    writer->calculateAndSerializeSkipIndices(skip_indexes_block, rows);
    writer->calculateAndSerializePrimaryIndex(primary_key_block, rows);
    writer->next();

    rows_count += rows;
}

}