MergeTreeData.h 36.3 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1 2
#pragma once

#include <Common/SimpleIncrement.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/range/iterator_range_core.hpp>

#include <utility>

namespace DB
{

/// Forward declaration: only used by reference (see checkAlter()).
class AlterCommands;

/// Error codes used by MergeTreeData; declared extern here, defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INVALID_PARTITION_NAME;
    extern const int NO_SUCH_DATA_PART;
    extern const int DUPLICATE_DATA_PART;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int TOO_MANY_UNEXPECTED_DATA_PARTS;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int TABLE_DIFFERS_TOO_MUCH;
}

/// Data structure for *MergeTree engines.
/// Merge tree is used for incremental sorting of data.
/// The table consists of several sorted parts.
/// During insertion new data is sorted according to the primary key and is written to a new part.
/// Parts are merged in the background according to a heuristic algorithm.
/// For each part an index file is created containing primary key values for every n-th row.
/// This allows efficient selection by primary key range predicate.
///
/// Additionally:
///
/// The date column is specified. For each part min and max dates are remembered.
/// Essentially it is an index too.
///
/// Data is partitioned by the value of the partitioning expression.
/// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup).
///
/// File structure of old-style month-partitioned tables (format_version = 0):
/// Part directory - / min-date _ max-date _ min-id _ max-id _ level /
/// Inside the part directory:
/// checksums.txt - contains the list of all files along with their sizes and checksums.
/// columns.txt - contains the list of all columns and their types.
/// primary.idx - contains the primary index.
/// [Column].bin - contains compressed column data.
/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
///
/// File structure of tables with custom partitioning (format_version >= 1):
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with the opposite values of sign_column for the same values
///   of primary key (cf. CollapsingSortedBlockInputStream.h)
/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
///   column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).
/// - VersionedCollapsing - a variant of Collapsing that uses both sign_column and version_column.

/// The MergeTreeData class contains a list of parts and the data structure parameters.
/// To read and modify the data use other classes:
/// - MergeTreeDataSelectExecutor
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator

class MergeTreeData : public IStorage
M
Merge  
Michael Kolupaev 已提交
95 96
{
public:
97 98 99 100 101
    /// Function to call if the part is suspected to contain corrupt data.
    using BrokenPartCallback = std::function<void (const String &)>;
    using DataPart = MergeTreeDataPart;

    using MutableDataPartPtr = std::shared_ptr<DataPart>;
102
    using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
103 104
    /// After the DataPart is added to the working set, it cannot be changed.
    using DataPartPtr = std::shared_ptr<const DataPart>;
105

106
    using DataPartState = MergeTreeDataPart::State;
107
    using DataPartStates = std::initializer_list<DataPartState>;
108
    using DataPartStateVector = std::vector<DataPartState>;
109

110 111 112 113 114
    /// Auxiliary structure for index comparison, keyed by (state, info).
    /// `info` is a reference, so the referenced MergeTreePartInfo must outlive this structure.
    struct DataPartStateAndInfo
    {
        DataPartState state;
        const MergeTreePartInfo & info;
    };

    /// Auxiliary structure for index comparison by (state, partition id).
    /// Unlike DataPartStateAndInfo, it owns its partition id by value.
    struct DataPartStateAndPartitionID
    {
        DataPartState state;
        String partition_id;
    };

    /// Distinct type wrapping a String partition id (see STRONG_TYPEDEF); used by LessDataPart below.
    STRONG_TYPEDEF(String, PartitionID)

    /// Orders parts by their MergeTreePartInfo.
    /// The comparator is transparent (is_transparent), so ordered containers using it accept
    /// heterogeneous keys: a DataPartPtr can be compared with a MergeTreePartInfo, and a
    /// MergeTreePartInfo with a PartitionID (the latter compares only the partition_id component).
    struct LessDataPart
    {
        using is_transparent = void;

        bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; }
        bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; }
        bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
        bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const { return lhs.partition_id < rhs.toUnderType(); }
        bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const { return lhs.toUnderType() < rhs.partition_id; }
    };

    struct LessStateDataPart
    {
        using is_transparent = void;

        bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
        }

        bool operator() (DataPartStateAndInfo info, const DataPartState & state) const
        {
            return static_cast<size_t>(info.state) < static_cast<size_t>(state);
        }

        bool operator() (const DataPartState & state, DataPartStateAndInfo info) const
        {
            return static_cast<size_t>(state) < static_cast<size_t>(info.state);
        }
