提交 3b5a3e73 编写于 作者: A Alexey Milovidov

dbms: quorum insert: development [#METR-16779].

上级 dcf40ef7
......@@ -154,6 +154,8 @@ public:
std::call_once(once_flag, [&]
{
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/last_part", "");
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/failed_parts", "");
});
ReplicatedMergeTreeQuorumEntry quorum_entry;
......
......@@ -39,6 +39,8 @@ private:
/// Удалить из ZooKeeper старые хеши блоков. Это делает ведущая реплика.
void clearOldBlocks();
/// TODO Удаление старых quorum/failed_parts
};
......
......@@ -63,6 +63,12 @@ private:
/// Отметить в ZooKeeper, что эта реплика сейчас активна.
void activateReplica();
/// Удалить куски, для которых кворум пофейлился (за то время, когда реплика была неактивной).
void removeFailedQuorumParts();
/// Если есть недостигнутый кворум, и у нас есть кусок, то добавить эту реплику в кворум.
void updateQuorumIfWeHavePart();
void partialShutdown();
/// Запретить запись в таблицу и завершить все фоновые потоки.
......
......@@ -363,6 +363,9 @@ private:
*/
void checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = "");
/// Кладет в ops действия, удаляющие кусок из ZooKeeper.
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
/// Убирает кусок из ZooKeeper и добавляет в очередь задание скачать его. Предполагается это делать с битыми кусками.
void removePartAndEnqueueFetch(const String & part_name);
......@@ -429,6 +432,10 @@ private:
*/
void fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum);
/** При отслеживаемом кворуме - добавить реплику в кворум для куска.
*/
void updateQuorum(const String & part_name);
AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name);
/** Дождаться, пока все реплики, включая эту, выполнят указанное действие из лога.
......
......@@ -71,9 +71,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldParts()
LOG_DEBUG(log, "Removing " << part->name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(storage.replica_path + "/parts/" + part->name, -1));
storage.removePartFromZooKeeper(part->name, ops);
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
......
#include <DB/IO/Operators.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
namespace DB
......@@ -110,7 +111,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
try
{
removeFailedQuorumParts();
activateReplica();
updateQuorumIfWeHavePart();
storage.leader_election = new zkutil::LeaderElection(
storage.zookeeper_path + "/leader_election",
......@@ -165,6 +168,54 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
}
void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
{
auto zookeeper = storage.getZooKeeper();
Strings failed_parts;
if (!zookeeper->tryGetChildren(storage.zookeeper_path + "/quorum/failed_parts", failed_parts))
return;
for (auto part_name : failed_parts)
{
auto part = storage.data.getPartIfExists(part_name);
if (part)
{
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
zkutil::Ops ops;
storage.removePartFromZooKeeper(part_name, ops);
auto code = zookeeper->tryMulti(ops);
if (code == ZNONODE)
LOG_WARNING(log, "Part " << part_name << " with failed quorum is not in ZooKeeper. This shouldn't happen often.");
storage.data.renameAndDetachPart(part, "noquorum");
}
}
}
void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
{
auto zookeeper = storage.getZooKeeper();
String quorum_str;
if (zookeeper->tryGet(storage.zookeeper_path + "/quorum/status", quorum_str))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_str);
if (!quorum_entry.replicas.count(storage.replica_name)
&& zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
{
LOG_WARNING(log, "We have part " << quorum_entry.part_name
<< " but we is not in quorum. Updating quorum. This shouldn't happen often.");
storage.updateQuorum(quorum_entry.part_name);
}
}
}
void ReplicatedMergeTreeRestartingThread::activateReplica()
{
auto host_port = storage.context.getInterserverIOAddress();
......@@ -178,18 +229,30 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
<< "port: " << host_port.second << '\n';
}
String is_active_path = storage.replica_path + "/is_active";
/** Если нода отмечена как активная, но отметка сделана в этом же экземпляре, удалим ее.
* Такое возможно только при истечении сессии в ZooKeeper.
* Здесь есть небольшой race condition (можем удалить не ту ноду, для которой сделали tryGet),
* но он крайне маловероятен при нормальном использовании.
*/
String data;
if (zookeeper->tryGet(storage.replica_path + "/is_active", data) && data == active_node_identifier)
zookeeper->tryRemove(storage.replica_path + "/is_active");
Stat stat;
bool has_is_active = zookeeper->tryGet(is_active_path, data, &stat);
if (has_is_active && data == active_node_identifier)
{
auto code = zookeeper->tryRemove(is_active_path, stat.version);
if (code == ZBADVERSION)
throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
" You shouldn't run multiple instances of same replica. You need to check configuration files.",
ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
if (code != ZOK && code != ZNONODE)
throw zkutil::KeeperException(code, is_active_path);
}
/// Одновременно объявим, что эта реплика активна, и обновим хост.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(storage.replica_path + "/is_active",
ops.push_back(new zkutil::Op::Create(is_active_path,
active_node_identifier, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.push_back(new zkutil::Op::SetData(storage.replica_path + "/host", address, -1));
......@@ -208,7 +271,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
/// current_zookeeper живёт в течение времени жизни replica_is_active_node,
/// так как до изменения current_zookeeper, объект replica_is_active_node уничтожается в методе partialShutdown.
storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(storage.replica_path + "/is_active", *storage.current_zookeeper);
storage.replica_is_active_node = zkutil::EphemeralNodeHolder::existing(is_active_path, *storage.current_zookeeper);
}
......
......@@ -601,9 +601,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
removePartFromZooKeeper(name, ops);
zookeeper->multi(ops);
}
......@@ -620,9 +618,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
removePartFromZooKeeper(name, ops);
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
zookeeper->multi(ops);
......@@ -888,7 +884,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists.");
return true;
}
}
......@@ -896,6 +892,13 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
/// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts).
if (entry.quorum && zookeeper->exists(zookeeper_path + "/quorum/failed_parts"))
{
LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
}
bool do_fetch = false;
if (entry.type == LogEntry::GET_PART)
......@@ -1021,8 +1024,113 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
/** Если кусок должен быть записан с кворумом, и кворум ещё недостигнут,
* то (из-за того, что кусок невозможно прямо сейчас скачать),
* кворумную запись следует считать безуспешной.
* TODO Сложный код, вынести отдельно.
*/
if (entry.quorum)
{
if (entry.type != LogEntry::GET_PART)
throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum."
" Will try to mark that quorum as failed.");
/** Атомарно:
* - если реплики не стали активными;
* - если существует узел quorum с этим куском;
* - удалим узел quorum;
* - установим nonincrement_block_numbers, чтобы разрешить мерджи через номер потерянного куска;
* - добавим кусок в список quorum/failed_parts.
*
* Если что-то изменится, то ничего не сделаем - попадём сюда снова в следующий раз.
*/
/** Соберём версии узлов host у реплик.
* Когда реплика становится активной, она в той же транзакции (с созданием is_active), меняет значение host.
* Это позволит проследить, что реплики не стали активными.
*/
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
zkutil::Ops ops;
for (size_t i = 0, size = replicas.size(); i < size; ++i)
{
Stat stat;
String path = zookeeper_path + "/replicas/" + replicas[i] + "/host";
zookeeper->get(path, &stat);
ops.push_back(new zkutil::Op::Check(path, stat.version));
}
/// Проверяем, что пока мы собирали версии, не ожила реплика с нужным куском.
replica = findReplicaHavingPart(entry.new_part_name, true);
/// Также за это время могла быть создана совсем новая реплика. Но если на старых не появится куска, то на новой его тоже не может быть.
if (replica.empty())
{
Stat quorum_stat;
String quorum_path = zookeeper_path + "/quorum/status";
String quorum_str = zookeeper->get(quorum_path, &quorum_stat);
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_str);
if (quorum_entry.part_name == entry.new_part_name)
{
ops.push_back(new zkutil::Op::Remove(quorum_path, quorum_stat.version));
const auto partition_str = entry.new_part_name.substr(0, 6);
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(entry.new_part_name, part_info);
if (part_info.left != part_info.right)
throw Exception("Logical error: log entry with quorum for part covering more than one block number",
ErrorCodes::LOGICAL_ERROR);
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/nonincrement_block_numbers/" + partition_str + "/block-" + padIndex(part_info.left),
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name,
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
auto code = zookeeper->tryMulti(ops);
if (code == ZOK)
{
LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed.");
return true; /// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
}
else if (code == ZBADVERSION || code == ZNONODE || code == ZNODEEXISTS)
{
LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part "
<< entry.new_part_name << " as failed.");
}
else
throw zkutil::KeeperException(code);
}
else
{
LOG_WARNING(log, "No active replica has part " << entry.new_part_name
<< ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name
<< ". It means that part was successfully written to " << entry.quorum << " replicas, but then all of them goes offline."
<< " Or it is a bug.");
}
}
}
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
}
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum);
......@@ -1159,9 +1267,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
data.renameAndDetachPart(part);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
removePartFromZooKeeper(part->name, ops);
zookeeper->multi(ops);
/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
......@@ -1759,6 +1865,16 @@ void StorageReplicatedMergeTree::alterThread()
}
void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops)
{
String part_path = replica_path + "/parts/" + part_name;
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
auto zookeeper = getZooKeeper();
......@@ -1775,9 +1891,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
ops.push_back(new zkutil::Op::Create(
replica_path + "/queue/queue-", log_entry->toString(), zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(part_path, -1));
removePartFromZooKeeper(part_name, ops);
auto results = zookeeper->multi(ops);
{
......@@ -2111,17 +2227,15 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
/** Если для куска отслеживается кворум, то обновить информацию о нём в ZK.
*/
static void updateQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_path,
const String & replica_name,
const String & part_name,
size_t quorum)
void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
{
if (!quorum)
return;
auto zookeeper = getZooKeeper();
/// Информация, на какие реплики был добавлен кусок, если кворум ещё не достигнут.
const String quorum_status_path = zookeeper_path + "/quorum/status";
/// Имя предыдущего куска, для которого был достигнут кворум.
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
String value;
zkutil::Stat stat;
......@@ -2137,16 +2251,16 @@ static void updateQuorum(
break;
}
if (quorum_entry.required_number_of_replicas != quorum)
throw Exception("Logical error: quorum size in log entry is different than quorum size in node /quorum/status",
ErrorCodes::LOGICAL_ERROR);
quorum_entry.replicas.insert(replica_name);
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// Кворум достигнут. Удаляем узел.
auto code = zookeeper->tryRemove(quorum_status_path, stat.version);
/// Кворум достигнут. Удаляем узел, а также обновляем информацию о последнем куске, который был успешно записан с кворумом.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(quorum_status_path, stat.version));
ops.push_back(new zkutil::Op::SetData(quorum_last_part_path, part_name, -1));
auto code = zookeeper->tryMulti(ops);
if (code == ZOK)
{
......@@ -2227,9 +2341,10 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
transaction.commit();
/** Если для этого куска отслеживается кворум, то надо его обновить.
* TODO Обработка в случае неизвестной ошибки, потери сессии, при перезапуске сервера.
* Если не успеем, в случае потери сессии, при перезапуске сервера - см. метод ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart.
*/
updateQuorum(zookeeper, zookeeper_path, replica_name, part_name, quorum);
if (quorum)
updateQuorum(part_name);
merge_selecting_event.set();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册