MergeTreeDataPartWriterWide.cpp 12.6 KB
Newer Older
C
CurtizJ 已提交
1 2 3 4 5
#include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>

namespace DB
{

C
CurtizJ 已提交
6 7 8 9 10 11 12 13 14
namespace
{
    constexpr auto DATA_FILE_EXTENSION = ".bin";
}

MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
C
CurtizJ 已提交
15
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
C
CurtizJ 已提交
16 17
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
C
CurtizJ 已提交
18
    const MergeTreeWriterSettings & settings_,
C
CurtizJ 已提交
19
    const MergeTreeIndexGranularity & index_granularity_)
C
CurtizJ 已提交
20
    : IMergeTreeDataPartWriter(part_path_,
C
CurtizJ 已提交
21
        storage_, columns_list_, indices_to_recalc_,
C
CurtizJ 已提交
22
        marks_file_extension_, default_codec_, settings_, index_granularity_, false)
C
CurtizJ 已提交
23 24 25
{
    const auto & columns = storage.getColumns();
    for (const auto & it : columns_list)
C
CurtizJ 已提交
26
        addStreams(it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec), settings.estimated_size);
C
CurtizJ 已提交
27 28 29 30 31 32
}

void MergeTreeDataPartWriterWide::addStreams(
    const String & name,
    const IDataType & type,
    const CompressionCodecPtr & effective_codec,
C
CurtizJ 已提交
33
    size_t estimated_size)
C
CurtizJ 已提交
34 35 36
{
    IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
37
        if (settings.skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
C
CurtizJ 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);
        /// Shared offsets for Nested type.
        if (column_streams.count(stream_name))
            return;

        column_streams[stream_name] = std::make_unique<ColumnStream>(
            stream_name,
            part_path + stream_name, DATA_FILE_EXTENSION,
            part_path + stream_name, marks_file_extension,
            effective_codec,
            settings.max_compress_block_size,
            estimated_size,
            settings.aio_threshold);
    };

    IDataType::SubstreamPath stream_path;
    type.enumerateStreams(callback, stream_path);
}


IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
C
CurtizJ 已提交
61
        const String & name, WrittenOffsetColumns & offset_columns)
C
CurtizJ 已提交
62
{
C
CurtizJ 已提交
63
    return [&, this] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
C
CurtizJ 已提交
64 65
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
66
        if (is_offsets && settings.skip_offsets)
C
CurtizJ 已提交
67 68 69 70 71 72 73 74 75 76 77 78
            return nullptr;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return nullptr;

        return &column_streams[stream_name]->compressed;
    };
}

C
CurtizJ 已提交
79
void MergeTreeDataPartWriterWide::write(const Block & block,
C
CurtizJ 已提交
80 81
    const IColumn::Permutation * permutation,
    const Block & primary_key_block, const Block & skip_indexes_block)
C
CurtizJ 已提交
82
{
C
CurtizJ 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
    // if (serialization_states.empty())
    // {
    //     serialization_states.reserve(columns_list.size());
    //     WrittenOffsetColumns tmp_offset_columns;
    //     IDataType::SerializeBinaryBulkSettings serialize_settings;

    //     for (const auto & col : columns_list)
    //     {
    //         serialize_settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
    //         serialization_states.emplace_back(nullptr);
    //         col.type->serializeBinaryBulkStatePrefix(serialize_settings, serialization_states.back());
    //     }
    // }

    /// Fill index granularity for this block
    /// if it's unknown (in case of insert data or horizontal merge,
    /// but not in case of vertical merge)
    if (compute_granularity)
        fillIndexGranularity(block);
C
CurtizJ 已提交
102

C
CurtizJ 已提交
103
    auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
C
CurtizJ 已提交
104 105 106 107 108 109 110 111 112 113 114

    auto it = columns_list.begin();
    for (size_t i = 0; i < columns_list.size(); ++i, ++it)
    {
        const ColumnWithTypeAndName & column = block.getByName(it->name);

        if (permutation)
        {
            if (primary_key_block.has(it->name))
            {
                const auto & primary_column = *primary_key_block.getByName(it->name).column;
C
CurtizJ 已提交
115
                writeColumn(column.name, *column.type, primary_column, offset_columns);
C
CurtizJ 已提交
116 117 118 119
            }
            else if (skip_indexes_block.has(it->name))
            {
                const auto & index_column = *skip_indexes_block.getByName(it->name).column;
C
CurtizJ 已提交
120
                writeColumn(column.name, *column.type, index_column, offset_columns);
C
CurtizJ 已提交
121 122 123 124 125
            }
            else
            {
                /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
                ColumnPtr permuted_column = column.column->permute(*permutation, 0);
C
CurtizJ 已提交
126
                writeColumn(column.name, *column.type, *permuted_column, offset_columns);
C
CurtizJ 已提交
127 128 129 130
            }
        }
        else
        {
C
CurtizJ 已提交
131
            writeColumn(column.name, *column.type, *column.column, offset_columns);
C
CurtizJ 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145
        }
    }
}

