Commit fdb33d8f authored by Alexey Zatelepin

execute mutation log entries [#CLICKHOUSE-3747]

Parent fd81cc7f
......@@ -893,6 +893,34 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
}
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const
{
std::lock_guard lock(target_state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
throw Exception("There are no mutations for partition ID " + part->info.partition_id
+ " (trying to mutate part " + part->name + "to " + toString(desired_mutation_version) + ")",
ErrorCodes::LOGICAL_ERROR);
auto begin = in_partition->second.upper_bound(part->info.getDataVersion());
auto end = in_partition->second.find(desired_mutation_version);
if (end == in_partition->second.end())
throw Exception("Mutation with version " + toString(desired_mutation_version)
+ " not found in partition ID " + part->info.partition_id
+ " (trying to mutate part " + part->name + ")",
ErrorCodes::LOGICAL_ERROR);
++end;
std::vector<MutationCommand> commands;
for (auto it = begin; it != end; ++it)
commands.insert(commands.end(), it->second->commands.commands.begin(), it->second->commands.commands.end());
return MutationCommands{commands};
}
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
{
std::lock_guard lock(target_state_mutex);
......
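The getMutationCommands selection above takes every mutation with a version in the range (data version of the part, desired_mutation_version]. A minimal standalone sketch of that range logic, assuming mutations live in a std::map ordered by version (all names here are illustrative, not the real queue types):

#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

struct FakeMutation { std::vector<std::string> commands; };  // stand-in for real mutation commands

// Collect commands from all mutations the part has not applied yet,
// up to and including desired_version.
std::vector<std::string> commandsToApply(
    const std::map<int64_t, FakeMutation> & mutations_by_version,
    int64_t part_data_version,
    int64_t desired_version)
{
    // First mutation strictly newer than the data the part was created with.
    auto begin = mutations_by_version.upper_bound(part_data_version);
    // The desired mutation must be known; otherwise it is a logic error.
    auto end = mutations_by_version.find(desired_version);
    if (end == mutations_by_version.end())
        throw std::logic_error("mutation not found");
    ++end;  // make the range inclusive of desired_version

    std::vector<std::string> result;
    for (auto it = begin; it != end; ++it)
        result.insert(result.end(), it->second.commands.begin(), it->second.commands.end());
    return result;
}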
......@@ -225,6 +225,8 @@ public:
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
/// Prohibit merges in the specified range.
void disableMergesInRange(const String & part_name);
......
......@@ -1037,7 +1037,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::MUTATE_PART)
{
/// If we already have this part or a part covering it, we do not need to do anything.
/// The part may still be in the PreCommitted -> Committed transition, so we first search
......@@ -1074,7 +1075,11 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
tryExecuteMerge(entry, do_fetch);
do_fetch = !tryExecuteMerge(entry);
}
else if (entry.type == LogEntry::MUTATE_PART)
{
do_fetch = !tryExecutePartMutation(entry);
}
else
{
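With this hunk, each executor reports success through its return value, and executeLogEntry derives do_fetch from it instead of threading a bool out-parameter through every branch. A compact sketch of the resulting dispatch contract (placeholder types, not the real ClickHouse ones):

#include <functional>

struct FakeLogEntry { enum Type { GET_PART, MERGE_PARTS, MUTATE_PART } type; };

// Returns true if a fetch from another replica is still needed.
bool needsFetch(const FakeLogEntry & entry,
                const std::function<bool(const FakeLogEntry &)> & try_merge,
                const std::function<bool(const FakeLogEntry &)> & try_mutate)
{
    switch (entry.type)
    {
        case FakeLogEntry::GET_PART:
            return true;                  // GET_PART is always satisfied by a fetch
        case FakeLogEntry::MERGE_PARTS:
            return !try_merge(entry);     // false => could not merge locally
        case FakeLogEntry::MUTATE_PART:
            return !try_mutate(entry);    // false => could not mutate locally
    }
    return false;
}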
......@@ -1088,12 +1093,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry, bool & do_fetch)
bool StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry)
{
/// The caller has already decided to make the fetch
if (do_fetch)
return;
// Log source part names just in case
{
std::stringstream log_message;
......@@ -1128,8 +1129,8 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
if (!have_all_parts)
{
/// If you do not have all the necessary parts, try to take some already merged part from someone.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
return false;
}
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
......@@ -1145,15 +1146,12 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
{
do_fetch = true;
LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
return false;
}
}
}
if (do_fetch)
return;
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
......@@ -1164,7 +1162,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts);
MergeTreeData::Transaction transaction;
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
MergeTreeDataMergerMutator::FuturePart future_merged_part(parts);
......@@ -1174,11 +1171,11 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
+ entry.new_part_name + "`", ErrorCodes::BAD_DATA_PART_NAME);
}
/// Logging
Stopwatch stopwatch;
ExecutionStatus execution_status;
MergeTreeData::Transaction transaction;
MergeTreeData::MutableDataPartPtr part;
/// Logging
Stopwatch stopwatch;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
try
......@@ -1237,7 +1234,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
{
if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
{
do_fetch = true;
transaction.rollback();
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
......@@ -1256,13 +1252,14 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
"We will download merged part from replica to force byte-identical result.");
write_part_log(ExecutionStatus::fromCurrentException());
return;
return false;
}
throw;
}
/** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
/** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
......@@ -1272,6 +1269,128 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
return true;
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
}
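tryExecuteMerge above and tryExecutePartMutation below share the same commit-time fallback: when the checksums of the locally built part disagree with the committed ones, the local transaction is rolled back and false is returned so the caller fetches the part instead. A schematic of that pattern (placeholder types; checkAndCommit stands in for checkPartChecksumsAndCommit):

#include <stdexcept>

struct BadChecksumsError : std::runtime_error { using std::runtime_error::runtime_error; };

struct FakeTransaction
{
    void rollback() { /* undo the local rename of the temporary part */ }
};

template <typename CheckAndCommit>
bool commitOrFallBackToFetch(FakeTransaction & transaction, CheckAndCommit && check_and_commit)
{
    try
    {
        check_and_commit();   // verifies checksums against the committed state, then commits
        return true;          // locally built part accepted
    }
    catch (const BadChecksumsError &)
    {
        // The local result differs from other replicas: discard it and ask the
        // caller to download the byte-identical part instead.
        transaction.rollback();
        return false;
    }
}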
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
{
const String & source_part_name = entry.source_parts.at(0);
LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name);
MergeTreeData::DataPartPtr source_part = data.getActiveContainingPart(source_part_name);
if (!source_part)
{
LOG_DEBUG(log, "Source part for " << entry.new_part_name << " is not ready; will try to fetch it instead");
return false;
}
if (source_part->name != source_part_name)
{
throw Exception("Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
}
/// TODO - some better heuristic?
size_t estimated_space_for_result = source_part->bytes_on_disk;
if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
&& estimated_space_for_result >= data.settings.prefer_fetch_merged_part_size_threshold)
{
/// If the entry is old enough and the result is big enough, and some replica has the desired part,
/// then prefer fetching from that replica.
String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
{
LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica);
return false;
}
}
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
entry.new_part_name, data.format_version);
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result);
auto table_lock = lockStructure(false, __PRETTY_FUNCTION__);
MergeTreeData::MutableDataPartPtr new_part;
MergeTreeData::Transaction transaction;
/// Logging
Stopwatch stopwatch;
auto write_part_log = [&] (const ExecutionStatus & /* execution_status */)
{
try
{
/// TODO
return;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
};
try
{
MergeTreeDataMergerMutator::FuturePart future_mutated_part;
future_mutated_part.parts.push_back(source_part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = entry.new_part_name;
new_part = merger_mutator.mutatePartToTemporaryPart(future_mutated_part, commands.commands, context);
if (new_part->rows_count)
data.renameTempPartAndReplace(new_part, nullptr, &transaction);
else
throw Exception("The part was fully deleted, this case is not implemented yet",
ErrorCodes::NOT_IMPLEMENTED); /// TODO
try
{
checkPartChecksumsAndCommit(transaction, new_part);
}
catch (const Exception & e)
{
if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
{
transaction.rollback();
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
LOG_ERROR(log, getCurrentExceptionMessage(false) << ". "
"Data after mutation is not byte-identical to data on other replicas. "
"We will download the mutated part from a replica to force a byte-identical result.");
write_part_log(ExecutionStatus::fromCurrentException());
return false;
}
throw;
}
/// TODO immediately delete the old part so that it doesn't waste space.
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the entry will remain in the queue, and we will try again.
*/
merge_selecting_event.set();
/// TODO metrics (ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);)
write_part_log({});
return true;
}
catch (...)
{
......
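The fetch-preference check in tryExecutePartMutation mirrors the one in tryExecuteMerge: if the entry has been queued long enough and the expected result is large, downloading a ready-made part from a replica is assumed to be cheaper than recomputing it locally. A standalone sketch of that decision, with the two settings passed as plain parameters (hypothetical signature):

#include <cstdint>
#include <ctime>

// Returns true when fetching the finished part from a replica looks cheaper
// than executing the mutation locally (illustrative heuristic only).
bool preferFetchOverLocalMutation(
    std::time_t entry_create_time,
    std::time_t prefer_fetch_time_threshold_sec,   // e.g. prefer_fetch_merged_part_time_threshold
    uint64_t estimated_result_bytes,               // here: bytes_on_disk of the source part
    uint64_t prefer_fetch_size_threshold_bytes,    // e.g. prefer_fetch_merged_part_size_threshold
    bool some_replica_has_result)
{
    const bool old_enough = entry_create_time + prefer_fetch_time_threshold_sec <= std::time(nullptr);
    const bool big_enough = estimated_result_bytes >= prefer_fetch_size_threshold_bytes;
    return old_enough && big_enough && some_replica_has_result;
}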
......@@ -351,7 +351,9 @@ private:
void executeDropRange(const LogEntry & entry);
/// Do the merge, or recommend making the fetch instead of the merge.
void tryExecuteMerge(const LogEntry & entry, bool & do_fetch);
bool tryExecuteMerge(const LogEntry & entry);
bool tryExecutePartMutation(const LogEntry & entry);
bool executeFetch(const LogEntry & entry);
......
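For context on new_part_info.mutation used above: a mutated part carries its target mutation version as an extra numeric suffix on the part name; the real parsing lives in MergeTreePartInfo::fromPartName. A simplified parser, assuming the name layout <partition_id>_<min_block>_<max_block>_<level>[_<mutation>] (this format is an assumption of the sketch):

#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

struct SimplePartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;
    int64_t mutation = 0;  // 0 = no mutations applied to this part
};

// Assumed name layout: <partition_id>_<min_block>_<max_block>_<level>[_<mutation>]
SimplePartInfo parsePartName(const std::string & name)
{
    std::vector<std::string> tokens;
    std::istringstream ss(name);
    for (std::string tok; std::getline(ss, tok, '_');)
        tokens.push_back(tok);

    SimplePartInfo info;
    info.partition_id = tokens.at(0);
    info.min_block = std::stoll(tokens.at(1));
    info.max_block = std::stoll(tokens.at(2));
    info.level = static_cast<uint32_t>(std::stoul(tokens.at(3)));
    if (tokens.size() > 4)
        info.mutation = std::stoll(tokens.at(4));
    return info;
}

// parsePartName("all_1_1_0_5").mutation == 5:
// the part "all_1_1_0" with all mutations up to version 5 applied.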