提交 5bc6bd55 编写于 作者: A Alexey Zatelepin

allow PreCommitted parts to contain each other, clean up code

上级 958a6f09
...@@ -108,8 +108,8 @@ MergeTreeData::MergeTreeData( ...@@ -108,8 +108,8 @@ MergeTreeData::MergeTreeData(
full_path(full_path_), full_path(full_path_),
broken_part_callback(broken_part_callback_), broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")), log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
data_parts_by_name(data_parts_indexes.get<TagByName>()), data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
data_parts_by_state_and_name(data_parts_indexes.get<TagByStateAndName>()) data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{ {
merging_params.check(columns); merging_params.check(columns);
...@@ -418,7 +418,7 @@ Int64 MergeTreeData::getMaxDataPartIndex() ...@@ -418,7 +418,7 @@ Int64 MergeTreeData::getMaxDataPartIndex()
std::lock_guard<std::mutex> lock_all(data_parts_mutex); std::lock_guard<std::mutex> lock_all(data_parts_mutex);
Int64 max_block_id = 0; Int64 max_block_id = 0;
for (const DataPartPtr & part : data_parts_by_name) for (const DataPartPtr & part : data_parts_by_info)
max_block_id = std::max(max_block_id, part->info.max_block); max_block_id = std::max(max_block_id, part->info.max_block);
return max_block_id; return max_block_id;
...@@ -552,11 +552,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) ...@@ -552,11 +552,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (data_parts_indexes.size() >= 2) if (data_parts_indexes.size() >= 2)
{ {
/// Now all parts are committed, so data_parts_by_state_and_name == committed_parts_range /// Now all parts are committed, so data_parts_by_state_and_info == committed_parts_range
auto prev_jt = data_parts_by_state_and_name.begin(); auto prev_jt = data_parts_by_state_and_info.begin();
auto curr_jt = std::next(prev_jt); auto curr_jt = std::next(prev_jt);
auto deactivate_part = [&] (DataPartIteratorByStateAndName it) auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{ {
(*it)->remove_time = (*it)->modification_time; (*it)->remove_time = (*it)->modification_time;
modifyPartState(it, DataPartState::Outdated); modifyPartState(it, DataPartState::Outdated);
...@@ -564,7 +564,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) ...@@ -564,7 +564,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
(*prev_jt)->assertState({DataPartState::Committed}); (*prev_jt)->assertState({DataPartState::Committed});
while (curr_jt != data_parts_by_state_and_name.end() && (*curr_jt)->state == DataPartState::Committed) while (curr_jt != data_parts_by_state_and_info.end() && (*curr_jt)->state == DataPartState::Committed)
{ {
/// Don't consider data parts belonging to different partitions. /// Don't consider data parts belonging to different partitions.
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id) if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
...@@ -664,7 +664,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() ...@@ -664,7 +664,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
return res; return res;
time_t now = time(nullptr); time_t now = time(nullptr);
std::vector<DataPartIteratorByStateAndName> parts_to_delete; std::vector<DataPartIteratorByStateAndInfo> parts_to_delete;
{ {
std::lock_guard<std::mutex> lock_parts(data_parts_mutex); std::lock_guard<std::mutex> lock_parts(data_parts_mutex);
...@@ -674,7 +674,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() ...@@ -674,7 +674,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{ {
const DataPartPtr & part = *it; const DataPartPtr & part = *it;
if (part.unique() && /// Grab only parts that is not using by anyone (SELECTs for example) if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
part->remove_time < now && part->remove_time < now &&
now - part->remove_time > settings.old_parts_lifetime.totalSeconds()) now - part->remove_time > settings.old_parts_lifetime.totalSeconds())
{ {
...@@ -716,9 +716,9 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa ...@@ -716,9 +716,9 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
/// TODO: use data_parts iterators instead of pointers /// TODO: use data_parts iterators instead of pointers
for (auto & part : parts) for (auto & part : parts)
{ {
auto it = data_parts_by_name.find(part->info); auto it = data_parts_by_info.find(part->info);
if (it == data_parts_by_name.end()) if (it == data_parts_by_info.end())
throw Exception("Deleting data part " + part->name + " is not exist", ErrorCodes::LOGICAL_ERROR); throw Exception("Deleting data part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
(*it)->assertState({DataPartState::Deleting}); (*it)->assertState({DataPartState::Deleting});
...@@ -1362,6 +1362,58 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction() ...@@ -1362,6 +1362,58 @@ MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
} }
/// Collects the Committed parts whose info is contained in new_part_info.
/// The scan starts at the position where new_part_info would be inserted among the Committed parts
/// and extends to the left and to the right while the neighbouring parts are contained in new_part_info.
///
/// Returns the collected parts in index order.
/// If a scan stops at a Committed part that itself contains new_part_info, that part is stored
/// in out_covering_part and an empty vector is returned. out_covering_part is left untouched otherwise.
/// Throws LOGICAL_ERROR if a Committed part with info equal to new_part_info is met during the right scan.
///
/// NOTE(review): the lock argument is unused here; presumably it only documents that the caller
/// holds data_parts_mutex - confirm against the callers.
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
    const MergeTreePartInfo & new_part_info,
    DataPartPtr & out_covering_part,
    std::lock_guard<std::mutex> & /* data_parts_lock */) const
{
    const auto committed = getDataPartsStateRange(DataPartState::Committed);

    /// The contained parts are consecutive in the index and surround the insertion place of the new part.
    const auto insertion_point = data_parts_by_state_and_info.lower_bound(
        DataPartStateAndInfo(DataPartState::Committed, new_part_info));

    /// Extend the range to the left while the previous part is contained in the new one.
    DataPartIteratorByStateAndInfo first_covered = insertion_point;
    for (; first_covered != committed.begin(); --first_covered)
    {
        const auto left_neighbour = std::prev(first_covered);
        const auto & neighbour_info = (*left_neighbour)->info;

        if (new_part_info.contains(neighbour_info))
            continue;

        /// The neighbour is not contained: either it covers the new part, or the scan simply ends here.
        if (neighbour_info.contains(new_part_info))
        {
            out_covering_part = *left_neighbour;
            return {};
        }
        break;
    }

    /// Extend the range to the right while the current part is contained in the new one.
    DataPartIteratorByStateAndInfo past_last_covered = insertion_point;
    for (; past_last_covered != committed.end(); ++past_last_covered)
    {
        const auto & candidate_info = (*past_last_covered)->info;

        if (candidate_info == new_part_info)
            throw Exception("Unexpected duplicate part " + (*past_last_covered)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        if (new_part_info.contains(candidate_info))
            continue;

        /// The candidate is not contained: either it covers the new part, or the scan simply ends here.
        if (candidate_info.contains(new_part_info))
        {
            out_covering_part = *past_last_covered;
            return {};
        }
        break;
    }

    return DataPartsVector(first_covered, past_last_covered);
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{ {
auto removed = renameTempPartAndReplace(part, increment, out_transaction); auto removed = renameTempPartAndReplace(part, increment, out_transaction);
...@@ -1375,19 +1427,17 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem ...@@ -1375,19 +1427,17 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction) MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{ {
if (out_transaction && out_transaction->data) if (out_transaction && out_transaction->data && out_transaction->data != this)
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR); throw Exception("The same MergeTreeData::Transaction cannot be used for different tables",
ErrorCodes::LOGICAL_ERROR);
std::lock_guard<std::mutex> lock(data_parts_mutex);
part->assertState({DataPartState::Temporary}); part->assertState({DataPartState::Temporary});
MergeTreePartInfo part_info = part->info; MergeTreePartInfo part_info = part->info;
String part_name; String part_name;
DataPartsVector replaced_parts;
std::vector<DataPartIteratorByStateAndName> replaced_iterators;
{
std::unique_lock<std::mutex> lock(data_parts_mutex);
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock)) if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{ {
if (part->partition.value != existing_part_in_partition->partition.value) if (part->partition.value != existing_part_in_partition->partition.value)
...@@ -1410,149 +1460,58 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( ...@@ -1410,149 +1460,58 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << "."); LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << part_name << ".");
auto it_duplicate = data_parts_by_name.find(part_info); auto it_duplicate = data_parts_by_info.find(part_info);
if (it_duplicate != data_parts_by_name.end()) if (it_duplicate != data_parts_by_info.end())
{ {
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists"; String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting})) if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
{
throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
}
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART); throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
} }
/// Check that part is not covered and doesn't cover other in-progress parts, it makes sense only for Replicated* engines
if (out_transaction)
{
auto check_coverage = [&part_info, &part_name] (const DataPartPtr & part)
{
if (part_info.contains(part->info))
throw Exception("Cannot add part " + part_name + " covering pre-committed part " + part->name, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
else if (part->info.contains(part_info))
throw Exception("Cannot add part " + part_name + " covered by pre-committed part " + part->name, ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
};
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::PreCommitted, part_info));
auto precommitted_parts_range = getDataPartsStateRange(DataPartState::PreCommitted);
for (auto it = it_middle; it != precommitted_parts_range.begin();)
{
--it;
check_coverage(*it);
}
for (auto it = it_middle; it != precommitted_parts_range.end();)
{
check_coverage(*it);
++it;
}
}
/// Is the part covered by some other part?
DataPartPtr covering_part; DataPartPtr covering_part;
DataPartsVector covered_parts = getActivePartsToReplace(part_info, covering_part, lock);
auto it_middle = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
/// Go to the left.
for (auto it = it_middle; it != committed_parts_range.begin();)
{
--it;
if (!part_info.contains((*it)->info))
{
if ((*it)->info.contains(part_info))
covering_part = *it;
break;
}
replaced_iterators.push_back(it);
}
/// Parts must be in ascending order.
std::reverse(replaced_iterators.begin(), replaced_iterators.end());
/// Go to the right.
for (auto it = it_middle; it != committed_parts_range.end();)
{
if ((*it)->name == part_name)
throw Exception("Unexpected duplicate part " + (*it)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);
if (!part_info.contains((*it)->info))
{
if ((*it)->info.contains(part_info))
covering_part = *it;
break;
}
replaced_iterators.push_back(it);
++it;
}
if (covering_part) if (covering_part)
{ {
LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState()); LOG_WARNING(log, "Tried to add obsolete part " << part_name << " covered by " << covering_part->getNameWithState());
/// It is a temporary part, we want to delete it from filesystem immediately
/// Other fields remain the same
part->remove_time = time(nullptr);
part->is_temp = true;
/// Nothing to commit or rollback
if (out_transaction)
{
out_transaction->data = this;
out_transaction->parts_to_add_on_rollback = {};
out_transaction->parts_to_remove_on_rollback = {};
}
/// We replaced nothing
return {}; return {};
} }
/// All checks are passed. Now we can rename the part on disk. /// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts /// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
/// ///
/// Ordinary MergeTree engines (they don't use out_transaction) commit parts immediately, /// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.
/// whereas ReplicatedMergeTree uses intermediate PreCommitted state
part->name = part_name; part->name = part_name;
part->info = part_info; part->info = part_info;
part->is_temp = false; part->is_temp = false;
part->state = (out_transaction) ? DataPartState::PreCommitted : DataPartState::Committed; part->state = DataPartState::PreCommitted;
part->renameTo(part_name); part->renameTo(part_name);
data_parts_indexes.insert(part); auto part_it = data_parts_indexes.insert(part).first;
replaced_parts.reserve(replaced_iterators.size());
for (auto it_replacing_part : replaced_iterators)
replaced_parts.emplace_back(*it_replacing_part);
if (!out_transaction)
{
addPartContributionToColumnSizes(part);
auto current_time = time(nullptr); if (out_transaction)
for (auto it_replacing_part : replaced_iterators)
{ {
(*it_replacing_part)->remove_time = current_time; out_transaction->data = this;
modifyPartState(it_replacing_part, DataPartState::Outdated); out_transaction->precommitted_parts.insert(part);
removePartContributionToColumnSizes(*it_replacing_part);
}
} }
else else
{ {
out_transaction->data = this; auto current_time = time(nullptr);
out_transaction->parts_to_add_on_rollback = replaced_parts; for (const DataPartPtr & covered_part : covered_parts)
out_transaction->parts_to_remove_on_rollback = {part}; {
covered_part->remove_time = current_time;
modifyPartState(covered_part, DataPartState::Outdated);
removePartContributionToColumnSizes(covered_part);
} }
modifyPartState(part_it, DataPartState::Committed);
addPartContributionToColumnSizes(part);
} }
return replaced_parts; return covered_parts;
} }
void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout) void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bool clear_without_timeout)
...@@ -1561,7 +1520,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo ...@@ -1561,7 +1520,7 @@ void MergeTreeData::removePartsFromWorkingSet(const DataPartsVector & remove, bo
for (auto & part : remove) for (auto & part : remove)
{ {
if (!data_parts_by_name.count(part->info)) if (!data_parts_by_info.count(part->info))
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR); throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated}); part->assertState({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
...@@ -1586,8 +1545,8 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons ...@@ -1586,8 +1545,8 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
std::lock_guard<std::mutex> lock(data_parts_mutex); std::lock_guard<std::mutex> lock(data_parts_mutex);
auto it_part = data_parts_by_name.find(part_to_detach->info); auto it_part = data_parts_by_info.find(part_to_detach->info);
if (it_part == data_parts_by_name.end()) if (it_part == data_parts_by_info.end())
throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART); throw Exception("No such data part " + part_to_detach->getNameWithState(), ErrorCodes::NO_SUCH_DATA_PART);
/// What if part_to_detach is reference to *it_part? Make a new owner just in case. /// What if part_to_detach is reference to *it_part? Make a new owner just in case.
...@@ -1619,16 +1578,16 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons ...@@ -1619,16 +1578,16 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
return state == DataPartState::Committed || state == DataPartState::Outdated; return state == DataPartState::Committed || state == DataPartState::Outdated;
}; };
auto update_error = [&] (DataPartIteratorByAndName it) auto update_error = [&] (DataPartIteratorByInfo it)
{ {
error = true; error = true;
error_parts += (*it)->getNameWithState() + " "; error_parts += (*it)->getNameWithState() + " ";
}; };
auto it_middle = data_parts_by_name.lower_bound(part->info); auto it_middle = data_parts_by_info.lower_bound(part->info);
/// Restore the leftmost part covered by the part /// Restore the leftmost part covered by the part
if (it_middle != data_parts_by_name.begin()) if (it_middle != data_parts_by_info.begin())
{ {
auto it = std::prev(it_middle); auto it = std::prev(it_middle);
...@@ -1654,7 +1613,7 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons ...@@ -1654,7 +1613,7 @@ void MergeTreeData::renameAndDetachPart(const DataPartPtr & part_to_detach, cons
error = true; error = true;
/// Restore "right" parts /// Restore "right" parts
for (auto it = it_middle; it != data_parts_by_name.end() && part->contains(**it); ++it) for (auto it = it_middle; it != data_parts_by_info.end() && part->contains(**it); ++it)
{ {
if ((*it)->info.min_block < pos) if ((*it)->info.min_block < pos)
continue; continue;
...@@ -1776,7 +1735,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & ...@@ -1776,7 +1735,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed); auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);
/// The part can be covered only by the previous or the next one in data_parts. /// The part can be covered only by the previous or the next one in data_parts.
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info)); auto it = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));
if (it != committed_parts_range.end()) if (it != committed_parts_range.end())
{ {
...@@ -1803,8 +1762,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na ...@@ -1803,8 +1762,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
std::lock_guard<std::mutex> lock(data_parts_mutex); std::lock_guard<std::mutex> lock(data_parts_mutex);
auto it = data_parts_by_name.find(part_info); auto it = data_parts_by_info.find(part_info);
if (it == data_parts_by_name.end()) if (it == data_parts_by_info.end())
return nullptr; return nullptr;
for (auto state : valid_states) for (auto state : valid_states)
...@@ -2067,7 +2026,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context ...@@ -2067,7 +2026,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
String partition_id = partition.getID(*this); String partition_id = partition.getID(*this);
{ {
std::unique_lock<std::mutex> data_parts_lock(data_parts_mutex); std::lock_guard<std::mutex> data_parts_lock(data_parts_mutex);
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock); DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value) if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{ {
...@@ -2115,7 +2074,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat ...@@ -2115,7 +2074,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
DataPartsVector res; DataPartsVector res;
{ {
std::lock_guard<std::mutex> lock(data_parts_mutex); std::lock_guard<std::mutex> lock(data_parts_mutex);
res.assign(data_parts_by_name.begin(), data_parts_by_name.end()); res.assign(data_parts_by_info.begin(), data_parts_by_info.end());
if (out_states != nullptr) if (out_states != nullptr)
{ {
...@@ -2153,14 +2112,14 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const ...@@ -2153,14 +2112,14 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
} }
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition( MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
const String & partition_id, std::unique_lock<std::mutex> & /*data_parts_lock*/) const String & partition_id, std::lock_guard<std::mutex> & /*data_parts_lock*/)
{ {
auto min_block = std::numeric_limits<Int64>::min(); auto min_block = std::numeric_limits<Int64>::min();
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0); MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
auto it = data_parts_by_state_and_name.lower_bound(DataPartStateAndInfo(DataPartState::Committed, dummy_part_info)); auto it = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo(DataPartState::Committed, dummy_part_info));
if (it != data_parts_by_state_and_name.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id) if (it != data_parts_by_state_and_info.end() && (*it)->state == DataPartState::Committed && (*it)->info.partition_id == partition_id)
return *it; return *it;
return nullptr; return nullptr;
...@@ -2171,79 +2130,60 @@ void MergeTreeData::Transaction::rollback() ...@@ -2171,79 +2130,60 @@ void MergeTreeData::Transaction::rollback()
if (!isEmpty()) if (!isEmpty())
{ {
std::stringstream ss; std::stringstream ss;
if (!parts_to_remove_on_rollback.empty())
{
ss << " Removing parts:"; ss << " Removing parts:";
for (const auto & part : parts_to_remove_on_rollback) for (const auto & part : precommitted_parts)
ss << " " << part->relative_path;
ss << ".";
}
if (!parts_to_add_on_rollback.empty())
{
ss << " Adding parts: ";
for (const auto & part : parts_to_add_on_rollback)
ss << " " << part->relative_path; ss << " " << part->relative_path;
ss << "."; ss << ".";
}
LOG_DEBUG(data->log, "Undoing transaction." << ss.str()); LOG_DEBUG(data->log, "Undoing transaction." << ss.str());
/// PreCommitted -> Outdated data->removePartsFromWorkingSet(
replaceParts(DataPartState::Outdated, DataPartState::Committed, true); DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
/* clear_without_timeout = */ true);
} }
clear(); clear();
} }
void MergeTreeData::Transaction::commit() MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
{ {
DataPartsVector total_covered_parts;
if (!isEmpty()) if (!isEmpty())
{ {
/// PreCommitted -> Committed, Committed -> Outdated std::lock_guard<std::mutex> data_parts_lock(data->data_parts_mutex);
replaceParts(DataPartState::Committed, DataPartState::Outdated, false);
}
clear();
}
void MergeTreeData::Transaction::replaceParts(MergeTreeData::DataPartState move_precommitted_to,
MergeTreeData::DataPartState move_committed_to, bool remove_without_delay)
{
auto & committed_parts = parts_to_add_on_rollback;
auto & precommitted_parts = parts_to_remove_on_rollback;
/// TODO: also make sense to activate CleanupThread's cv
auto remove_time = (remove_without_delay) ? 0 : time(nullptr);
auto current_time = time(nullptr);
for (const DataPartPtr & part : precommitted_parts)
{ {
std::lock_guard<std::mutex> lock(data->data_parts_mutex); DataPartPtr covering_part;
DataPartsVector covered_parts = data->getActivePartsToReplace(part->info, covering_part, data_parts_lock);
for (auto & part : committed_parts) if (covering_part)
part->assertState({DataPartState::Committed}); {
for (auto & part : precommitted_parts) LOG_WARNING(data->log, "Tried to commit obsolete part " << part->name
part->assertState({DataPartState::PreCommitted}); << " covered by " << covering_part->getNameWithState());
/// If it is rollback then do nothing, else make it Outdated and remove their size contribution part->remove_time = 0; /// The part will be removed without waiting for old_parts_lifetime seconds.
if (move_committed_to != DataPartState::Committed) data->modifyPartState(part, DataPartState::Outdated);
}
else
{ {
for (const DataPartPtr & part : committed_parts) total_covered_parts.insert(total_covered_parts.end(), covered_parts.begin(), covered_parts.end());
for (const DataPartPtr & covered_part : covered_parts)
{ {
data->modifyPartState(part, move_committed_to); covered_part->remove_time = current_time;
part->remove_time = remove_time; data->modifyPartState(covered_part, DataPartState::Outdated);
data->removePartContributionToColumnSizes(part); data->removePartContributionToColumnSizes(covered_part);
}
} }
/// If it is rollback just change state to Outdated, else change state to Committed and add their size contribution data->modifyPartState(part, DataPartState::Committed);
for (auto & part : precommitted_parts)
{
data->modifyPartState(part, move_precommitted_to);
if (move_precommitted_to == DataPartState::Committed)
data->addPartContributionToColumnSizes(part); data->addPartContributionToColumnSizes(part);
else
part->remove_time = remove_time;
} }
} }
}
clear();
return total_covered_parts;
} }
bool MergeTreeData::isPrimaryKeyColumn(const ASTPtr &node) const bool MergeTreeData::isPrimaryKeyColumn(const ASTPtr &node) const
......
...@@ -145,20 +145,24 @@ public: ...@@ -145,20 +145,24 @@ public:
using DataParts = std::set<DataPartPtr, LessDataPart>; using DataParts = std::set<DataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>; using DataPartsVector = std::vector<DataPartPtr>;
/// Some operations on the set of parts return a Transaction object. /// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreCommitted parts (the parts are ready, but not yet in the active set).
/// * Next, if commit() is called, the parts are added to the active set and the parts that are
/// covered by them are marked Outdated.
/// If neither commit() nor rollback() was called, the destructor rolls back the operation. /// If neither commit() nor rollback() was called, the destructor rolls back the operation.
class Transaction : private boost::noncopyable class Transaction : private boost::noncopyable
{ {
public: public:
Transaction() {} Transaction() {}
void commit(); /// Returns the parts marked Outdated as a result of the transaction commit.
DataPartsVector commit();
void rollback(); void rollback();
bool isEmpty() const bool isEmpty() const
{ {
return parts_to_add_on_rollback.empty() && parts_to_remove_on_rollback.empty(); return precommitted_parts.empty();
} }
~Transaction() ~Transaction()
...@@ -172,23 +176,18 @@ public: ...@@ -172,23 +176,18 @@ public:
tryLogCurrentException("~MergeTreeData::Transaction"); tryLogCurrentException("~MergeTreeData::Transaction");
} }
} }
private: private:
friend class MergeTreeData; friend class MergeTreeData;
MergeTreeData * data = nullptr; MergeTreeData * data = nullptr;
DataParts precommitted_parts;
/// What to do on rollback.
DataPartsVector parts_to_remove_on_rollback;
DataPartsVector parts_to_add_on_rollback;
void clear() void clear()
{ {
data = nullptr; data = nullptr;
parts_to_remove_on_rollback.clear(); precommitted_parts.clear();
parts_to_add_on_rollback.clear();
} }
void replaceParts(DataPartState move_precommitted_to, DataPartState move_committed_to, bool remove_without_delay);
}; };
/// An object that stores the names of temporary files created in the part directory during ALTER of its /// An object that stores the names of temporary files created in the part directory during ALTER of its
...@@ -368,14 +367,17 @@ public: ...@@ -368,14 +367,17 @@ public:
/// If until is non-null, wake up from the sleep earlier if the event happened. /// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertIfNeeded(Poco::Event * until = nullptr); void delayInsertIfNeeded(Poco::Event * until = nullptr);
/// Renames temporary part to a permanent part and adds it to the working set. /// Renames temporary part to a permanent part and adds it to the parts set.
/// If increment != nullptr, part index is determined using increment. Otherwise part index remains unchanged.
/// It is assumed that the part does not intersect with existing parts. /// It is assumed that the part does not intersect with existing parts.
/// If out_transaction != nullptr, sets it to an object allowing to rollback part addition (but not the renaming). /// If increment != nullptr, part index is determined using increment. Otherwise part index remains unchanged.
/// If out_transaction != nullptr, adds the part in the PreCommitted state (the part will be added to the
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); void renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
/// The same as renameTempPartAndAdd but the part can intersect existing parts. /// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Deletes and returns all parts covered by the added part (in ascending order). /// Returns all parts covered by the added part (in ascending order).
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplace( DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr); MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
...@@ -573,8 +575,8 @@ private: ...@@ -573,8 +575,8 @@ private:
/// Work with data parts /// Work with data parts
struct TagByName{}; struct TagByInfo{};
struct TagByStateAndName{}; struct TagByStateAndInfo{};
static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part) static const MergeTreePartInfo & dataPartPtrToInfo(const DataPartPtr & part)
{ {
...@@ -588,14 +590,14 @@ private: ...@@ -588,14 +590,14 @@ private:
using DataPartsIndexes = boost::multi_index_container<DataPartPtr, using DataPartsIndexes = boost::multi_index_container<DataPartPtr,
boost::multi_index::indexed_by< boost::multi_index::indexed_by<
/// Index by Name /// Index by Info
boost::multi_index::ordered_unique< boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByName>, boost::multi_index::tag<TagByInfo>,
boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo> boost::multi_index::global_fun<const DataPartPtr &, const MergeTreePartInfo &, dataPartPtrToInfo>
>, >,
/// Index by (State, Name), is used to obtain ordered slices of parts with the same state /// Index by (State, Info), is used to obtain ordered slices of parts with the same state
boost::multi_index::ordered_unique< boost::multi_index::ordered_unique<
boost::multi_index::tag<TagByStateAndName>, boost::multi_index::tag<TagByStateAndInfo>,
boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>, boost::multi_index::global_fun<const DataPartPtr &, DataPartStateAndInfo, dataPartPtrToStateAndInfo>,
LessStateDataPart LessStateDataPart
> >
...@@ -605,16 +607,16 @@ private: ...@@ -605,16 +607,16 @@ private:
/// Current set of data parts. /// Current set of data parts.
mutable std::mutex data_parts_mutex; mutable std::mutex data_parts_mutex;
DataPartsIndexes data_parts_indexes; DataPartsIndexes data_parts_indexes;
DataPartsIndexes::index<TagByName>::type & data_parts_by_name; DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndName>::type & data_parts_by_state_and_name; DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
using DataPartIteratorByAndName = DataPartsIndexes::index<TagByName>::type::iterator; using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndName = DataPartsIndexes::index<TagByStateAndName>::type::iterator; using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
boost::iterator_range<DataPartIteratorByStateAndName> getDataPartsStateRange(DataPartState state) const boost::iterator_range<DataPartIteratorByStateAndInfo> getDataPartsStateRange(DataPartState state) const
{ {
auto begin = data_parts_by_state_and_name.lower_bound(state, LessStateDataPart()); auto begin = data_parts_by_state_and_info.lower_bound(state, LessStateDataPart());
auto end = data_parts_by_state_and_name.upper_bound(state, LessStateDataPart()); auto end = data_parts_by_state_and_info.upper_bound(state, LessStateDataPart());
return {begin, end}; return {begin, end};
} }
...@@ -623,25 +625,25 @@ private: ...@@ -623,25 +625,25 @@ private:
return [state] (const DataPartPtr & part) { part->state = state; }; return [state] (const DataPartPtr & part) { part->state = state; };
} }
void modifyPartState(DataPartIteratorByStateAndName it, DataPartState state) void modifyPartState(DataPartIteratorByStateAndInfo it, DataPartState state)
{ {
if (!data_parts_by_state_and_name.modify(it, getStateModifier(state))) if (!data_parts_by_state_and_info.modify(it, getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
} }
void modifyPartState(DataPartIteratorByAndName it, DataPartState state) void modifyPartState(DataPartIteratorByInfo it, DataPartState state)
{ {
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state))) if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
} }
void modifyPartState(const DataPartPtr & part, DataPartState state) void modifyPartState(const DataPartPtr & part, DataPartState state)
{ {
auto it = data_parts_by_name.find(part->info); auto it = data_parts_by_info.find(part->info);
if (it == data_parts_by_name.end() || (*it).get() != part.get()) if (it == data_parts_by_info.end() || (*it).get() != part.get())
throw Exception("Part " + part->name + " is not exists", ErrorCodes::LOGICAL_ERROR); throw Exception("Part " + part->name + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
if (!data_parts_by_state_and_name.modify(data_parts_indexes.project<TagByStateAndName>(it), getStateModifier(state))) if (!data_parts_by_state_and_info.modify(data_parts_indexes.project<TagByStateAndInfo>(it), getStateModifier(state)))
throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR); throw Exception("Can't modify " + (*it)->getNameWithState(), ErrorCodes::LOGICAL_ERROR);
} }
...@@ -672,7 +674,14 @@ private: ...@@ -672,7 +674,14 @@ private:
void removePartContributionToColumnSizes(const DataPartPtr & part); void removePartContributionToColumnSizes(const DataPartPtr & part);
/// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock. /// If there is no part in the partition with ID `partition_id`, returns empty ptr. Should be called under the lock.
DataPartPtr getAnyPartInPartition(const String & partition_id, std::unique_lock<std::mutex> & data_parts_lock); DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
/// Returns the parts in the Committed state that are covered by new_part_info,
/// or reports the existing part that covers new_part_info.
/// NOTE(review): the covering part is presumably returned through out_covering_part — confirm against the definition.
/// Checks that the new part doesn't already exist and that it doesn't intersect any existing part.
/// NOTE(review): takes the caller's lock_guard (presumably on data_parts_mutex), so it should be called under the lock — confirm.
DataPartsVector getActivePartsToReplace(
const MergeTreePartInfo & new_part_info,
DataPartPtr & out_covering_part,
std::lock_guard<std::mutex> & data_parts_lock) const;
/// Checks whether the column is in the primary key. /// Checks whether the column is in the primary key.
bool isPrimaryKeyColumn(const ASTPtr &node) const; bool isPrimaryKeyColumn(const ASTPtr &node) const;
......
...@@ -29,6 +29,11 @@ struct MergeTreePartInfo ...@@ -29,6 +29,11 @@ struct MergeTreePartInfo
< std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level); < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level);
} }
/// Two part infos are equal exactly when neither is ordered before the other by operator<.
bool operator==(const MergeTreePartInfo & rhs) const
{
    if (*this < rhs)
        return false;
    return !(rhs < *this);
}
/// Contains another part (obtained after merging another part with some other) /// Contains another part (obtained after merging another part with some other)
bool contains(const MergeTreePartInfo & rhs) const bool contains(const MergeTreePartInfo & rhs) const
{ {
......
...@@ -2259,13 +2259,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ...@@ -2259,13 +2259,13 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
checkPartAndAddToZooKeeper(part, ops, part_name); checkPartAndAddToZooKeeper(part, ops, part_name);
MergeTreeData::Transaction transaction; MergeTreeData::Transaction transaction;
replaced_parts = data.renameTempPartAndReplace(part, nullptr, &transaction); data.renameTempPartAndReplace(part, nullptr, &transaction);
/// Do not commit if the part is obsolete /// Do not commit if the part is obsolete
if (!transaction.isEmpty()) if (!transaction.isEmpty())
{ {
getZooKeeper()->multi(ops); getZooKeeper()->multi(ops);
transaction.commit(); replaced_parts = transaction.commit();
} }
/** If a quorum is tracked for this part, you must update it. /** If a quorum is tracked for this part, you must update it.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册