156 157 158 159 160 161 162 163 164 165 166 167

        bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.partition_id)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.partition_id);
        }

        bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const
        {
            return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.partition_id)
                   < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.partition_id);
        }
168 169 170
    };

    using DataParts = std::set<DataPartPtr, LessDataPart>;
171 172
    using DataPartsVector = std::vector<DataPartPtr>;

173 174 175
    using DataPartsLock = std::unique_lock<std::mutex>;
    DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }

176 177 178 179
    /// Auxiliary object to add a set of parts into the working set in two steps:
    /// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
    /// * Next, if commit() is called, the parts are added to the active set and the parts that are
    ///   covered by them are marked Outdated.
180 181 182 183
    /// If neither commit() nor rollback() was called, the destructor rollbacks the operation.
    class Transaction : private boost::noncopyable
    {
    public:
184
        Transaction(MergeTreeData & data_) : data(data_) {}
185

186
        DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
187

188
        void rollback();
189

190 191
        size_t size() const { return precommitted_parts.size(); }
        bool isEmpty() const { return precommitted_parts.empty(); }
192

193 194 195 196 197 198
        ~Transaction()
        {
            try
            {
                rollback();
            }
199
            catch (...)
200 201 202 203
            {
                tryLogCurrentException("~MergeTreeData::Transaction");
            }
        }
204

205 206 207
    private:
        friend class MergeTreeData;

208
        MergeTreeData & data;
209
        DataParts precommitted_parts;
210

211
        void clear() { precommitted_parts.clear(); }
212 213 214 215 216 217 218 219 220 221 222 223 224
    };

    /// An object that stores the names of temporary files created in the part directory during ALTER of its
    /// columns.
    class AlterDataPartTransaction : private boost::noncopyable
    {
    public:
        /// Renames temporary files, finishing the ALTER of the part.
        void commit();

        /// If commit() was not called, deletes temporary files, canceling the ALTER.
        ~AlterDataPartTransaction();

225 226
        const String & getPartName() const { return data_part->name; }

227
        /// Review the changes before the commit.
228
        const NamesAndTypesList & getNewColumns() const { return new_columns; }
229 230 231 232 233 234 235 236 237 238 239 240 241 242
        const DataPart::Checksums & getNewChecksums() const { return new_checksums; }

    private:
        friend class MergeTreeData;

        AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}

        void clear()
        {
            alter_lock.unlock();
            data_part = nullptr;
        }

        DataPartPtr data_part;
243
        DataPartsLock alter_lock;
244 245

        DataPart::Checksums new_checksums;
246
        NamesAndTypesList new_columns;
247 248 249 250 251 252 253 254 255 256 257 258 259
        /// If the value is an empty string, the file is not temporary, and it must be deleted.
        NameToNameMap rename_map;
    };

    using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;


    /// Parameters for various modes.
    struct MergingParams
    {
        /// Merging mode. See above.
        enum Mode
        {
260 261 262 263 264 265 266
            Ordinary            = 0,    /// Enum values are saved. Do not change them.
            Collapsing          = 1,
            Summing             = 2,
            Aggregating         = 3,
            Replacing           = 5,
            Graphite            = 6,
            VersionedCollapsing = 7,
267 268 269 270
        };

        Mode mode;

271
        /// For Collapsing and VersionedCollapsing mode.
272 273 274 275 276
        String sign_column;

        /// For Summing mode. If empty - columns_to_sum is determined automatically.
        Names columns_to_sum;

277
        /// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
278 279 280 281 282 283
        String version_column;

        /// For Graphite mode.
        Graphite::Params graphite_params;

        /// Check that needed columns are present and have correct types.
284
        void check(const NamesAndTypesList & columns) const;
285 286 287 288

        String getModeName() const;
    };