void MergeTreeDataPartWriterWide::writeSingleMark(
    const String & name,
    const IDataType & type,
    WrittenOffsetColumns & offset_columns,
    size_t number_of_rows,
    DB::IDataType::SubstreamPath & path)
{
     type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
     {
         bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
146
         if (is_offsets && settings.skip_offsets)
C
CurtizJ 已提交
147 148 149 150 151 152 153 154 155 156 157
             return;

         String stream_name = IDataType::getFileNameForStream(name, substream_path);

         /// Don't write offsets more than one time for Nested type.
         if (is_offsets && offset_columns.count(stream_name))
             return;

         ColumnStream & stream = *column_streams[stream_name];

         /// There could already be enough data to compress into the new block.
C
CurtizJ 已提交
158
         if (stream.compressed.offset() >= settings.min_compress_block_size)
C
CurtizJ 已提交
159 160 161 162
             stream.compressed.next();

         writeIntBinary(stream.plain_hashing.count(), stream.marks);
         writeIntBinary(stream.compressed.offset(), stream.marks);
C
CurtizJ 已提交
163
         if (settings.can_use_adaptive_granularity)
C
CurtizJ 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
             writeIntBinary(number_of_rows, stream.marks);
     }, path);
}

size_t MergeTreeDataPartWriterWide::writeSingleGranule(
    const String & name,
    const IDataType & type,
    const IColumn & column,
    WrittenOffsetColumns & offset_columns,
    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
    IDataType::SerializeBinaryBulkSettings & serialize_settings,
    size_t from_row,
    size_t number_of_rows,
    bool write_marks)
{
    if (write_marks)
C
CurtizJ 已提交
180
        writeSingleMark(name, type, offset_columns, number_of_rows, serialize_settings.path);
C
CurtizJ 已提交
181 182 183 184 185 186 187

    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);

    /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
188
        if (is_offsets && settings.skip_offsets)
C
CurtizJ 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        column_streams[stream_name]->compressed.nextIfAtEnd();
    }, serialize_settings.path);

    return from_row + number_of_rows;
}

C
CurtizJ 已提交
203 204
/// Column must not be empty. (column.size() !== 0)
void MergeTreeDataPartWriterWide::writeColumn(
C
CurtizJ 已提交
205 206 207
    const String & name,
    const IDataType & type,
    const IColumn & column,
C
CurtizJ 已提交
208
    WrittenOffsetColumns & offset_columns)
