提交 4b41c527 编写于 作者: A Alexey Milovidov

Attempt to improve performance of replication queue processing [#METR-22352].

上级 bb5bdd09
......@@ -419,6 +419,13 @@ private:
*/
String findReplicaHavingPart(const String & part_name, bool active);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'out_covering_part_name' to name of found largest covering part.
* If not found, returns empty string.
*/
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & out_covering_part_name);
/** Скачать указанный кусок с указанной реплики.
* Если to_detached, то кусок помещается в директорию detached.
* Если quorum != 0, то обновляется узел для отслеживания кворума.
......
......@@ -1208,7 +1208,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
try
{
replica = findReplicaHavingPart(entry.new_part_name, true);
String covering_part;
replica = findReplicaHavingCoveringPart(entry.new_part_name, true, covering_part);
if (replica.empty() && entry.type == LogEntry::ATTACH_PART)
{
......@@ -1223,7 +1224,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
/// Подождём, пока реплика-инициатор подцепит кусок.
waitForReplicaToProcessLogEntry(entry.source_replica, entry);
replica = findReplicaHavingPart(entry.new_part_name, true);
replica = findReplicaHavingCoveringPart(entry.new_part_name, true, covering_part);
}
if (replica.empty())
......@@ -1351,11 +1352,11 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
}
}
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum);
fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
......@@ -1872,7 +1873,7 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/// Из реплик, у которых есть кусок, выберем одну равновероятно.
/// Select replicas in uniformly random order.
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
......@@ -1884,7 +1885,45 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
/// Конечно, реплика может перестать быть активной или даже перестать существовать после возврата из этой функции.
}
return "";
return {};
}
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(const String & part_name, bool active, String & out_covering_part_name)
{
auto zookeeper = getZooKeeper();
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
/// Select replicas in uniformly random order.
std::random_shuffle(replicas.begin(), replicas.end());
for (const String & replica : replicas)
{
if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
continue;
String largest_part_found;
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
{
if (largest_part_found.empty()
|| ActiveDataPartSet::contains(part_on_replica, largest_part_found))
{
largest_part_found = part_on_replica;
}
}
}
if (!largest_part_found.empty())
{
out_covering_part_name = largest_part_found;
return replica_name;
}
}
return {};
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册