A
alesapin 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
    /// Meta information about index granularity
    struct IndexGranularityInfo
    {
        /// Marks file extension '.mrk' or '.mrk2'
        String marks_file_extension;

        /// Size of one mark in file two or three size_t numbers
        UInt8 mark_size_in_bytes;

        /// Is stride in rows between marks non fixed?
        bool is_adaptive;

        /// Fixed size in rows of one granule if index_granularity_bytes is zero
        size_t fixed_index_granularity;

A
alesapin 已提交
304 305 306
        /// Approximate bytes size of one granule
        size_t index_granularity_bytes;

A
alesapin 已提交
307 308 309 310 311 312 313 314
        IndexGranularityInfo(const MergeTreeSettings & settings);

        String getMarksFilePath(const String & column_path) const
        {
            return column_path + marks_file_extension;
        }
    };

315

316
    /// Attach the table corresponding to the directory in full_path inside schema (must end with /), with the given columns.
317 318
    /// Correctness of names and paths is not checked.
    ///
319 320 321 322 323 324 325 326
    /// date_column_name - if not empty, the name of the Date column used for partitioning by month.
    ///     Otherwise, partition_by_ast is used for partitioning.
    ///
    /// order_by_ast - a single expression or a tuple. It is used as a sorting key
    ///     (an ASTExpressionList used for sorting data in parts);
    /// primary_key_ast - can be nullptr, an expression, or a tuple.
    ///     Used to determine an ASTExpressionList values of which are written in the primary.idx file
    ///     for one row in every `index_granularity` rows to speed up range queries.
A
Alexey Zatelepin 已提交
327
    ///     Primary key must be a prefix of the sorting key;
328
    ///     If it is nullptr, then it will be determined from order_by_ast.
A
Alexey Zatelepin 已提交
329
    ///
330 331
    /// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
    /// attach - whether the existing table is attached or the new table is created.
332
    MergeTreeData(const String & database_, const String & table_,
333
                  const ColumnsDescription & columns_,
N
Nikita Vasilev 已提交
334
                  const IndicesDescription & indices_,
335 336
                  Context & context_,
                  const String & date_column_name,
337 338 339
                  const ASTPtr & partition_by_ast_,
                  const ASTPtr & order_by_ast_,
                  const ASTPtr & primary_key_ast_,
340
                  const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
A
Anton Popov 已提交
341
                  const ASTPtr & ttl_table_ast_,
342 343 344 345 346
                  const MergingParams & merging_params_,
                  const MergeTreeSettings & settings_,
                  bool require_part_metadata_,
                  bool attach,
                  BrokenPartCallback broken_part_callback_ = [](const String &){});
347

348 349 350 351
    /// Key ASTs. Note: the sorting and primary key getters return the processed key expressions
    /// (sorting_key_expr_ast / primary_key_expr_ast), not the raw order_by_ast / primary_key_ast.
    ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
    ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
    ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
    ASTPtr getSamplingKeyAST() const override { return sample_by_ast; }

    /// Columns required to evaluate each key. Without a partition key expression no columns are required.
    Names getColumnsRequiredForPartitionKey() const override { return (partition_key_expr ? partition_key_expr->getRequiredColumns() : Names{}); }
    Names getColumnsRequiredForSortingKey() const override { return sorting_key_expr->getRequiredColumns(); }
    Names getColumnsRequiredForPrimaryKey() const override { return primary_key_expr->getRequiredColumns(); }
    Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
    /// Same columns as for the sorting key.
    Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }

    bool supportsPrewhere() const override { return true; }
360
    bool supportsSampling() const override { return sample_by_ast != nullptr; }
361

362
    bool supportsFinal() const override
363 364 365 366
    {
        return merging_params.mode == MergingParams::Collapsing
            || merging_params.mode == MergingParams::Summing
            || merging_params.mode == MergingParams::Aggregating
367
            || merging_params.mode == MergingParams::Replacing
368
            || merging_params.mode == MergingParams::VersionedCollapsing;
369 370
    }

