From 61233b886a53b927a3841a2ca9e3161fab3eb88c Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 24 Jan 2018 01:56:46 +0300 Subject: [PATCH] Better system.part_log. [#CLICKHOUSE-3342] --- dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/DataTypes/DataTypeEnum.h | 2 +- dbms/src/Interpreters/PartLog.cpp | 110 ++- dbms/src/Interpreters/PartLog.h | 26 +- dbms/src/Interpreters/SystemLog.h | 2 +- .../MergeTree/MergeTreeBlockOutputStream.cpp | 3 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 42 +- .../Storages/MergeTree/MergeTreeDataPart.h | 2 +- .../ReplicatedMergeTreeBlockOutputStream.cpp | 32 +- dbms/src/Storages/StorageMergeTree.cpp | 71 +- .../Storages/StorageReplicatedMergeTree.cpp | 740 ++++++++++-------- .../src/Storages/StorageReplicatedMergeTree.h | 5 + 12 files changed, 617 insertions(+), 419 deletions(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 67c6c24c53..b584be8171 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -365,6 +365,7 @@ namespace ErrorCodes extern const int NO_COMMON_TYPE = 386; extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387; extern const int CANNOT_ASSIGN_OPTIMIZE = 388; + extern const int INSERT_WAS_DEDUPLICATED = 389; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/DataTypes/DataTypeEnum.h b/dbms/src/DataTypes/DataTypeEnum.h index 0487ca545a..54ab4b1cca 100644 --- a/dbms/src/DataTypes/DataTypeEnum.h +++ b/dbms/src/DataTypes/DataTypeEnum.h @@ -58,7 +58,7 @@ private: void fillMaps(); public: - DataTypeEnum(const Values & values_); + explicit DataTypeEnum(const Values & values_); DataTypeEnum(const DataTypeEnum & other); const Values & getValues() const { return values; } diff --git a/dbms/src/Interpreters/PartLog.cpp b/dbms/src/Interpreters/PartLog.cpp index 7683e9f438..9c537ba535 100644 --- a/dbms/src/Interpreters/PartLog.cpp +++ b/dbms/src/Interpreters/PartLog.cpp @@ -6,6 +6,7 @@ #include 
#include #include +#include #include #include #include @@ -16,21 +17,38 @@ namespace DB Block PartLogElement::createBlock() { + auto event_type_datatype = std::make_shared( + DataTypeEnum8::Values{ + {"NEW_PART", static_cast(NEW_PART)}, + {"MERGE_PARTS", static_cast(MERGE_PARTS)}, + {"DOWNLOAD_PART", static_cast(DOWNLOAD_PART)}, + {"REMOVE_PART", static_cast(REMOVE_PART)} + } + ); + return { - {ColumnUInt8::create(), std::make_shared(), "event_type"}, - - {ColumnUInt16::create(), std::make_shared(), "event_date"}, - {ColumnUInt32::create(), std::make_shared(), "event_time"}, - - {ColumnUInt64::create(), std::make_shared(), "size_in_bytes"}, - {ColumnUInt64::create(), std::make_shared(), "duration_ms"}, - - {ColumnString::create(), std::make_shared(), "database"}, - {ColumnString::create(), std::make_shared(), "table"}, - {ColumnString::create(), std::make_shared(), "part_name"}, - {ColumnArray::create(ColumnString::create()), - std::make_shared(std::make_shared()), "merged_from"}, + {ColumnInt8::create(), std::move(event_type_datatype), "event_type"}, + {ColumnUInt16::create(), std::make_shared(), "event_date"}, + {ColumnUInt32::create(), std::make_shared(), "event_time"}, + {ColumnUInt64::create(), std::make_shared(), "duration_ms"}, + + {ColumnString::create(), std::make_shared(), "database"}, + {ColumnString::create(), std::make_shared(), "table"}, + {ColumnString::create(), std::make_shared(), "part_name"}, + + {ColumnUInt64::create(), std::make_shared(), "rows"}, + {ColumnUInt64::create(), std::make_shared(), "size_in_bytes"}, // On disk + + /// Merge-specific info + {ColumnArray::create(ColumnString::create()), std::make_shared(std::make_shared()), "merged_from"}, + {ColumnUInt64::create(), std::make_shared(), "bytes_uncompressed"}, // Result bytes + {ColumnUInt64::create(), std::make_shared(), "read_rows"}, + {ColumnUInt64::create(), std::make_shared(), "read_bytes"}, + + /// Is there an error during the execution or commit + {ColumnUInt16::create(), 
std::make_shared(), "error"}, + {ColumnString::create(), std::make_shared(), "exception"}, }; } @@ -40,41 +58,71 @@ void PartLogElement::appendToBlock(Block & block) const size_t i = 0; - columns[i++]->insert(UInt64(event_type)); + columns[i++]->insert(Int64(event_type)); columns[i++]->insert(UInt64(DateLUT::instance().toDayNum(event_time))); columns[i++]->insert(UInt64(event_time)); - - columns[i++]->insert(UInt64(size_in_bytes)); columns[i++]->insert(UInt64(duration_ms)); columns[i++]->insert(database_name); columns[i++]->insert(table_name); columns[i++]->insert(part_name); - Array merged_from_array; - merged_from_array.reserve(merged_from.size()); - for (const auto & name : merged_from) - merged_from_array.push_back(name); + columns[i++]->insert(UInt64(rows)); + columns[i++]->insert(UInt64(bytes_compressed_on_disk)); + + Array source_part_names_array; + source_part_names_array.reserve(source_part_names.size()); + for (const auto & name : source_part_names) + source_part_names_array.push_back(name); + + columns[i++]->insert(source_part_names_array); + + columns[i++]->insert(UInt64(bytes_uncompressed)); + columns[i++]->insert(UInt64(rows_read)); + columns[i++]->insert(UInt64(bytes_read_uncompressed)); - columns[i++]->insert(merged_from_array); + columns[i++]->insert(UInt64(error)); + columns[i++]->insert(exception); block.setColumns(std::move(columns)); } -void PartLog::addNewPart(const MergeTreeDataPart & part, double elapsed) + +bool PartLog::addNewPartToTheLog(Context & context, const MergeTreeDataPart & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status) { - PartLogElement elem; - elem.event_time = time(nullptr); + PartLog * part_log = nullptr; + + try + { + part_log = context.getPartLog(part.storage.getDatabaseName(), part.storage.getTableName()); + if (!part_log) + return false; + + PartLogElement elem; - elem.event_type = PartLogElement::NEW_PART; - elem.size_in_bytes = part.size_in_bytes; - elem.duration_ms = elapsed / 1000000; + 
elem.event_type = PartLogElement::NEW_PART; + elem.event_time = time(nullptr); + elem.duration_ms = elapsed_ns / 1000000; - elem.database_name = part.storage.getDatabaseName(); - elem.table_name = part.storage.getTableName(); - elem.part_name = part.name; + elem.database_name = part.storage.getDatabaseName(); + elem.table_name = part.storage.getTableName(); + elem.part_name = part.name; + + elem.bytes_compressed_on_disk = part.size_in_bytes; + elem.rows = part.rows_count; + + elem.error = static_cast(execution_status.code); + elem.exception = execution_status.message; + + part_log->add(elem); + } + catch (...) + { + tryLogCurrentException(part_log ? part_log->log : &Logger::get("PartLog"), __PRETTY_FUNCTION__); + return false; + } - add(elem); + return true; } } diff --git a/dbms/src/Interpreters/PartLog.h b/dbms/src/Interpreters/PartLog.h index ef7d5b80fc..8d1948492a 100644 --- a/dbms/src/Interpreters/PartLog.h +++ b/dbms/src/Interpreters/PartLog.h @@ -18,15 +18,28 @@ struct PartLogElement Type event_type = NEW_PART; - time_t event_time{}; - - UInt64 size_in_bytes{}; - UInt64 duration_ms{}; + time_t event_time = 0; + UInt64 duration_ms = 0; String database_name; String table_name; String part_name; - Strings merged_from; + + /// Size of the part + UInt64 rows = 0; + + /// Size of files in filesystem + UInt64 bytes_compressed_on_disk = 0; + + /// Makes sense for merges only + Strings source_part_names; + UInt64 bytes_uncompressed = 0; + UInt64 rows_read = 0; + UInt64 bytes_read_uncompressed = 0; + + /// Was the operation successful? + UInt16 error = 0; + String exception; static std::string name() { return "PartLog"; } @@ -44,7 +57,8 @@ class PartLog : public SystemLog public: /// Add a record about creation of new part.
- void addNewPart(const MergeTreeDataPart & part, double elapsed); + static bool addNewPartToTheLog(Context & context, const MergeTreeDataPart & part, UInt64 elapsed_ns, + const ExecutionStatus & execution_status = {}); }; } diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h index 58655c421f..faba63bc2a 100644 --- a/dbms/src/Interpreters/SystemLog.h +++ b/dbms/src/Interpreters/SystemLog.h @@ -101,7 +101,7 @@ public: LOG_ERROR(log, "SystemLog queue is full"); } -private: +protected: Context & context; const String database_name; const String table_name; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index c36d04c30a..46b5f47043 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -18,8 +18,7 @@ void MergeTreeBlockOutputStream::write(const Block & block) MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block); storage.data.renameTempPartAndAdd(part, &storage.increment); - if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) - part_log->addNewPart(*part, watch.elapsed()); + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed()); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. 
storage.merge_task_handle->wake(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index c3afacf499..7439637f01 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -47,6 +47,7 @@ #include #include #include +#include namespace ProfileEvents @@ -656,18 +657,43 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts) { - std::lock_guard lock(data_parts_mutex); + { + std::lock_guard lock(data_parts_mutex); - /// TODO: use data_parts iterators instead of pointers - for (auto & part : parts) + /// TODO: use data_parts iterators instead of pointers + for (auto & part : parts) + { + auto it = data_parts_by_name.find(part->info); + if (it == data_parts_by_name.end()) + throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR); + + (*it)->assertState({DataPartState::Deleting}); + + data_parts_indexes.erase(it); + } + } + + /// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging + /// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away + if (auto part_log = context.getPartLog(database_name, table_name)) { - auto it = data_parts_by_name.find(part->info); - if (it == data_parts_by_name.end()) - throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR); + PartLogElement part_log_elem; + + part_log_elem.event_type = PartLogElement::REMOVE_PART; + part_log_elem.event_time = time(nullptr); + part_log_elem.duration_ms = 0; - (*it)->assertState({DataPartState::Deleting}); + part_log_elem.database_name = database_name; + part_log_elem.table_name = table_name; - data_parts_indexes.erase(it); + for (auto & part : parts) + { + part_log_elem.part_name = 
part->name; + part_log_elem.bytes_compressed_on_disk = part->size_in_bytes; + part_log_elem.rows = part->rows_count; + + part_log->add(part_log_elem); + } } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h index 17d15eba79..a1cff20a3a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.h @@ -130,7 +130,7 @@ struct MergeTreeDataPart String name; MergeTreePartInfo info; - /// A directory path (realative to storage's path) where part data is actually stored + /// A directory path (relative to storage's path) where part data is actually stored /// Examples: 'detached/tmp_fetch_', 'tmp_', '' mutable String relative_path; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index b15639a73e..4d4ed1f74f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -20,6 +20,7 @@ namespace ErrorCodes extern const int NO_ZOOKEEPER; extern const int READONLY; extern const int UNKNOWN_STATUS_OF_INSERT; + extern const int INSERT_WAS_DEDUPLICATED; } @@ -141,10 +142,21 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) LOG_DEBUG(log, "Wrote block with " << block.rows() << " rows"); } - commitPart(zookeeper, part, block_id); - if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) - part_log->addNewPart(*part, watch.elapsed()); + + try + { + commitPart(zookeeper, part, block_id); + + /// Set a special error code if the block is duplicate + int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0; + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus(error)); + } + catch (...) 
+ { + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__)); + throw; + } } } @@ -163,10 +175,16 @@ void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::Muta Stopwatch watch; - commitPart(zookeeper, part, ""); - - if (auto part_log = storage.context.getPartLog(part->storage.getDatabaseName(), part->storage.getTableName())) - part_log->addNewPart(*part, watch.elapsed()); + try + { + commitPart(zookeeper, part, ""); + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed()); + } + catch (...) + { + PartLog::addNewPartToTheLog(storage.context, *part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__)); + throw; + } } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4fec351d6a..9bc637178a 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -330,45 +330,66 @@ bool StorageMergeTree::merge( merging_tagger.emplace(future_part.parts, MergeTreeDataMerger::estimateDiskSpaceForMerge(future_part.parts), *this); } - MergeList::EntryPtr merge_entry_ptr = context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts); + MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, future_part.name, future_part.parts); /// Logging Stopwatch stopwatch; + MergeTreeData::MutableDataPartPtr new_part; - auto new_part = merger.mergePartsToTemporaryPart( - future_part, *merge_entry_ptr, aio_threshold, time(nullptr), merging_tagger->reserved_space.get(), deduplicate); + auto write_part_log = [&] (const ExecutionStatus & execution_status) + { + try + { + auto part_log = context.getPartLog(database_name, table_name); + if (!part_log) + return; - merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); + PartLogElement part_log_elem; - if (auto part_log = context.getPartLog(database_name, 
table_name)) - { - PartLogElement elem; - elem.event_time = time(nullptr); + part_log_elem.event_type = PartLogElement::MERGE_PARTS; + part_log_elem.event_time = time(nullptr); + part_log_elem.duration_ms = stopwatch.elapsed() / 1000000; - elem.merged_from.reserve(future_part.parts.size()); - for (const auto & part : future_part.parts) - elem.merged_from.push_back(part->name); - elem.event_type = PartLogElement::MERGE_PARTS; - elem.size_in_bytes = new_part->size_in_bytes; + part_log_elem.database_name = database_name; + part_log_elem.table_name = table_name; + part_log_elem.part_name = future_part.name; - elem.database_name = new_part->storage.getDatabaseName(); - elem.table_name = new_part->storage.getTableName(); - elem.part_name = new_part->name; + if (new_part) + part_log_elem.bytes_compressed_on_disk = new_part->size_in_bytes; - elem.duration_ms = stopwatch.elapsed() / 1000000; + part_log_elem.source_part_names.reserve(future_part.parts.size()); + for (const auto & source_part : future_part.parts) + part_log_elem.source_part_names.push_back(source_part->name); - part_log->add(elem); + part_log_elem.rows_read = (*merge_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed; - elem.duration_ms = 0; - elem.event_type = PartLogElement::REMOVE_PART; - elem.merged_from = Strings(); + part_log_elem.rows = (*merge_entry)->rows_written; + part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed; - for (const auto & part : future_part.parts) + part_log_elem.error = static_cast(execution_status.code); + part_log_elem.exception = execution_status.message; + + part_log->add(part_log_elem); + } + catch (...)
{ - elem.part_name = part->name; - elem.size_in_bytes = part->size_in_bytes; - part_log->add(elem); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } + }; + + try + { + new_part = merger.mergePartsToTemporaryPart(future_part, *merge_entry, aio_threshold, time(nullptr), + merging_tagger->reserved_space.get(), deduplicate); + merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); + + write_part_log({}); + } + catch (...) + { + write_part_log(ExecutionStatus::fromCurrentException()); + throw; } return true; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 308088d5b2..4a6339b94e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -980,12 +980,33 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } bool do_fetch = false; - if (entry.type == LogEntry::GET_PART) { do_fetch = true; } else if (entry.type == LogEntry::MERGE_PARTS) + { + tryExecuteMerge(entry, do_fetch); + } + else + { + throw Exception("Unexpected log entry type: " + toString(static_cast(entry.type))); + } + + if (do_fetch) + return executeFetch(entry); + + return true; +} + + +void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry, bool & do_fetch) +{ + /// The caller has already decided to make the fetch + if (do_fetch) + return; + + // Log source part names just in case { std::stringstream log_message; log_message << "Executing log entry to merge parts "; @@ -994,366 +1015,388 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) log_message << " to " << entry.new_part_name; LOG_TRACE(log, log_message.rdbuf()); + } - MergeTreeData::DataPartsVector parts; - bool have_all_parts = true; - for (const String & name : entry.parts_to_merge) + MergeTreeData::DataPartsVector parts; + bool have_all_parts = true; + for (const String & name : entry.parts_to_merge) + { + 
MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name); + if (!part) { - MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name); - if (!part) - { - have_all_parts = false; - break; - } - if (part->name != name) - { - LOG_WARNING(log, "Part " << name << " is covered by " << part->name - << " but should be merged into " << entry.new_part_name << ". This shouldn't happen often."); - have_all_parts = false; - break; - } - parts.push_back(part); + have_all_parts = false; + break; } - - if (!have_all_parts) + if (part->name != name) { - /// If you do not have all the necessary parts, try to take some already merged part from someone. - do_fetch = true; - LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead"); + LOG_WARNING(log, "Part " << name << " is covered by " << part->name + << " but should be merged into " << entry.new_part_name << ". This shouldn't happen often."); + have_all_parts = false; + break; } - else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) - { - /// If entry is old enough, and have enough size, and part are exists in any replica, - /// then prefer fetching of merged part from replica. + parts.push_back(part); + } + + if (!have_all_parts) + { + /// If you do not have all the necessary parts, try to take some already merged part from someone. + do_fetch = true; + LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead"); + } + else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) + { + /// If entry is old enough, and have enough size, and part are exists in any replica, + /// then prefer fetching of merged part from replica. 
- size_t sum_parts_size_in_bytes = 0; - for (const auto & part : parts) - sum_parts_size_in_bytes += part->size_in_bytes; + size_t sum_parts_size_in_bytes = 0; + for (const auto & part : parts) + sum_parts_size_in_bytes += part->size_in_bytes; - if (sum_parts_size_in_bytes >= data.settings.prefer_fetch_merged_part_size_threshold) + if (sum_parts_size_in_bytes >= data.settings.prefer_fetch_merged_part_size_threshold) + { + String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove. + if (!replica.empty()) { - String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove. - if (!replica.empty()) - { - do_fetch = true; - LOG_DEBUG(log, "Preffering to fetch " << entry.new_part_name << " from replica"); - } + do_fetch = true; + LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica); } } + } - if (!do_fetch) - { - size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); + if (do_fetch) + return; - /// Can throw an exception. - DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge); + /// Start to make the main work - auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); + size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); - MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts); - MergeTreeData::Transaction transaction; - size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; + /// Can throw an exception. 
+ DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge); - /// Logging - Stopwatch stopwatch; + auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); - MergeTreeDataMerger::FuturePart future_merged_part(parts); - if (future_merged_part.name != entry.new_part_name) - throw Exception( - "Future merged part name `" + future_merged_part.name + - "` differs from part name in log entry: `" + entry.new_part_name + "`", - ErrorCodes::BAD_DATA_PART_NAME); + MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts); + MergeTreeData::Transaction transaction; + size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; - auto part = merger.mergePartsToTemporaryPart( - future_merged_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate); + MergeTreeDataMerger::FuturePart future_merged_part(parts); + if (future_merged_part.name != entry.new_part_name) + { + throw Exception("Future merged part name `" + future_merged_part.name + "` differs from part name in log entry: `" + + entry.new_part_name + "`", ErrorCodes::BAD_DATA_PART_NAME); + } - zkutil::Ops ops; + /// Logging + Stopwatch stopwatch; + ExecutionStatus execution_status; + MergeTreeData::MutableDataPartPtr part; - try - { - /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`. - checkPartAndAddToZooKeeper(part, ops, entry.new_part_name); - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH - || e.code() == ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART - || e.code() == ErrorCodes::NO_FILE_IN_DATA_PART - || e.code() == ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART) - { - do_fetch = true; - part->remove(); - - ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica); - - LOG_ERROR(log, getCurrentExceptionMessage(false) << ". 
" - "Data after merge is not byte-identical to data on another replicas. " - "There could be several reasons: " - "1. Using newer version of compression library after server update. " - "2. Using another compression method. " - "3. Non-deterministic compression algorithm (highly unlikely). " - "4. Non-deterministic merge algorithm due to logical error in code. " - "5. Data corruption in memory due to bug in code. " - "6. Data corruption in memory due to hardware issue. " - "7. Manual modification of source data after server startup. " - "8. Manual modification of checksums stored in ZooKeeper. " - "We will download merged part from replica to force byte-identical result."); - } - else - throw; - } + auto write_part_log = [&] (const ExecutionStatus & execution_status) + { + try + { + auto part_log = context.getPartLog(database_name, table_name); + if (!part_log) + return; - if (!do_fetch) - { - merger.renameMergedTemporaryPart(part, parts, &transaction); + PartLogElement part_log_elem; - /// Do not commit if the part is obsolete - if (!transaction.isEmpty()) - { - getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired. - transaction.commit(); - } + part_log_elem.event_type = PartLogElement::MERGE_PARTS; + part_log_elem.event_time = time(nullptr); + /// TODO: Stop stopwatch in outer code to exclude ZK timings and so on + part_log_elem.duration_ms = stopwatch.elapsed() / 1000000; - /** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. - */ + part_log_elem.database_name = database_name; + part_log_elem.table_name = table_name; + part_log_elem.part_name = entry.new_part_name; - /** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. - * This is not a problem, because in this case the merge will remain in the queue, and we will try again. 
- */ - merge_selecting_event.set(); + if (part) + part_log_elem.bytes_compressed_on_disk = part->size_in_bytes; - if (auto part_log = context.getPartLog(database_name, table_name)) - { - PartLogElement elem; - elem.event_time = time(nullptr); + part_log_elem.source_part_names.reserve(parts.size()); + for (const auto & source_part : parts) + part_log_elem.source_part_names.push_back(source_part->name); - elem.merged_from.reserve(parts.size()); - for (const auto & part : parts) - elem.merged_from.push_back(part->name); - elem.event_type = PartLogElement::MERGE_PARTS; - elem.size_in_bytes = part->size_in_bytes; + part_log_elem.rows_read = (*merge_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed; - elem.database_name = part->storage.getDatabaseName(); - elem.table_name = part->storage.getTableName(); - elem.part_name = part->name; + part_log_elem.rows = (*merge_entry)->rows_written; + part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed; - elem.duration_ms = stopwatch.elapsed() / 1000000; + part_log_elem.error = static_cast(execution_status.code); + part_log_elem.exception = execution_status.message; - part_log->add(elem); + part_log->add(part_log_elem); + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + }; - elem.duration_ms = 0; - elem.event_type = PartLogElement::REMOVE_PART; - elem.merged_from = Strings(); + try + { + part = merger.mergePartsToTemporaryPart( + future_merged_part, *merge_entry, aio_threshold, entry.create_time, reserved_space.get(), entry.deduplicate); - for (const auto & part : parts) - { - elem.part_name = part->name; - elem.size_in_bytes = part->size_in_bytes; - part_log->add(elem); - } - } + zkutil::Ops ops; - ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); + try + { + /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
+ checkPartAndAddToZooKeeper(part, ops, entry.new_part_name); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH + || e.code() == ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART + || e.code() == ErrorCodes::NO_FILE_IN_DATA_PART + || e.code() == ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART) + { + do_fetch = true; + part->remove(); + + ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica); + + LOG_ERROR(log, getCurrentExceptionMessage(false) << ". " + "Data after merge is not byte-identical to data on another replicas. " + "There could be several reasons: " + "1. Using newer version of compression library after server update. " + "2. Using another compression method. " + "3. Non-deterministic compression algorithm (highly unlikely). " + "4. Non-deterministic merge algorithm due to logical error in code. " + "5. Data corruption in memory due to bug in code. " + "6. Data corruption in memory due to hardware issue. " + "7. Manual modification of source data after server startup. " + "8. Manual modification of checksums stored in ZooKeeper. " + "We will download merged part from replica to force byte-identical result."); + + write_part_log(ExecutionStatus::fromCurrentException()); + return; } + + throw; + } + + merger.renameMergedTemporaryPart(part, parts, &transaction); + + /// Do not commit if the part is obsolete + if (!transaction.isEmpty()) + { + getZooKeeper()->multi(ops); /// After long merge, get fresh ZK handle, because previous session may be expired. + transaction.commit(); } + + /** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. + */ + + /** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. + * This is not a problem, because in this case the merge will remain in the queue, and we will try again. 
+ */ + merge_selecting_event.set(); + ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); + + write_part_log({}); } - else + catch (...) { - throw Exception("Unexpected log entry type: " + toString(static_cast(entry.type))); + write_part_log(ExecutionStatus::fromCurrentException()); + throw; } +} - if (do_fetch) - { - String replica = findReplicaHavingCoveringPart(entry, true); - static std::atomic_uint total_fetches {0}; - if (data.settings.replicated_max_parallel_fetches && total_fetches >= data.settings.replicated_max_parallel_fetches) - { - throw Exception("Too many total fetches from replicas, maximum: " + data.settings.replicated_max_parallel_fetches.toString(), - ErrorCodes::TOO_MUCH_FETCHES); - } +bool StorageReplicatedMergeTree::executeFetch(const StorageReplicatedMergeTree::LogEntry & entry) +{ + String replica = findReplicaHavingCoveringPart(entry, true); - ++total_fetches; - SCOPE_EXIT({--total_fetches;}); + static std::atomic_uint total_fetches {0}; + if (data.settings.replicated_max_parallel_fetches && total_fetches >= data.settings.replicated_max_parallel_fetches) + { + throw Exception("Too many total fetches from replicas, maximum: " + data.settings.replicated_max_parallel_fetches.toString(), + ErrorCodes::TOO_MUCH_FETCHES); + } - if (data.settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= data.settings.replicated_max_parallel_fetches_for_table) - { - throw Exception("Too many fetches from replicas for table, maximum: " + data.settings.replicated_max_parallel_fetches_for_table.toString(), - ErrorCodes::TOO_MUCH_FETCHES); - } + ++total_fetches; + SCOPE_EXIT({--total_fetches;}); - ++current_table_fetches; - SCOPE_EXIT({--current_table_fetches;}); + if (data.settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= data.settings.replicated_max_parallel_fetches_for_table) + { + throw Exception("Too many fetches from replicas for table, maximum: " + 
data.settings.replicated_max_parallel_fetches_for_table.toString(), + ErrorCodes::TOO_MUCH_FETCHES); + } - try + ++current_table_fetches; + SCOPE_EXIT({--current_table_fetches;}); + + try + { + if (replica.empty()) { - if (replica.empty()) + /** If a part is to be written with a quorum and the quorum is not reached yet, + * then (due to the fact that a part is impossible to download right now), + * the quorum entry should be considered unsuccessful. + * TODO Complex code, extract separately. + */ + if (entry.quorum) { - /** If a part is to be written with a quorum and the quorum is not reached yet, - * then (due to the fact that a part is impossible to download right now), - * the quorum entry should be considered unsuccessful. - * TODO Complex code, extract separately. + if (entry.type != LogEntry::GET_PART) + throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR); + + LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum." + " Will try to mark that quorum as failed."); + + /** Atomically: + * - if replicas do not become active; + * - if there is a `quorum` node with this part; + * - delete `quorum` node; + * - set `nonincrement_block_numbers` to resolve merges through the number of the lost part; + * - add a part to the list `quorum/failed_parts`; + * - if the part is not already removed from the list for deduplication `blocks/block_num`, then delete it; + * + * If something changes, then we will nothing - we'll get here again next time. */ - if (entry.quorum) - { - if (entry.type != LogEntry::GET_PART) - throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR); - LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum." - " Will try to mark that quorum as failed."); + /** We collect the `host` node versions from the replicas. 
+ * When the replica becomes active, it changes the value of host in the same transaction (with the creation of `is_active`). + * This will ensure that the replicas do not become active. + */ - /** Atomically: - * - if replicas do not become active; - * - if there is a `quorum` node with this part; - * - delete `quorum` node; - * - set `nonincrement_block_numbers` to resolve merges through the number of the lost part; - * - add a part to the list `quorum/failed_parts`; - * - if the part is not already removed from the list for deduplication `blocks/block_num`, then delete it; - * - * If something changes, then we will nothing - we'll get here again next time. - */ + auto zookeeper = getZooKeeper(); - /** We collect the `host` node versions from the replicas. - * When the replica becomes active, it changes the value of host in the same transaction (with the creation of `is_active`). - * This will ensure that the replicas do not become active. - */ + Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); - auto zookeeper = getZooKeeper(); + zkutil::Ops ops; - Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); + for (size_t i = 0, size = replicas.size(); i < size; ++i) + { + Stat stat; + String path = zookeeper_path + "/replicas/" + replicas[i] + "/host"; + zookeeper->get(path, &stat); + ops.emplace_back(std::make_unique(path, stat.version)); + } + + /// We verify that while we were collecting versions, the replica with the necessary part did not come alive. + replica = findReplicaHavingPart(entry.new_part_name, true); - zkutil::Ops ops; + /// Also during this time a completely new replica could be created. + /// But if a part does not appear on the old, then it can not be on the new one either. 
- for (size_t i = 0, size = replicas.size(); i < size; ++i) + if (replica.empty()) + { + Stat quorum_stat; + String quorum_path = zookeeper_path + "/quorum/status"; + String quorum_str = zookeeper->get(quorum_path, &quorum_stat); + ReplicatedMergeTreeQuorumEntry quorum_entry; + quorum_entry.fromString(quorum_str); + + if (quorum_entry.part_name == entry.new_part_name) { - Stat stat; - String path = zookeeper_path + "/replicas/" + replicas[i] + "/host"; - zookeeper->get(path, &stat); - ops.emplace_back(std::make_unique(path, stat.version)); - } + ops.emplace_back(std::make_unique(quorum_path, quorum_stat.version)); - /// We verify that while we were collecting versions, the replica with the necessary part did not come alive. - replica = findReplicaHavingPart(entry.new_part_name, true); + auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version); - /// Also during this time a completely new replica could be created. - /// But if a part does not appear on the old, then it can not be on the new one either. 
+ if (part_info.min_block != part_info.max_block) + throw Exception("Logical error: log entry with quorum for part covering more than one block number", + ErrorCodes::LOGICAL_ERROR); - if (replica.empty()) - { - Stat quorum_stat; - String quorum_path = zookeeper_path + "/quorum/status"; - String quorum_str = zookeeper->get(quorum_path, &quorum_stat); - ReplicatedMergeTreeQuorumEntry quorum_entry; - quorum_entry.fromString(quorum_str); + zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + part_info.partition_id, ""); - if (quorum_entry.part_name == entry.new_part_name) + auto acl = zookeeper->getDefaultACL(); + + ops.emplace_back(std::make_unique( + zookeeper_path + "/nonincrement_block_numbers/" + part_info.partition_id + "/block-" + padIndex(part_info.min_block), + "", + acl, + zkutil::CreateMode::Persistent)); + + ops.emplace_back(std::make_unique( + zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name, + "", + acl, + zkutil::CreateMode::Persistent)); + + /// Deleting from `blocks`. 
+ if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id)) + ops.emplace_back(std::make_unique(zookeeper_path + "/blocks/" + entry.block_id, -1)); + + auto code = zookeeper->tryMulti(ops); + + if (code == ZOK) { - ops.emplace_back(std::make_unique(quorum_path, quorum_stat.version)); - - auto part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, data.format_version); - - if (part_info.min_block != part_info.max_block) - throw Exception("Logical error: log entry with quorum for part covering more than one block number", - ErrorCodes::LOGICAL_ERROR); - - zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + part_info.partition_id, ""); - - auto acl = zookeeper->getDefaultACL(); - - ops.emplace_back(std::make_unique( - zookeeper_path + "/nonincrement_block_numbers/" + part_info.partition_id + "/block-" + padIndex(part_info.min_block), - "", - acl, - zkutil::CreateMode::Persistent)); - - ops.emplace_back(std::make_unique( - zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name, - "", - acl, - zkutil::CreateMode::Persistent)); - - /// Deleting from `blocks`. - if (!entry.block_id.empty() && zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id)) - ops.emplace_back(std::make_unique(zookeeper_path + "/blocks/" + entry.block_id, -1)); - - auto code = zookeeper->tryMulti(ops); - - if (code == ZOK) - { - LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed."); - return true; /// NOTE Deletion from `virtual_parts` is not done, but it is only necessary for merges. - } - else if (code == ZBADVERSION || code == ZNONODE || code == ZNODEEXISTS) - { - LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part " - << entry.new_part_name << " as failed. 
Code: " << zerror(code)); - } - else - throw zkutil::KeeperException(code); + LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed."); + return true; /// NOTE Deletion from `virtual_parts` is not done, but it is only necessary for merges. } - else + else if (code == ZBADVERSION || code == ZNONODE || code == ZNODEEXISTS) { - LOG_WARNING(log, "No active replica has part " << entry.new_part_name - << ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name - << ". It means that part was successfully written to " << entry.quorum - << " replicas, but then all of them goes offline." - << " Or it is a bug."); + LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part " + << entry.new_part_name << " as failed. Code: " << zerror(code)); } + else + throw zkutil::KeeperException(code); + } + else + { + LOG_WARNING(log, "No active replica has part " << entry.new_part_name + << ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name + << ". It means that part was successfully written to " << entry.quorum + << " replicas, but then all of them goes offline." + << " Or it is a bug."); } - } - - if (replica.empty()) - { - ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches); - throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART); } } - try - { - if (!fetchPart(entry.actual_new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) - return false; - } - catch (Exception & e) + if (replica.empty()) { - /// No stacktrace, just log message - if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS) - e.addMessage("Too busy replica. 
Will try later."); - throw; + ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches); + throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART); } + } - if (entry.type == LogEntry::MERGE_PARTS) - ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged); + try + { + if (!fetchPart(entry.actual_new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) + return false; } - catch (...) + catch (Exception & e) { - /** If you can not download the part you need for some merge, it's better not to try to get other parts for this merge, - * but try to get already merged part. To do this, move the action to get the remaining parts - * for this merge at the end of the queue. - */ - try - { - auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name); + /// No stacktrace, just log message + if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS) + e.addMessage("Too busy replica. Will try later."); + throw; + } - if (!parts_for_merge.empty() && replica.empty()) - { - LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead."); - return false; - } + if (entry.type == LogEntry::MERGE_PARTS) + ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged); + } + catch (...) + { + /** If you can not download the part you need for some merge, it's better not to try to get other parts for this merge, + * but try to get already merged part. To do this, move the action to get the remaining parts + * for this merge at the end of the queue. + */ + try + { + auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name); - /** If no active replica has a part, and there is no merge in the queue with its participation, - * check to see if any (active or inactive) replica has such a part or covering it. 
- */ - if (replica.empty()) - enqueuePartForCheck(entry.new_part_name); - } - catch (...) + if (!parts_for_merge.empty() && replica.empty()) { - tryLogCurrentException(__PRETTY_FUNCTION__); + LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead."); + return false; } - throw; + /** If no active replica has a part, and there is no merge in the queue with its participation, + * check to see if any (active or inactive) replica has such a part or covering it. + */ + if (replica.empty()) + enqueuePartForCheck(entry.new_part_name); } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + throw; } return true; @@ -2148,82 +2191,105 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin table_lock = lockStructure(true, __PRETTY_FUNCTION__); ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); + auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); + /// Logging Stopwatch stopwatch; + MergeTreeData::MutableDataPartPtr part; + MergeTreeData::DataPartsVector replaced_parts; - auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); - MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( - part_name, replica_path, address.host, address.replication_port, timeouts, to_detached); - - if (!to_detached) + auto write_part_log = [&] (const ExecutionStatus & execution_status) { - zkutil::Ops ops; + try + { + auto part_log = context.getPartLog(database_name, table_name); + if (!part_log) + return; - /** NOTE - * Here, an error occurs if ALTER occurred with a change in the column type or column deletion, - * and the part on remote server has not yet been modified. - * After a while, one of the following attempts to make `fetchPart` succeed. 
- */ - checkPartAndAddToZooKeeper(part, ops, part_name); + PartLogElement part_log_elem; - MergeTreeData::Transaction transaction; - auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); + part_log_elem.event_time = time(nullptr); + part_log_elem.event_type = PartLogElement::DOWNLOAD_PART; + /// TODO: Stop stopwatch in outer code to exclude ZK timings and so on + part_log_elem.duration_ms = stopwatch.elapsed() / 10000000; - /// Do not commit if the part is obsolete - if (!transaction.isEmpty()) + part_log_elem.database_name = database_name; + part_log_elem.table_name = table_name; + part_log_elem.part_name = part_name; + + if (part) + { + part_log_elem.bytes_compressed_on_disk = part->size_in_bytes; + part_log_elem.rows = part->rows_count; /// Could be approximate (?) + } + + part_log_elem.source_part_names.reserve(replaced_parts.size()); + for (const auto & replaced_part : replaced_parts) + part_log_elem.source_part_names.push_back(replaced_part->name); + + part_log_elem.error = static_cast(execution_status.code); + part_log_elem.exception = execution_status.message; + + part_log->add(part_log_elem); + } + catch (...) { - getZooKeeper()->multi(ops); - transaction.commit(); + tryLogCurrentException(log, __PRETTY_FUNCTION__); } + }; + + try + { + part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached); - if (auto part_log = context.getPartLog(database_name, table_name)) + if (!to_detached) { - PartLogElement elem; - elem.event_time = time(nullptr); - elem.event_type = PartLogElement::DOWNLOAD_PART; - elem.size_in_bytes = part->size_in_bytes; - elem.duration_ms = stopwatch.elapsed() / 10000000; + zkutil::Ops ops; + + /** NOTE + * Here, an error occurs if ALTER occurred with a change in the column type or column deletion, + * and the part on remote server has not yet been modified. + * After a while, one of the following attempts to make `fetchPart` succeed. 
+ */ + checkPartAndAddToZooKeeper(part, ops, part_name); - elem.merged_from.reserve(removed_parts.size()); - for (const auto & part : removed_parts) + MergeTreeData::Transaction transaction; + replaced_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); + + /// Do not commit if the part is obsolete + if (!transaction.isEmpty()) { - elem.merged_from.push_back(part->name); + getZooKeeper()->multi(ops); + transaction.commit(); } - elem.database_name = part->storage.getDatabaseName(); - elem.table_name = part->storage.getTableName(); - elem.part_name = part->name; + /** If a quorum is tracked for this part, you must update it. + * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. + */ + if (quorum) + updateQuorum(part_name); - part_log->add(elem); + merge_selecting_event.set(); - elem.duration_ms = 0; - elem.event_type = PartLogElement::REMOVE_PART; - elem.merged_from = Strings(); - for (const auto & part : removed_parts) + for (const auto & replaced_part : replaced_parts) { - elem.part_name = part->name; - elem.size_in_bytes = part->size_in_bytes; - part_log->add(elem); + LOG_DEBUG(log, "Part " << replaced_part->name << " is rendered obsolete by fetching part " << part_name); + ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts); } - } - - /** If a quorum is tracked for this part, you must update it. - * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. 
- */ - if (quorum) - updateQuorum(part_name); - - merge_selecting_event.set(); - for (const auto & removed_part : removed_parts) + write_part_log({}); + } + else { - LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name); - ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts); + part->renameTo("detached/" + part_name); } } - else + catch (...) { - part->renameTo("detached/" + part_name); + if (!to_detached) + write_part_log(ExecutionStatus::fromCurrentException()); + + throw; } ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index b9aabba977..7d7c39a451 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -342,6 +342,11 @@ private: void executeDropRange(const LogEntry & entry); + /// Do the merge or recommend to make the fetch instead of the merge + void tryExecuteMerge(const LogEntry & entry, bool & do_fetch); + + bool executeFetch(const LogEntry & entry); + void executeClearColumnInPartition(const LogEntry & entry); /** Updates the queue. -- GitLab