diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index a629cda19b95865b9e3850f1cdca79ac5b1592ef..67c6c24c531dcd1179141a40a80c772bd5d61164 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -364,6 +364,7 @@ namespace ErrorCodes extern const int MULTIPLE_STREAMS_REQUIRED = 385; extern const int NO_COMMON_TYPE = 386; extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387; + extern const int CANNOT_ASSIGN_OPTIMIZE = 388; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index f481fc5c30349003b3e2b81eaa253037f3203713..0167c2a3c517cbb12c63ae760c2e89f48e1541dd 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -305,6 +305,7 @@ struct Settings M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ + M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") /// Possible limits for query execution. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 004b104dc0cb76e47cf0bf18b70d5b1cd39217a4..00e8251b30af3d344ad2636363bcf200666665d0 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -150,12 +150,16 @@ bool MergeTreeDataMerger::selectPartsToMerge( FuturePart & future_part, bool aggressive, size_t max_total_size_to_merge, - const AllowedMergingPredicate & can_merge_callback) + const AllowedMergingPredicate & can_merge_callback, + String * out_disable_reason) { MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); if (data_parts.empty()) + { + if (out_disable_reason) *out_disable_reason = "There are no parts in the table"; return false; + } time_t current_time = time(nullptr); @@ -166,7 +170,7 @@ bool MergeTreeDataMerger::selectPartsToMerge( for (const MergeTreeData::DataPartPtr & part : data_parts) { const String & partition_id = part->info.partition_id; - if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part))) + if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part, nullptr))) { if (partitions.empty() || !partitions.back().empty()) partitions.emplace_back(); @@ -205,7 +209,10 @@ bool MergeTreeDataMerger::selectPartsToMerge( max_total_size_to_merge); if (parts_to_merge.empty()) + { + if (out_disable_reason) *out_disable_reason = "There are no need to merge parts according to merge selector algorithm"; return false; + } if (parts_to_merge.size() == 1) throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR); @@ -229,7 +236,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition( size_t available_disk_space, const AllowedMergingPredicate & can_merge, const String & partition_id, - bool final) + bool final, + String * out_disable_reason) { MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id); @@ -237,7 +245,10 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition( return false; if (!final && parts.size() == 1) + { + if (out_disable_reason) *out_disable_reason = "There is only one part inside partition"; return false; + } auto it = parts.begin(); auto prev_it = it; @@ -246,7 +257,7 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition( while (it != parts.end()) { /// For the case of one part, we check that it can be merged "with itself". - if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it)) + if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason)) { return false; } @@ -258,7 +269,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition( } /// Enough disk space to cover the new merge with a margin. - if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT) + auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT; + if (available_disk_space <= required_disk_space) { time_t now = time(nullptr); if (now - disk_space_warning_time > 3600) @@ -273,6 +285,10 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition( << " required now (+" << static_cast((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100) << "% on overhead); suppressing similar warnings for the next hour"); } + + if (out_disable_reason) *out_disable_reason = "Insufficient available disk space, required " + + formatReadableSizeWithDecimalSuffix(required_disk_space); + return false; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h index e2ad5dc23a4e27669c43c5fd8e6588163b6723a8..25269687d3fa276091aa97d6e6011680ad5e67a6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.h @@ -20,7 +20,7 @@ class MergeTreeDataMerger { public: using CancellationHook = std::function; - using AllowedMergingPredicate = std::function; + using AllowedMergingPredicate = std::function; struct FuturePart { @@ -65,7 +65,8 @@ public: FuturePart & future_part, bool aggressive, size_t max_total_size_to_merge, - const AllowedMergingPredicate & can_merge); + const AllowedMergingPredicate & can_merge, + String * out_disable_reason = nullptr); /** Select all the parts in the specified partition for merge, if possible. * final - choose to merge even a single part - that is, allow to merge one part "with itself". @@ -75,7 +76,8 @@ public: size_t available_disk_space, const AllowedMergingPredicate & can_merge, const String & partition_id, - bool final); + bool final, + String * out_disable_reason = nullptr); /** Merge the parts. * If `reservation != nullptr`, now and then reduces the size of the reserved space diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 18a3faa7b609a29507a524a9bd701448c51ec763..773f9d7ea02e1edcdc7b9e77783fd055c3f5a7fe 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -27,6 +27,7 @@ namespace ErrorCodes extern const int ABORTED; extern const int BAD_ARGUMENTS; extern const int INCORRECT_DATA; + extern const int CANNOT_ASSIGN_OPTIMIZE; } @@ -286,7 +287,8 @@ bool StorageMergeTree::merge( bool aggressive, const String & partition_id, bool final, - bool deduplicate) + bool deduplicate, + String * out_disable_reason) { /// Clear old parts. It does not matter to do it more frequently than each second. if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1)) @@ -307,7 +309,7 @@ bool StorageMergeTree::merge( { std::lock_guard lock(currently_merging_mutex); - auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) + auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *) { return !currently_merging.count(left) && !currently_merging.count(right); }; @@ -318,11 +320,11 @@ bool StorageMergeTree::merge( { size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(); if (max_parts_size_for_merge > 0) - selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge); + selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge, out_disable_reason); } else { - selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final); + selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } if (!selected) @@ -454,7 +456,16 @@ bool StorageMergeTree::optimize( String partition_id; if (partition) partition_id = data.getPartitionIDFromQuery(partition, context); - return merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate); + + String disable_reason; + if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason)) + { + if (context.getSettingsRef().optimize_throw_if_noop) + throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE); + return false; + } + + return true; } diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index ed8172781a89800c10b963fd34409eaf0c1768ee..d6654c22bec9cf648b6724bb20178174b89d1cc5 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -114,7 +114,8 @@ private: * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * Returns true if merge is finished successfully. */ - bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate); + bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate, + String * out_disable_reason = nullptr); bool mergeTask(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 6ae8e9cf340a3d1861d286efbeba61719ad8d9c7..e0b22df8854f8956a8d2e15c0aad81af15ebd280 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -95,6 +95,7 @@ namespace ErrorCodes extern const int TOO_MUCH_FETCHES; extern const int BAD_DATA_PART_NAME; extern const int PART_IS_TEMPORARILY_LOCKED; + extern const int CANNOT_ASSIGN_OPTIMIZE; } @@ -1582,7 +1583,7 @@ namespace bool canMergePartsAccordingToZooKeeperInfo( const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, - zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data) + zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data, String * out_reason = nullptr) { const String & partition_id = left->info.partition_id; @@ -1600,7 +1601,10 @@ namespace throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR); if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block) + { + if (out_reason) *out_reason = "Quorum status condition is unsatisfied"; return false; + } } /// Won't merge last_part even if quorum is satisfied, because we gonna check if replica has this part @@ -1614,7 +1618,10 @@ namespace throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR); if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block) + { + if (out_reason) *out_reason = "Quorum 'last part' condition is unsatisfied"; return false; + } } /// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks. @@ -1625,13 +1632,39 @@ namespace if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED && AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED) + { + if (out_reason) + *out_reason = "Block " + toString(number) + " in gap between merging parts " + left->name + " and " + + right->name + " is not abandoned"; return false; + } } return true; } + /// If any of the parts is already going to be merge into a larger one, do not agree to merge it. + bool partsWillNotBeMergedOrDisabled(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, + ReplicatedMergeTreeQueue & queue, String * out_reason = nullptr) + { + auto set_reason = [&out_reason] (const String & part_name) + { + if (out_reason) + *out_reason = "Part " + part_name + " cannot be merged yet, a merge has already assigned for it or it is temporarily disabled"; + return false; + }; + + if (queue.partWillBeMergedOrMergesDisabled(left->name)) + return set_reason(left->name); + + if (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)) + return set_reason(right->name); + + return true; + } + + /** It can take a long time to determine whether it is possible to merge two adjacent parts. * Two adjacent parts can be merged if all block numbers between their numbers are not used (abandoned). * This means that another part can not be inserted between these parts. @@ -1721,7 +1754,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread() LOG_DEBUG(log, "Merge selecting thread started"); bool deduplicate = false; /// TODO: read deduplicate option from table config - bool need_pull = true; auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) { @@ -1738,15 +1770,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread() /// Will be updated below. std::chrono::steady_clock::time_point now; - auto can_merge = [&] - (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) + auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *) { - /// If any of the parts is already going to be merge into a larger one, do not agree to merge it. - if (queue.partWillBeMergedOrMergesDisabled(left->name) - || (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name))) - return false; - - return cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); + return partsWillNotBeMergedOrDisabled(left, right, queue) + && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); }; while (!shutdown_called && is_leader_node) @@ -1755,20 +1782,16 @@ void StorageReplicatedMergeTree::mergeSelectingThread() try { - if (need_pull) - { - /// You need to load new entries into the queue before you select parts to merge. - /// (so we know which parts are already going to be merged). - pullLogsToQueue(); - need_pull = false; - } - std::lock_guard merge_selecting_lock(merge_selecting_mutex); - /** If many merges is already queued, then will queue only small enough merges. - * Otherwise merge queue could be filled with only large merges, - * and in the same time, many small parts could be created and won't be merged. - */ + /// You need to load new entries into the queue before you select parts to merge. + /// (so we know which parts are already going to be merged). + /// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges. + pullLogsToQueue(); + + /// If many merges is already queued, then will queue only small enough merges. + /// Otherwise merge queue could be filled with only large merges, + /// and in the same time, many small parts could be created and won't be merged. size_t merges_queued = queue.countMerges(); if (merges_queued >= data.settings.max_replicated_merges_in_queue) @@ -1786,14 +1809,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread() now = std::chrono::steady_clock::now(); if (max_parts_size_for_merge > 0 - && merger.selectPartsToMerge( - future_merged_part, false, - max_parts_size_for_merge, - can_merge) + && merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge) && createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate)) { success = true; - need_pull = true; } } } @@ -2345,42 +2364,51 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p return true; } - auto can_merge = [this] - (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) + auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason) { - return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data); + return partsWillNotBeMergedOrDisabled(left, right, queue, out_reason) + && canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason); }; - pullLogsToQueue(); - ReplicatedMergeTreeLogEntryData merge_entry; { std::lock_guard merge_selecting_lock(merge_selecting_mutex); + /// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges. + pullLogsToQueue(); + size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); MergeTreeDataMerger::FuturePart future_merged_part; + String disable_reason; bool selected = false; if (!partition) { selected = merger.selectPartsToMerge( - future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge); + future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); } else { String partition_id = data.getPartitionIDFromQuery(partition, context); - selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final); + selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason); } - if (!selected) + auto handle_noop = [&] (const String & message) { - LOG_INFO(log, "Cannot select parts for optimization"); + if (context.getSettingsRef().optimize_throw_if_noop) + throw Exception(message, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE); return false; + }; + + if (!selected) + { + LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason)); + return handle_noop(disable_reason); } if (!createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry)) - return false; + return handle_noop("Can't create merge queue node in ZooKeeper"); } waitForAllReplicasToProcessLogEntry(merge_entry);