371
    /// Whether an IN expression with the given left operand may benefit from the table's index (defined out of line).
    bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &) const override;

    /// Returns the name and type of a column. Besides physical columns, resolves the virtual
    /// columns _part, _part_index, _partition_id and _sample_factor.
    NameAndTypePair getColumn(const String & column_name) const override
    {
        if (column_name == "_part")
            return NameAndTypePair("_part", std::make_shared<DataTypeString>());
        if (column_name == "_part_index")
            return NameAndTypePair("_part_index", std::make_shared<DataTypeUInt64>());
        if (column_name == "_partition_id")
            return NameAndTypePair("_partition_id", std::make_shared<DataTypeString>());
        if (column_name == "_sample_factor")
            return NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>());

        return getColumns().getPhysical(column_name);
    }

    bool hasColumn(const String & column_name) const override
    {
A
Alexey Zatelepin 已提交
389
        return getColumns().hasPhysical(column_name)
390 391
            || column_name == "_part"
            || column_name == "_part_index"
392
            || column_name == "_partition_id"
393 394 395
            || column_name == "_sample_factor";
    }

396 397
    String getDatabaseName() const override { return database_name; }
    String getTableName() const override { return table_name; }

    /// Load the set of data parts from disk. Call once - immediately after the object is created.
    void loadDataParts(bool skip_sanity_checks);
401 402 403

    String getLogName() const { return log_name; }

404 405
    Int64 getMaxBlockNumber() const;

406
    /// Returns a copy of the list so that the caller shouldn't worry about locks.
407
    DataParts getDataParts(const DataPartStates & affordable_states) const;
408 409 410
    /// Returns sorted list of the parts with specified states
    ///  out_states will contain snapshot of each part state
    DataPartsVector getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
411

412 413
    /// Returns absolutely all parts (and snapshot of their states)
    DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
414 415

    /// Returns Committed parts
416 417
    DataParts getDataParts() const;
    DataPartsVector getDataPartsVector() const;
418

419
    /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
420
    DataPartPtr getActiveContainingPart(const String & part_name);
421
    DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
422 423 424 425
    DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock);

    /// Returns all parts in specified partition
    DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id);
426

427 428
    /// Returns the part with the given name and state or nullptr if no such part.
    DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
429
    DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
430

431 432 433
    /// Total size of active parts in bytes.
    size_t getTotalActiveSizeInBytes() const;

434
    size_t getPartsCount() const;
435
    size_t getMaxPartsCountForPartition() const;
436

437 438 439 440
    /// Get min value of part->info.getDataVersion() for all active parts.
    /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
    std::optional<Int64> getMinPartDataVersion() const;

441 442
    /// If the table contains too many active parts, sleep for a while to give them time to merge.
    /// If until is non-null, wake up from the sleep earlier if the event happened.
443
    void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
444
    void throwInsertIfNeeded() const;
445

446
    /// Renames temporary part to a permanent part and adds it to the parts set.
447
    /// It is assumed that the part does not intersect with existing parts.
448 449 450 451
    /// If increment != nullptr, part index is determing using increment. Otherwise part index remains unchanged.
    /// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the
    /// active set later with out_transaction->commit()).
    /// Else, commits the part immediately.
452 453
    void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);

454 455 456
    /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
    /// Returns all parts covered by the added part (in ascending order).
    /// If out_transaction == nullptr, marks covered parts as Outdated.
457 458 459
    DataPartsVector renameTempPartAndReplace(
        MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);

460 461 462 463 464
    /// Low-level version of previous one, doesn't lock mutex
    void renameTempPartAndReplace(
            MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
            DataPartsVector * out_covered_parts = nullptr);

465 466
    /// Removes parts from the working set.
    /// Parts in `remove` must already be in data_parts with PreCommitted, Committed, or Outdated states.
    /// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
    /// clearOldParts (ignoring old_parts_lifetime).
    void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
    void removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);

    /// Removes all parts from the working set parts
    ///  for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block).
    /// If a part intersecting drop_range.max_block is found, an exception will be thrown.
    /// Used in REPLACE PARTITION command;
    DataPartsVector removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout,
                                                     bool skip_intersecting_parts, DataPartsLock & lock);

    /// Renames the part to detached/<prefix>_<part> and removes it from working set.
    void removePartsFromWorkingSetAndCloneToDetached(const DataPartsVector & parts, bool clear_without_timeout, const String & prefix = "");

    /// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
    //// so it will not be deleted in clearOldParts.
484
    /// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
