MergedBlockOutputStream.cpp 6.5 KB
Newer Older
1
#include <Storages/MergeTree/MergedBlockOutputStream.h>
2
#include <Interpreters/Context.h>
A
alexey-milovidov 已提交
3
#include <Poco/File.h>
4
#include <Parsers/queryToString.h>
A
alexey-milovidov 已提交
5

6 7 8 9

namespace DB
{

10 11
namespace ErrorCodes
{
A
Alexey Milovidov 已提交
12 13
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
14 15
}

16 17

MergedBlockOutputStream::MergedBlockOutputStream(
    const MergeTreeDataPartPtr & data_part,
    const StorageMetadataPtr & metadata_snapshot_,
    const NamesAndTypesList & columns_list_,
    const MergeTreeIndices & skip_indices,
    CompressionCodecPtr default_codec_,
    bool blocks_are_granules_size)
    : IMergedBlockOutputStream(data_part, metadata_snapshot_)
    , columns_list(columns_list_)
    , default_codec(default_codec_)
{
    /// Make sure the part directory exists before the writer starts creating files in it.
    if (!part_path.empty())
        volume->getDisk()->createDirectories(part_path);

    /// Writer settings are derived from global settings, table settings and the part itself.
    /// This stream always rewrites the primary key of the part.
    MergeTreeWriterSettings settings(
        storage.global_context.getSettings(),
        storage.getSettings(),
        data_part->index_granularity_info.is_adaptive,
        /* rewrite_primary_key = */ true,
        blocks_are_granules_size);

    /// The part chooses a concrete writer implementation for its format.
    writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, settings);
}

41
/// If data is pre-sorted.
42 43
void MergedBlockOutputStream::write(const Block & block)
{
44
    writeImpl(block, nullptr);
45 46
}

F
f1yegor 已提交
47
/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
48 49
    * This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
    */
50 51
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
{
52
    writeImpl(block, permutation);
53 54 55 56
}

void MergedBlockOutputStream::writeSuffix()
{
57
    throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
58 59
}

60 61
void MergedBlockOutputStream::writeSuffixAndFinalizePart(
        MergeTreeData::MutableDataPartPtr & new_part,
62
        bool sync,
A
Alexey Milovidov 已提交
63
        const NamesAndTypesList * total_columns_list,
64
        MergeTreeData::DataPart::Checksums * additional_column_checksums)
65
{
C
CurtizJ 已提交
66 67 68
    /// Finish write and get checksums.
    MergeTreeData::DataPart::Checksums checksums;

C
CurtizJ 已提交
69 70 71
    if (additional_column_checksums)
        checksums = std::move(*additional_column_checksums);

72
    /// Finish columns serialization.
A
alesapin 已提交
73
    writer->finish(checksums, sync);
74

75
    NamesAndTypesList part_columns;
A
Alexey Milovidov 已提交
76
    if (!total_columns_list)
77 78 79
        part_columns = columns_list;
    else
        part_columns = *total_columns_list;
80

A
Anton Popov 已提交
81
    if (new_part->isStoredOnDisk())
A
Anton Popov 已提交
82
        finalizePartOnDisk(new_part, part_columns, checksums, sync);
A
Anton Popov 已提交
83 84 85 86 87 88 89 90

    new_part->setColumns(part_columns);
    new_part->rows_count = rows_count;
    new_part->modification_time = time(nullptr);
    new_part->index = writer->releaseIndexColumns();
    new_part->checksums = checksums;
    new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk());
    new_part->index_granularity = writer->getIndexGranularity();
A
Anton Popov 已提交
91
    new_part->calculateColumnsSizesOnDisk();
A
alesapin 已提交
92 93
    if (default_codec != nullptr)
        new_part->default_codec = default_codec;
A
Anton Ivashkin 已提交
94
    new_part->storage.lockSharedData(*new_part);
A
Anton Popov 已提交
95 96 97 98 99
}

/// Writes the service files of a part stored on disk (uuid, partition, minmax index,
/// count.txt, ttl.txt, columns.txt, default compression codec, checksums.txt) and
/// registers each of them in `checksums`. Must run after all column data is written.
/// If `sync` is set, every written file is fsync'ed. May drop empty columns from
/// `part_columns`. Throws LOGICAL_ERROR on inconsistent state.
void MergedBlockOutputStream::finalizePartOnDisk(
    const MergeTreeData::MutableDataPartPtr & new_part,
    NamesAndTypesList & part_columns,
    MergeTreeData::DataPart::Checksums & checksums,
    bool sync)
{
    /// Persist the part UUID only when one was assigned.
    if (new_part->uuid != UUIDHelpers::Nil)
    {
        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
        HashingWriteBuffer out_hashing(*out);
        writeUUIDText(new_part->uuid, out_hashing);
        checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
        checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
        out->finalize();
        if (sync)
            out->sync();
    }

    /// Partition / minmax / count files are written for the new (custom partitioning)
    /// format and for compact parts.
    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
    {
        new_part->partition.store(storage, volume->getDisk(), part_path, checksums);
        if (new_part->minmax_idx.initialized)
            new_part->minmax_idx.store(storage, volume->getDisk(), part_path, checksums);
        else if (rows_count)
            throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        /// count.txt holds the number of rows in the part.
        auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096);
        HashingWriteBuffer count_out_hashing(*count_out);
        writeIntText(rows_count, count_out_hashing);
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
        count_out->finalize();
        if (sync)
            count_out->sync();
    }

    if (!new_part->ttl_infos.empty())
    {
        /// Write a file with ttl infos in json format.
        auto out = volume->getDisk()->writeFile(part_path + "ttl.txt", 4096);
        HashingWriteBuffer out_hashing(*out);
        new_part->ttl_infos.write(out_hashing);
        checksums.files["ttl.txt"].file_size = out_hashing.count();
        checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
        out->finalize();
        if (sync)
            out->sync();
    }

    /// Columns that turned out empty are removed before the column list is stored.
    removeEmptyColumnsFromPart(new_part, part_columns, checksums);

    {
        /// Write a file with a description of columns.
        auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
        part_columns.writeText(*out);
        out->finalize();
        if (sync)
            out->sync();
    }

    /// A part on disk must always record its default compression codec.
    if (default_codec != nullptr)
    {
        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
        DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
        out->finalize();
    }
    else
    {
        throw Exception("Compression codec have to be specified for part on disk, empty for" + new_part->name
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
    }

    {
        /// Write file with checksums. Written last so it covers all files registered above.
        auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
        checksums.write(*out);
        out->finalize();
        if (sync)
            out->sync();
    }
}

/// Common implementation of write() / writeWithPermutation(): pass the block to the
/// part writer (optionally permuted) and account the written rows.
void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    /// All columns of the block must have the same number of rows.
    block.checkNumberOfRows();

    const size_t num_rows = block.rows();
    if (num_rows == 0)
        return;

    writer->write(block, permutation);
    rows_count += num_rows;
}

}