提交 28a3e78a 编写于 作者: A alesapin

Remove 'Adding of unexpected local part to zookeeper' logic because it's redundant.

上级 9121822e
......@@ -547,45 +547,25 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// There are no PreCommitted parts at startup.
auto parts = data.getDataParts({MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
/// Local parts that are not in ZK.
/** Local parts that are not in ZK.
* In very rare cases they may cover missing parts,
* and someone may think that pushing them to ZooKeeper is a good idea.
* But actually we can't precisely determine that ALL missing parts
* are covered by this unexpected part, so the missing parts will be downloaded.
*/
MergeTreeData::DataParts unexpected_parts;
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
expected_parts.erase(part->name);
else
unexpected_parts.insert(part);
}
/// Which local parts to add into ZK.
MergeTreeData::DataPartsVector parts_to_add;
UInt64 parts_to_add_rows = 0;
/// Which parts should be taken from other replicas.
Strings parts_to_fetch;
for (const String & missing_name : expected_parts)
for (const auto & part : parts)
{
/// If locally some part is missing, but there is a part covering it, you can replace it in ZK with the covering one.
auto containing = data.getActiveContainingPart(missing_name);
if (containing)
{
LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
if (unexpected_parts.count(containing))
{
parts_to_add.push_back(containing);
unexpected_parts.erase(containing);
parts_to_add_rows += containing->rows_count;
}
}
if (expected_parts.count(part->name))
parts_to_fetch.push_back(part->name); /// these parts we will fetch from other replicas
else
parts_to_fetch.push_back(missing_name);
unexpected_parts.insert(part); /// this parts we will place to detached with ignored_ prefix
}
for (const String & name : parts_to_fetch)
expected_parts.erase(name);
/** For the sanity check, among the parts that are in the FS but not in ZK, we will only consider those that are not the most recent.
* Because unexpected new parts usually arise only because they did not have time to be registered in ZK before a hard restart of the server.
......@@ -620,16 +600,10 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const String & name : parts_to_fetch)
parts_to_fetch_blocks += get_blocks_count_in_data_part(name);
UInt64 expected_parts_blocks = 0;
for (const String & name : expected_parts)
expected_parts_blocks += get_blocks_count_in_data_part(name);
std::stringstream sanity_report;
sanity_report << "There are "
<< unexpected_parts.size() << " unexpected parts with " << unexpected_parts_rows << " rows ("
<< unexpected_parts_nonnew << " of them is not just-written with " << unexpected_parts_rows << " rows), "
<< parts_to_add.size() << " unexpectedly merged parts with " << parts_to_add_rows << " rows, "
<< expected_parts.size() << " missing obsolete parts (with " << expected_parts_blocks << " blocks), "
<< parts_to_fetch.size() << " missing parts (with " << parts_to_fetch_blocks << " blocks).";
/** We can automatically synchronize data,
......@@ -645,45 +619,23 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
UInt64 total_suspicious_rows = parts_to_add_rows + unexpected_parts_rows;
UInt64 total_suspicious_rows_no_new = parts_to_add_rows + unexpected_parts_nonnew_rows;
bool insane = total_suspicious_rows > total_rows_on_filesystem * data.settings.replicated_max_ratio_of_wrong_parts;
bool insane = unexpected_parts_rows > total_rows_on_filesystem * data.settings.replicated_max_ratio_of_wrong_parts;
if (insane && !skip_sanity_checks)
{
std::stringstream why;
why << "The local set of parts of table " << database_name << "." << table_name << " doesn't look like the set of parts "
<< "in ZooKeeper: "
<< formatReadableQuantity(total_suspicious_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem)
<< formatReadableQuantity(unexpected_parts_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem)
<< " total rows in filesystem are suspicious.";
throw Exception(why.str() + " " + sanity_report.str(), ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
}
if (total_suspicious_rows_no_new > 0)
if (unexpected_parts_nonnew_rows > 0)
LOG_WARNING(log, sanity_report.str());
/// Add information to the ZK about the parts that cover the missing parts.
for (const MergeTreeData::DataPartPtr & part : parts_to_add)
{
LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
Coordination::Requests ops;
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops);
zookeeper->multi(ops);
}
/// Remove from ZK information about the parts covered by the newly added ones.
{
for (const String & name : expected_parts)
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
removePartsFromZooKeeper(zookeeper, Strings(expected_parts.begin(), expected_parts.end()));
}
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
exists_futures.reserve(parts_to_fetch.size());
for (const String & part_name : parts_to_fetch)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册