485
    void forgetPartAndMoveToDetached(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
486

487 488 489
    /// If the part is Obsolete and not used by anybody else, immediately delete it from filesystem and remove from memory.
    void tryRemovePartImmediately(DataPartPtr && part);

490 491 492 493
    /// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts
    /// but not from the disk.
    DataPartsVector grabOldParts();

494 495 496 497 498
    /// Reverts the changes made by grabOldParts(), parts should be in Deleting state.
    void rollbackDeletingParts(const DataPartsVector & parts);

    /// Removes parts from data_parts, they should be in Deleting state
    void removePartsFinally(const DataPartsVector & parts);
499

500 501
    /// Delete irrelevant parts from memory and disk.
    void clearOldPartsFromFilesystem();
502

503
    /// Delete all directories which names begin with "tmp"
504
    /// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
505
    /// Must be called with locked lockStructureForShare().
506
    void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
507 508 509 510 511 512 513 514

    /// After the call to dropAllData() no method can be called.
    /// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
    void dropAllData();

    /// Moves the entire data directory.
    /// Flushes the uncompressed blocks cache and the marks cache.
    /// Must be called with locked lockStructureForAlter().
515
    void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
516 517 518 519

    /// Check if the ALTER can be performed:
    /// - all needed columns are present.
    /// - all type conversions can be done.
N
Nikita Vasilev 已提交
520
    /// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
521
    /// If something is wrong, throws an exception.
N
setting  
Nikita Vasilev 已提交
522
    void checkAlter(const AlterCommands & commands, const Context & context);
523 524 525 526 527 528 529

    /// Performs ALTER of the data part, writes the result to temporary files.
    /// Returns an object allowing to rename temporary files to permanent files.
    /// If the number of affected columns is suspiciously high and skip_sanity_checks is false, throws an exception.
    /// If no data transformations are necessary, returns nullptr.
    AlterDataPartTransactionPtr alterDataPart(
        const DataPartPtr & part,
530
        const NamesAndTypesList & new_columns,
N
ASTs  
Nikita Vasilev 已提交
531
        const IndicesASTs & new_indices,
532 533
        bool skip_sanity_checks);

A
Anton Popov 已提交
534 535 536
    /// Remove columns, that have been markedd as empty after zeroing values with expired ttl
    void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);

I
Ivan Lezhankin 已提交
537 538 539
    /// Freezes all parts.
    void freezeAll(const String & with_name, const Context & context);

540
    /// Should be called if part data is suspected to be corrupted.
A
Alexey Milovidov 已提交
541
    void reportBrokenPart(const String & name) const
542 543 544 545
    {
        broken_part_callback(name);
    }

546 547 548 549 550 551
    /** Get the key expression AST as an ASTExpressionList.
      * It can be specified in the tuple: (CounterID, Date),
      *  or as one column: CounterID.
      */
    static ASTPtr extractKeyExpressionList(const ASTPtr & node);

A
Alexey Zatelepin 已提交
552
    bool hasSortingKey() const { return !sorting_key_columns.empty(); }
553
    bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
N
Nikita Vasilev 已提交
554
    bool hasSkipIndices() const { return !skip_indices.empty(); }
A
Anton Popov 已提交
555
    bool hasTableTTL() const { return ttl_table_ast != nullptr; }
556

557
    /// Check that the part is not broken and calculate the checksums for it if they are not present.
558
    MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path);
559 560 561 562 563

    /** Create local backup (snapshot) for parts with specified prefix.
      * Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
      *  or if 'with_name' is specified - backup is created in directory with specified name.
      */
564
    void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context);
565 566 567

    size_t getColumnCompressedSize(const std::string & name) const
    {
A
Alexey Milovidov 已提交
568
        auto lock = lockParts();
569 570 571 572
        const auto it = column_sizes.find(name);
        return it == std::end(column_sizes) ? 0 : it->second.data_compressed;
    }

573 574
    using ColumnSizeByName = std::unordered_map<std::string, DataPart::ColumnSize>;

    /// Returns a copy of column_sizes taken under the parts lock.
    ColumnSizeByName getColumnSizes() const
    {
        auto lock = lockParts();
        return column_sizes;
    }

    /// Calculates column sizes in compressed form for the current state of data_parts.
