提交 03bca214 编写于 作者: V Vitaliy Lyudvichenko

OPTIMIZE doesn't produce intersecting merges now, add optimize_throw_if_noop...

OPTIMIZE doesn't produce intersecting merges now, add optimize_throw_if_noop setting. [#CLICKHOUSE-3432] [#CLICKHOUSE-3420]

Resolves #1753
上级 7a865af3
......@@ -364,6 +364,7 @@ namespace ErrorCodes
extern const int MULTIPLE_STREAMS_REQUIRED = 385;
extern const int NO_COMMON_TYPE = 386;
extern const int EXTERNAL_LOADABLE_ALREADY_EXISTS = 387;
extern const int CANNOT_ASSIGN_OPTIMIZE = 388;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -305,6 +305,7 @@ struct Settings
M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \
M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \
M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \
M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown")
/// Possible limits for query execution.
......
......@@ -150,12 +150,16 @@ bool MergeTreeDataMerger::selectPartsToMerge(
FuturePart & future_part,
bool aggressive,
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback)
const AllowedMergingPredicate & can_merge_callback,
String * out_disable_reason)
{
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
if (data_parts.empty())
{
if (out_disable_reason) *out_disable_reason = "There are no parts in the table";
return false;
}
time_t current_time = time(nullptr);
......@@ -166,7 +170,7 @@ bool MergeTreeDataMerger::selectPartsToMerge(
for (const MergeTreeData::DataPartPtr & part : data_parts)
{
const String & partition_id = part->info.partition_id;
if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part)))
if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part, nullptr)))
{
if (partitions.empty() || !partitions.back().empty())
partitions.emplace_back();
......@@ -205,7 +209,10 @@ bool MergeTreeDataMerger::selectPartsToMerge(
max_total_size_to_merge);
if (parts_to_merge.empty())
{
if (out_disable_reason) *out_disable_reason = "There are no need to merge parts according to merge selector algorithm";
return false;
}
if (parts_to_merge.size() == 1)
throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
......@@ -229,7 +236,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
size_t available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final)
bool final,
String * out_disable_reason)
{
MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
......@@ -237,7 +245,10 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
return false;
if (!final && parts.size() == 1)
{
if (out_disable_reason) *out_disable_reason = "There is only one part inside partition";
return false;
}
auto it = parts.begin();
auto prev_it = it;
......@@ -246,7 +257,7 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
while (it != parts.end())
{
/// For the case of one part, we check that it can be merged "with itself".
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it))
if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
{
return false;
}
......@@ -258,7 +269,8 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
}
/// Enough disk space to cover the new merge with a margin.
if (available_disk_space <= sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT)
auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
if (available_disk_space <= required_disk_space)
{
time_t now = time(nullptr);
if (now - disk_space_warning_time > 3600)
......@@ -273,6 +285,10 @@ bool MergeTreeDataMerger::selectAllPartsToMergeWithinPartition(
<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< "% on overhead); suppressing similar warnings for the next hour");
}
if (out_disable_reason) *out_disable_reason = "Insufficient available disk space, required " +
formatReadableSizeWithDecimalSuffix(required_disk_space);
return false;
}
......
......@@ -20,7 +20,7 @@ class MergeTreeDataMerger
{
public:
using CancellationHook = std::function<void()>;
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)>;
using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &, String * reason)>;
struct FuturePart
{
......@@ -65,7 +65,8 @@ public:
FuturePart & future_part,
bool aggressive,
size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge);
const AllowedMergingPredicate & can_merge,
String * out_disable_reason = nullptr);
/** Select all the parts in the specified partition for merge, if possible.
* final - choose to merge even a single part - that is, allow to merge one part "with itself".
......@@ -75,7 +76,8 @@ public:
size_t available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final);
bool final,
String * out_disable_reason = nullptr);
/** Merge the parts.
* If `reservation != nullptr`, now and then reduces the size of the reserved space
......
......@@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DATA;
extern const int CANNOT_ASSIGN_OPTIMIZE;
}
......@@ -286,7 +287,8 @@ bool StorageMergeTree::merge(
bool aggressive,
const String & partition_id,
bool final,
bool deduplicate)
bool deduplicate,
String * out_disable_reason)
{
/// Clear old parts. It does not matter to do it more frequently than each second.
if (auto lock = time_after_previous_cleanup.lockTestAndRestartAfter(1))
......@@ -307,7 +309,7 @@ bool StorageMergeTree::merge(
{
std::lock_guard<std::mutex> lock(currently_merging_mutex);
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return !currently_merging.count(left) && !currently_merging.count(right);
};
......@@ -318,11 +320,11 @@ bool StorageMergeTree::merge(
{
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
if (max_parts_size_for_merge > 0)
selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge);
selected = merger.selectPartsToMerge(future_part, aggressive, max_parts_size_for_merge, can_merge, out_disable_reason);
}
else
{
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final);
selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
if (!selected)
......@@ -454,7 +456,16 @@ bool StorageMergeTree::optimize(
String partition_id;
if (partition)
partition_id = data.getPartitionIDFromQuery(partition, context);
return merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate);
String disable_reason;
if (!merge(context.getSettingsRef().min_bytes_to_use_direct_io, true, partition_id, final, deduplicate, &disable_reason))
{
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(disable_reason.empty() ? "Can't OPTIMIZE by some reason" : disable_reason, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
}
return true;
}
......
......@@ -114,7 +114,8 @@ private:
* If aggressive - when selecting parts, doesn't take into account their size ratio and novelty (used for OPTIMIZE query).
* Returns true if merge is finished successfully.
*/
bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate);
bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate,
String * out_disable_reason = nullptr);
bool mergeTask();
......
......@@ -95,6 +95,7 @@ namespace ErrorCodes
extern const int TOO_MUCH_FETCHES;
extern const int BAD_DATA_PART_NAME;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int CANNOT_ASSIGN_OPTIMIZE;
}
......@@ -1582,7 +1583,7 @@ namespace
bool canMergePartsAccordingToZooKeeperInfo(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data)
zkutil::ZooKeeperPtr && zookeeper, const String & zookeeper_path, const MergeTreeData & data, String * out_reason = nullptr)
{
const String & partition_id = left->info.partition_id;
......@@ -1600,7 +1601,10 @@ namespace
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block)
{
if (out_reason) *out_reason = "Quorum status condition is unsatisfied";
return false;
}
}
/// Won't merge last_part even if quorum is satisfied, because we are going to check whether the replica has this part
......@@ -1614,7 +1618,10 @@ namespace
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
if (left->info.max_block <= part_info.min_block && right->info.min_block >= part_info.max_block)
{
if (out_reason) *out_reason = "Quorum 'last part' condition is unsatisfied";
return false;
}
}
/// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks.
......@@ -1625,13 +1632,39 @@ namespace
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
if (out_reason)
*out_reason = "Block " + toString(number) + " in gap between merging parts " + left->name + " and "
+ right->name + " is not abandoned";
return false;
}
}
return true;
}
/// Returns true when neither `left` nor `right` is reported by the queue as "will be merged or merges disabled".
/// The queue is asked about `left` first; `right` is asked only if `left` passed and the two pointers refer to
/// different part objects (so a single part checked "with itself" is queried once).
/// On a negative answer, `*out_reason` (if provided) receives a message naming the offending part.
bool partsWillNotBeMergedOrDisabled(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
ReplicatedMergeTreeQueue & queue, String * out_reason = nullptr)
{
    const bool left_is_blocked = queue.partWillBeMergedOrMergesDisabled(left->name);

    /// Short-circuiting keeps the second queue lookup from happening when it is not needed.
    const bool right_is_blocked = !left_is_blocked
        && left.get() != right.get()
        && queue.partWillBeMergedOrMergesDisabled(right->name);

    if (!left_is_blocked && !right_is_blocked)
        return true;

    if (out_reason)
    {
        const String & blocked_part_name = left_is_blocked ? left->name : right->name;
        *out_reason = "Part " + blocked_part_name + " cannot be merged yet, a merge has already assigned for it or it is temporarily disabled";
    }

    return false;
}
/** It can take a long time to determine whether it is possible to merge two adjacent parts.
* Two adjacent parts can be merged if all block numbers between their numbers are not used (abandoned).
* This means that another part can not be inserted between these parts.
......@@ -1721,7 +1754,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
LOG_DEBUG(log, "Merge selecting thread started");
bool deduplicate = false; /// TODO: read deduplicate option from table config
bool need_pull = true;
auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
......@@ -1738,15 +1770,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Will be updated below.
std::chrono::steady_clock::time_point now;
auto can_merge = [&]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
/// If any of the parts is already going to be merge into a larger one, do not agree to merge it.
if (queue.partWillBeMergedOrMergesDisabled(left->name)
|| (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
return false;
return cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
return partsWillNotBeMergedOrDisabled(left, right, queue)
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
while (!shutdown_called && is_leader_node)
......@@ -1755,20 +1782,16 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
try
{
if (need_pull)
{
/// You need to load new entries into the queue before you select parts to merge.
/// (so we know which parts are already going to be merged).
pullLogsToQueue();
need_pull = false;
}
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
/** If many merges is already queued, then will queue only small enough merges.
* Otherwise merge queue could be filled with only large merges,
* and in the same time, many small parts could be created and won't be merged.
*/
/// You need to load new entries into the queue before you select parts to merge.
/// (so we know which parts are already going to be merged).
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
pullLogsToQueue();
/// If many merges are already queued, then only small enough merges will be queued.
/// Otherwise the merge queue could be filled with only large merges,
/// and at the same time many small parts could be created and would not be merged.
size_t merges_queued = queue.countMerges();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
......@@ -1786,14 +1809,10 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
now = std::chrono::steady_clock::now();
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(
future_merged_part, false,
max_parts_size_for_merge,
can_merge)
&& merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge)
&& createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate))
{
success = true;
need_pull = true;
}
}
}
......@@ -2345,42 +2364,51 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
return true;
}
auto can_merge = [this]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason)
{
return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
return partsWillNotBeMergedOrDisabled(left, right, queue, out_reason)
&& canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason);
};
pullLogsToQueue();
ReplicatedMergeTreeLogEntryData merge_entry;
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
pullLogsToQueue();
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
MergeTreeDataMerger::FuturePart future_merged_part;
String disable_reason;
bool selected = false;
if (!partition)
{
selected = merger.selectPartsToMerge(
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
else
{
String partition_id = data.getPartitionIDFromQuery(partition, context);
selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final);
selected = merger.selectAllPartsToMergeWithinPartition(future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
}
if (!selected)
auto handle_noop = [&] (const String & message)
{
LOG_INFO(log, "Cannot select parts for optimization");
if (context.getSettingsRef().optimize_throw_if_noop)
throw Exception(message, ErrorCodes::CANNOT_ASSIGN_OPTIMIZE);
return false;
};
if (!selected)
{
LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason));
return handle_noop(disable_reason);
}
if (!createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return false;
return handle_noop("Can't create merge queue node in ZooKeeper");
}
waitForAllReplicasToProcessLogEntry(merge_entry);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册