diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 73cf5907a7fd720e9260c0f3de47558cdf9e350f..998c8d6f8fde395de3eda0b8dde5dffe7a554c0a 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include #include @@ -20,6 +18,9 @@ #define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t)) +struct SimpleIncrement; + + namespace DB { @@ -743,12 +744,12 @@ public: * Предполагается, что кусок не пересекается с существующими. * Если out_transaction не nullptr, присваивает туда объект, позволяющий откатить добавление куска (но не переименование). */ - void renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment = nullptr, Transaction * out_transaction = nullptr); + void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); /** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски. * Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке). */ - DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, Increment * increment = nullptr, Transaction * out_transaction = nullptr); + DataPartsVector renameTempPartAndReplace(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); /** Убирает из рабочего набора куски remove и добавляет куски add. add должны уже быть в all_data_parts. * Если clear_without_timeout, данные будут удалены при следующем clearOldParts, игнорируя old_parts_lifetime. diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index d5d657f3c3ae9aa580a2aab4684bd62d8b480327..ec86c004d97477ef6e53894d2dd369e01ed2fcd1 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -107,7 +108,6 @@ private: String database_name; String table_name; String full_path; - Increment increment; Context & context; BackgroundProcessingPool & background_pool; @@ -117,6 +117,9 @@ private: MergeTreeDataWriter writer; MergeTreeDataMerger merger; + /// Для нумерации блоков. + SimpleIncrement increment; + MergeTreeData::DataParts currently_merging; Poco::FastMutex currently_merging_mutex; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index b07d13279b84e1734c241d08f05eda8ea0f94496..e1b4cf42a09de5825bee5dd51f83a1474193092a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -681,7 +682,7 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction() } -void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction) +void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) { auto removed = renameTempPartAndReplace(part, increment, out_transaction); if (!removed.empty()) @@ -692,7 +693,7 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * } MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( - MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction) + MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) { if (out_transaction && out_transaction->data) throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR); @@ -710,7 +711,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( * содержат ещё не добавленный кусок. */ if (increment) - part->left = part->right = increment->get(false); + part->left = part->right = increment->get(); String new_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level); @@ -814,7 +815,9 @@ void MergeTreeData::attachPart(const DataPartPtr & part) if (!all_data_parts.insert(part).second) throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART); + data_parts.insert(part); + addPartContributionToColumnSizes(part); } void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 6cdbd13b56d8390b6d830afd46a7b7b8520649eb..397a535fe67bced2fa062abb4bfbca1b8484cf56 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -29,18 +29,17 @@ StorageMergeTree::StorageMergeTree( const MergeTreeSettings & settings_) : IStorage{materialized_columns_, alias_columns_, column_defaults_}, path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'), - increment(full_path + "increment.txt"), context(context_), background_pool(context_.getBackgroundPool()), + context(context_), background_pool(context_.getBackgroundPool()), data(full_path, columns_, materialized_columns_, alias_columns_, column_defaults_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_,mode_, sign_column_, columns_to_sum_, settings_, database_name_ + "." + table_name, false), reader(data), writer(data), merger(data), + increment(data.getMaxDataPartIndex()), log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)")), shutdown_called(false) { - increment.fixIfBroken(data.getMaxDataPartIndex()); - data.loadDataParts(false); data.clearOldParts(); } @@ -130,8 +129,6 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & new_ table_name = new_table_name; full_path = new_full_path; - increment.setPath(full_path + "increment.txt"); - /// TODO: Можно обновить названия логгеров у this, data, reader, writer, merger. } @@ -341,16 +338,11 @@ void StorageMergeTree::attachPartition(const Field & field, bool unreplicated, b LOG_DEBUG(log, "Checking data"); MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path); - UInt64 index = increment.get(); - String new_part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, index, index, 0); - part->renameTo(new_part_name); - part->name = new_part_name; - ActiveDataPartSet::parsePartName(part->name, *part); - - LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path << " as " << new_part_name); + LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path); + data.renameTempPartAndAdd(part, &increment); data.attachPart(part); - LOG_INFO(log, "Finished attaching part " << new_part_name); + LOG_INFO(log, "Finished attaching part"); } /// На месте удаленных кусков могут появиться новые, с другими данными.