581 582
    void recalculateColumnSizes()
    {
A
Alexey Milovidov 已提交
583
        auto lock = lockParts();
584 585 586
        calculateColumnSizesImpl();
    }

587
    /// For ATTACH/DETACH/DROP PARTITION.
588
    String getPartitionIDFromQuery(const ASTPtr & partition, const Context & context);
589

590 591 592
    /// Extracts MergeTreeData of other *MergeTree* storage
    ///  and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM
    /// Tables structure should be locked.
593
    MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table) const;
594 595

    MergeTreeData::MutableDataPartPtr cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
596

597
                                                   const MergeTreePartInfo & dst_part_info);
598
    virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
599

600 601
    String getFullPathOnDisk(const DiskPtr & disk) const;

602
    Strings getDataPaths() const override;
603

604 605 606
    DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size);

    DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size);

    /// Choose disk with max available free space
    /// Reserves 0 bytes
    DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return schema.reserveOnMaxDiskWithoutReservation(); }

    MergeTreeDataFormatVersion format_version;

614
    Context global_context;
A
alesapin 已提交
615
    IndexGranularityInfo index_granularity_info;
616 617 618 619

    /// Merging params - what additional actions to perform during merge.
    const MergingParams merging_params;

620
    bool is_custom_partitioned = false;
621
    ExpressionActionsPtr partition_key_expr;
622
    Block partition_key_sample;
623

624 625 626 627
    ExpressionActionsPtr minmax_idx_expr;
    Names minmax_idx_columns;
    DataTypes minmax_idx_column_types;
    Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
628
    Int64 minmax_idx_time_column_pos = -1; /// In other cases, minmax index often includes a dateTime column.
629

N
indices  
Nikita Vasilev 已提交
630 631
    /// Secondary (data skipping) indices for MergeTree
    MergeTreeIndices skip_indices;
N
Nikita Vasilev 已提交
632 633 634

    ExpressionActionsPtr primary_key_and_skip_indices_expr;
    ExpressionActionsPtr sorting_key_and_skip_indices_expr;
N
ptrs  
Nikita Vasilev 已提交
635

636 637
    /// Names of columns for primary key + secondary sorting columns.
    Names sorting_key_columns;
638
    ASTPtr sorting_key_expr_ast;
639 640 641 642
    ExpressionActionsPtr sorting_key_expr;

    /// Names of columns for primary key.
    Names primary_key_columns;
643
    ASTPtr primary_key_expr_ast;
644 645 646 647
    ExpressionActionsPtr primary_key_expr;
    Block primary_key_sample;
    DataTypes primary_key_data_types;

A
Anton Popov 已提交
648 649 650 651 652 653 654 655 656 657 658
    /// A compiled TTL expression paired with the name of the column that receives its result.
    struct TTLEntry
    {
        /// Actions that evaluate the TTL expression.
        ExpressionActionsPtr expression;
        /// Name of the result column of `expression`
        /// (presumably the column holding the computed TTL value — confirm in setTTLExpressions).
        String result_column;
    };

    /// TTL entries keyed by a string name
    /// (presumably per-column TTLs keyed by column name — confirm in setTTLExpressions).
    using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
    TTLEntriesByName ttl_entries_by_name;

    /// TTL entry for the table as a whole (presumably built from ttl_table_ast — confirm).
    TTLEntry ttl_table_entry;

659 660 661 662 663
    String sampling_expr_column_name;
    Names columns_required_for_sampling;

    const MergeTreeSettings settings;

664 665 666
    /// Limiting parallel sends per one table, used in DataPartsExchange
    std::atomic_uint current_table_sends {0};

667 668 669
    /// For generating names of temporary parts during insertion.
    SimpleIncrement insert_increment;

670
protected:
671
    friend struct MergeTreeDataPart;
672 673 674
    friend class MergeTreeDataMergerMutator;
    friend class ReplicatedMergeTreeAlterThread;
    friend struct ReplicatedMergeTreeTableMetadata;
675
    friend class StorageReplicatedMergeTree;
676

677 678 679
    ASTPtr partition_by_ast;
    ASTPtr order_by_ast;
    ASTPtr primary_key_ast;