C
CurtizJ 已提交
209
{
C
CurtizJ 已提交
210 211 212 213
    auto [it, inserted] = serialization_states.emplace(name, nullptr);
    if (inserted)
    {
        IDataType::SerializeBinaryBulkSettings serialize_settings;
C
CurtizJ 已提交
214
        serialize_settings.getter = createStreamGetter(name, offset_columns);
C
CurtizJ 已提交
215 216 217
        type.serializeBinaryBulkStatePrefix(serialize_settings, it->second);
    }

C
CurtizJ 已提交
218
    const auto & global_settings = storage.global_context.getSettingsRef();
C
CurtizJ 已提交
219
    IDataType::SerializeBinaryBulkSettings serialize_settings;
C
CurtizJ 已提交
220
    serialize_settings.getter = createStreamGetter(name, offset_columns);
C
CurtizJ 已提交
221 222
    serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
C
CurtizJ 已提交
223 224 225

    size_t total_rows = column.size();
    size_t current_row = 0;
C
CurtizJ 已提交
226
    size_t current_column_mark = current_mark;
C
CurtizJ 已提交
227 228 229
    while (current_row < total_rows)
    {
        size_t rows_to_write;
C
CurtizJ 已提交
230
        bool write_marks = true;
C
CurtizJ 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247

        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
        if (current_row == 0 && index_offset != 0)
        {
            write_marks = false;
            rows_to_write = index_offset;
        }
        else
        {
            if (index_granularity.getMarksCount() <= current_column_mark)
                throw Exception(
                    "Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.getMarksCount()),
                    ErrorCodes::LOGICAL_ERROR);

            rows_to_write = index_granularity.getMarkRows(current_column_mark);
        }

C
CurtizJ 已提交
248 249 250
        if (rows_to_write != 0)
            data_written = true;

C
CurtizJ 已提交
251 252 253 254 255
        current_row = writeSingleGranule(
            name,
            type,
            column,
            offset_columns,
C
CurtizJ 已提交
256
            it->second,
C
CurtizJ 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
            serialize_settings,
            current_row,
            rows_to_write,
            write_marks
        );

        if (write_marks)
            current_column_mark++;
    }

    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets)
        {
            String stream_name = IDataType::getFileNameForStream(name, substream_path);
            offset_columns.insert(stream_name);
        }
    }, serialize_settings.path);

C
CurtizJ 已提交
277 278
    next_mark = current_column_mark;
    next_index_offset = current_row - total_rows;
C
CurtizJ 已提交
279 280
}

C
CurtizJ 已提交
281
void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
C
CurtizJ 已提交
282
{
C
CurtizJ 已提交
283
    const auto & global_settings = storage.global_context.getSettingsRef();
C
CurtizJ 已提交
284
    IDataType::SerializeBinaryBulkSettings serialize_settings;
C
CurtizJ 已提交
285 286
    serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
C
CurtizJ 已提交
287 288
    WrittenOffsetColumns offset_columns;

C
CurtizJ 已提交
289 290
    bool write_final_mark = (with_final_mark && data_written);

C
CurtizJ 已提交
291 292 293 294 295 296
    {
        auto it = columns_list.begin();
        for (size_t i = 0; i < columns_list.size(); ++i, ++it)
        {
            if (!serialization_states.empty())
            {
C
CurtizJ 已提交
297
                serialize_settings.getter = createStreamGetter(it->name, written_offset_columns ? *written_offset_columns : offset_columns);
C
CurtizJ 已提交
298
                it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[it->name]);
C
CurtizJ 已提交
299 300 301
            }

            if (write_final_mark)
C
CurtizJ 已提交
302
            {
C
CurtizJ 已提交
303
                writeFinalMark(it->name, it->type, offset_columns, serialize_settings.path);
C
CurtizJ 已提交
304
            }
C
CurtizJ 已提交
305 306 307 308 309 310
        }
    }

    for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
    {
        it->second->finalize();
C
CurtizJ 已提交
311 312
        if (sync)
            it->second->sync();
C
CurtizJ 已提交
313 314 315 316
        it->second->addToChecksums(checksums);
    }

    column_streams.clear();
C
CurtizJ 已提交
317
    serialization_states.clear();
C
CurtizJ 已提交
318 319 320 321 322 323 324 325
}

void MergeTreeDataPartWriterWide::writeFinalMark(
    const std::string & column_name,
    const DataTypePtr column_type,
    WrittenOffsetColumns & offset_columns,
    DB::IDataType::SubstreamPath & path)
{
C
CurtizJ 已提交
326
    writeSingleMark(column_name, *column_type, offset_columns, 0, path);
C
CurtizJ 已提交
327 328 329 330 331 332 333 334 335 336 337 338
    /// Memoize information about offsets
    column_type->enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets)
        {
            String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
            offset_columns.insert(stream_name);
        }
    }, path);
}

C
CurtizJ 已提交
339
}