680
    ASTPtr sample_by_ast;
A
Anton Popov 已提交
681
    ASTPtr ttl_table_ast;
682

683
    bool require_part_metadata;
684

685 686
    String database_name;
    String table_name;
687 688

    Schema schema;
689 690

    /// Current column sizes in compressed and uncompressed form.
691
    ColumnSizeByName column_sizes;
692

693
    /// Engine-specific methods
694 695 696 697 698
    BrokenPartCallback broken_part_callback;

    String log_name;
    Logger * log;

699 700 701

    /// Work with data parts

702 703
    /// Empty tag types used to select an index of DataPartsIndexes:
    /// TagByInfo -> ordered by part info, TagByStateAndInfo -> ordered by (state, info).
    struct TagByInfo{};
    struct TagByStateAndInfo{};
704 705 706 707 708 709 710 711 712

    /// Key extractor for the TagByInfo index of DataPartsIndexes.
    /// Returns a reference to the part's own MergeTreePartInfo (no copy is made).
    static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
    {
        const auto & info = part->info;
        return info;
    }

    /// Key extractor for the TagByStateAndInfo index of DataPartsIndexes.
    /// Pairs the part's current state with its info; the info is taken from the part itself.
    static DataPartStateAndInfo dataPartPtrToStateAndInfo(const DataPartPtr & part)
    {
        const auto & data_part = *part;
        return DataPartStateAndInfo{data_part.state, data_part.info};
    }
714 715 716

    /// Container of data parts (held by DataPartPtr) with two ordered_unique indices:
    ///  - TagByInfo: keyed by the part's MergeTreePartInfo (dataPartPtrToInfo);
    ///  - TagByStateAndInfo: keyed by (state, info) (dataPartPtrToStateAndInfo), ordered by LessStateDataPart.
    /// The second key reads the part's current `state`, so state changes are applied through
    /// the index's `modify` (see modifyPartState) to keep that index ordered correctly.
    using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
        boost::multi_index::indexed_by<
717
            /// Index by Info
718
            boost::multi_index::ordered_unique<
719
                boost::multi_index::tag<TagByInfo>,
720 721
                boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
            >,
722
            /// Index by (State, Info); used to obtain ordered slices of parts that share the same state
723
            boost::multi_index::ordered_unique<
724
                boost::multi_index::tag<TagByStateAndInfo>,
725 726 727 728 729 730
                boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
                LessStateDataPart
            >
        >
    >;

731 732
    /// Current set of data parts.
    /// NOTE(review): presumably this mutex guards data_parts_indexes — confirm at call sites.
    mutable std::mutex data_parts_mutex;
733
    DataPartsIndexes data_parts_indexes;
734 735
    /// References to the two indices of a DataPartsIndexes container (by info; by (state, info)).
    /// Presumably bound to data_parts_indexes in the constructor — confirm.
    DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
    DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
736

737 738
    /// Iterator types of each index; an iterator of one index can be converted to the other via `project`.
    using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
    using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
739

740
    /// Returns the contiguous slice of the (State, Info) index that holds every part in `state`.
    /// The slice is ordered by part info, because that is the secondary key of the index.
    boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const
    {
        const LessStateDataPart by_state{};
        return {data_parts_by_state_and_info.lower_bound(state, by_state),
                data_parts_by_state_and_info.upper_bound(state, by_state)};
    }

747 748 749 750 751 752 753
    /// Returns the slice of the Info-ordered index whose parts compare equal to `partition_id`
    /// under LessDataPart, i.e. the parts belonging to that partition.
    boost::iterator_range<DataPartIteratorByInfo> getDataPartsPartitionRange(const String & partition_id) const
    {
        /// Build the lookup key and comparator once and reuse them for both bounds.
        const PartitionID key(partition_id);
        const LessDataPart by_partition{};

        auto first = data_parts_by_info.lower_bound(key, by_partition);
        auto last = data_parts_by_info.upper_bound(key, by_partition);
        return {first, last};
    }

754 755 756 757 758
    /// Builds the callable passed to multi_index `modify`: it assigns `state` to the part it receives.
    static decltype(auto) getStateModifier(DataPartState state)
    {
        auto set_state = [new_state = state] (const DataPartPtr & part) { part->state = new_state; };
        return set_state;
    }

759
    /// Sets the state of the part pointed to by `it`, re-keying it in the (State, Info) index.
    /// Throws LOGICAL_ERROR if the index rejects the modification (`modify` returns false).
    ///
    /// When boost::multi_index `modify` fails to re-key the element, it erases that element,
    /// which invalidates `it` and may drop the last owning reference to the part.
    /// Keep our own DataPartPtr so the error message never dereferences an invalidated iterator.
    void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state)
    {
        const DataPartPtr part = *it;
        if (!data_parts_by_state_and_info.modify(it, getStateModifier(state)))
            throw Exception("Can't modify " + part->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
    }

765
    /// Sets the state of the part pointed to by the by-info iterator `it`. The iterator is
    /// projected onto the (State, Info) index and the part is re-keyed there.
    /// Throws LOGICAL_ERROR if the index rejects the modification (`modify` returns false).
    ///
    /// A failed `modify` erases the element from the container, invalidating `it` (and its
    /// projection). The part is therefore copied beforehand, so the error message stays valid.
    void modifyPartState(DataPartIteratorByInfo it, DataPartState state)
    {
        const DataPartPtr part = *it;
        if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
            throw Exception("Can't modify " + part->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
    }

    void modifyPartState(const DataPartPtr & part, DataPartState state)
    {
773 774 775
        auto it = data_parts_by_info.find(part->info);
        if (it == data_parts_by_info.end() || (*it).get() != part.get())
            throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
776

777
        if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
778 779 780
            throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
    }

781 782 783 784 785 786

    /// Used to serialize calls to grabOldParts.
    std::mutex grab_old_parts_mutex;
    /// The same for clearOldTemporaryDirectories.
    std::mutex clear_old_temporary_directories_mutex;

A
Alexey Milovidov 已提交
787 788 789
    void setPrimaryKeyIndicesAndColumns(const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
                                        const ColumnsDescription & new_columns,
                                        const IndicesDescription & indices_description, bool only_check = false);
790

791
    void initPartitionKey();
792

A
Anton Popov 已提交
793 794 795
    void setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
                           const ASTPtr & new_ttl_table_ast, bool only_check = false);

796 797 798 799 800 801 802
    /// Expression for column type conversion.
    /// If no conversions are needed, out_expression=nullptr.
    /// out_rename_map maps column files for the out_expression onto new table files.
    /// out_force_update_metadata denotes if metadata must be changed even if out_rename_map is empty (used
    /// for transformation-free changing of Enum values list).
    /// Files to be deleted are mapped to an empty string in out_rename_map.
    /// If part == nullptr, just checks that all type conversions are possible.
803
    void createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
N
ASTs  
Nikita Vasilev 已提交
804
                                 const IndicesASTs & old_indices, const IndicesASTs & new_indices,
N
Nikita Vasilev 已提交
805
                                 ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const;
806 807

    /// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
808
    void calculateColumnSizesImpl();
809 810 811
    /// Adds or subtracts the contribution of the part to compressed column sizes.
    void addPartContributionToColumnSizes(const DataPartPtr & part);
    void removePartContributionToColumnSizes(const DataPartPtr & part);
812

813
    /// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
814
    DataPartPtr getAnyPartInPartition(const String & partition_id, DataPartsLock & data_parts_lock);
815 816 817 818 819

    /// Return parts in the Committed set that are covered by the new_part_info or the part that covers it.
    /// Will check that the new part doesn't already exist and that it doesn't intersect existing part.
    DataPartsVector getActivePartsToReplace(
        const MergeTreePartInfo & new_part_info,
820
        const String & new_part_name,
821
        DataPartPtr & out_covering_part,
822
        DataPartsLock & data_parts_lock) const;
823

A
Alexey Milovidov 已提交
824
    /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
825
    bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
826 827

    /// Common part for |freezePartition()| and |freezeAll()|.
828
    using MatcherFn = std::function<bool(const DataPartPtr &)>;
829
    void freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context);
M
Merge  
Michael Kolupaev 已提交
830 831 832
};

}