提交 d53f1734 编写于 作者: A Alexey Milovidov

dbms: quorum inserts: development [#METR-16779].

上级 f395a408
......@@ -288,6 +288,7 @@ namespace ErrorCodes
INCORRECT_INDEX = 282,
UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284,
TOO_LESS_LIVE_REPLICAS = 285,
UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286,
KEEPER_EXCEPTION = 999,
POCO_EXCEPTION = 1000,
......
......@@ -2,6 +2,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/AbandonableLockInZooKeeper.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/IO/Operators.h>
......@@ -14,7 +15,12 @@ class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_)
: storage(storage_), insert_id(insert_id_), quorum(quorum_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) {}
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// Значение кворума 1 имеет такой же смысл, как если он отключён.
if (quorum == 1)
quorum = 0;
}
void writePrefix() override
{
......@@ -29,8 +35,9 @@ public:
assertSessionIsNotExpired(zookeeper);
/** Если запись с кворумом, то проверим, что требуемое количество реплик сейчас живо,
* а также что у нас есть все предыдущие куски, которые были записаны с кворумом.
* а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут.
*/
String quorum_status_path = storage.zookeeper_path + "/quorum/status";
if (quorum)
{
/// Список живых реплик. Все они регистрируют эфемерную ноду для leader_election.
......@@ -55,11 +62,21 @@ public:
ErrorCodes::TOO_LESS_LIVE_REPLICAS);
}
/// Разумеется, реплики могут перестать быть живыми после этой проверки. Это не проблема.
/** Есть ли у нас последний кусок, записанный с кворумом?
* В ZK будем иметь следующую структуру директорий:
/** Достигнут ли кворум для последнего куска, для которого нужен кворум?
* Запись всех кусков с включенным кворумом линейно упорядочена.
* Это значит, что в любой момент времени может быть только один кусок,
* для которого нужен, но ещё не достигнут кворум.
* Информация о таком куске будет расположена в ноде /quorum/status.
* Если кворум достигнут, то нода удаляется.
*/
String quorum_status;
bool quorum_unsatisfied = zookeeper->tryGet(quorum_status_path, quorum_status);
if (quorum_unsatisfied)
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
/// Обе проверки неявно делаются и позже (иначе был бы race condition).
}
auto part_blocks = storage.writer.splitBlockIntoParts(block);
......@@ -83,48 +100,96 @@ public:
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
/// Можно для этого сделать настройку или синтаксис в запросе (например, ID=null).
if (block_id.empty())
{
block_id = part->checksums.summaryDataChecksum();
if (block_id.empty())
throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id << ", " << current_block.block.rows() << " rows");
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part_name;
log_entry.quorum = quorum;
/// Одновременно добавим информацию о куске во все нужные места в ZooKeeper и снимем block_number_lock.
/// Информация о блоке.
zkutil::Ops ops;
if (!block_id.empty())
{
ops.push_back(new zkutil::Op::Create(
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
/// Информация о куске, в данных реплики.
storage.checkPartAndAddToZooKeeper(part, ops, part_name);
/// Лог репликации.
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::PersistentSequential));
/// Удаление информации о том, что номер блока используется для записи.
block_number_lock.getUnlockOps(ops);
/** Если нужен кворум - создание узла, в котором отслеживается кворум.
* (Если такой узел уже существует - значит кто-то успел одновременно сделать другую кворумную запись, но для неё кворум ещё не достигнут.
* Делать в это время следующую кворумную запись нельзя.)
*/
if (quorum)
{
static std::once_flag once_flag;
std::call_once(once_flag, [&]
{
zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", "");
});
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.part_name = part_name;
quorum_entry.required_number_of_replicas = quorum;
quorum_entry.replicas.insert(storage.replica_name);
/** В данный момент, этот узел будет содержать информацию о том, что текущая реплика получила кусок.
* Когда другие реплики будут получать этот кусок (обычным способом, обрабатывая лог репликации),
* они будут добавлять себя в содержимое этого узла.
* Когда в нём будет информация о quorum количестве реплик, этот узел удаляется,
* что говорит о том, что кворум достигнут.
*/
ops.push_back(
new zkutil::Op::Create(
quorum_status_path,
quorum_entry.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
}
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
......@@ -154,8 +219,17 @@ public:
part.reset();
transaction.rollback();
}
else if (zookeeper->exists(quorum_status_path))
{
part.reset();
transaction.rollback();
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else
{
/// Сюда можем попасть также, если узел с кворумом существовал, но потом быстро был удалён.
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
......@@ -180,6 +254,14 @@ public:
throw;
}
if (quorum)
{
/// Дожидаемся достижения кворума. TODO Настраиваемый таймаут.
LOG_TRACE(log, "Waiting for quorum");
zookeeper->waitForDisappear(quorum_status_path);
LOG_TRACE(log, "Quorum satisfied");
}
}
}
......
#pragma once
#include <DB/Core/Types.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/Operators.h>
#include <zkutil/ZooKeeper.h>
namespace DB
{
/** Для реализации функциональности "кворумная запись".
* Информация о том, на каких репликах появился вставленный кусок данных,
* и на скольких репликах он должен быть.
*/
struct ReplicatedMergeTreeQuorumEntry
{
String part_name;
size_t required_number_of_replicas;
std::set<String> replicas;
void writeText(WriteBuffer & out) const
{
out << "version: 1\n"
<< "part_name: " << part_name << "\n"
<< "required_number_of_replicas: " << required_number_of_replicas << "\n"
<< "actual_number_of_replicas: " << replicas.size() << "\n"
<< "replicas:\n";
for (const auto & replica : replicas)
out << escape << replica << "\n";
}
void readText(ReadBuffer & in)
{
size_t actual_number_of_replicas = 0;
in >> "version: 1\n"
>> "part_name: " >> part_name >> "\n"
>> "required_number_of_replicas: " >> required_number_of_replicas >> "\n"
>> "actual_number_of_replicas: " >> actual_number_of_replicas >> "\n"
>> "replicas:\n";
for (size_t i = 0; i < actual_number_of_replicas; ++i)
{
String replica;
in >> escape >> replica >> "\n";
replicas.insert(replica);
}
}
String toString() const
{
String res;
{
WriteBufferFromString out(res);
writeText(out);
}
return res;
}
void fromString(const String & str)
{
ReadBufferFromString in(str);
readText(in);
}
};
}
......@@ -379,8 +379,9 @@ private:
/** Скачать указанный кусок с указанной реплики.
* Если to_detached, то кусок помещается в директорию detached.
* Если quorum != 0, то обновляется узел для отслеживания кворума.
*/
void fetchPart(const String & part_name, const String & replica_path, bool to_detached = false);
void fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum);
AbandonableLockInZooKeeper allocateBlockNumber(const String & month_name);
......
......@@ -3,6 +3,7 @@
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
......@@ -921,10 +922,13 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
// TODO Проверить, может быть куска нет ни на одной живой или мёртвой реплике, и он исчез навсегда?
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica);
fetchPart(entry.new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
......@@ -1271,6 +1275,24 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
String month_name = left->name.substr(0, 6);
auto zookeeper = getZooKeeper();
/// Нельзя сливать куски, среди которых находится кусок, для которого неудовлетворён кворум.
/// Замечание: теоретически, это можно было бы разрешить. Но это сделает логику более сложной.
String quorum_node_value;
if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(quorum_node_value);
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
if (part_info.left != part_info.right)
throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
if (left->right <= part_info.left && right->left >= part_info.right)
return false;
}
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
/// Номера до RESERVED_BLOCK_NUMBERS всегда не соответствуют никаким блокам.
for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, left->right + 1); number <= right->left - 1; ++number)
......@@ -1908,7 +1930,81 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached)
/** Если для куска отслеживается кворум, то обновить информацию о нём в ZK.
*/
static void updateQuorum(
zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_path,
const String & replica_name,
const String & part_name,
size_t quorum)
{
if (!quorum)
return;
const String quorum_status_path = zookeeper_path + "/quorum/status";
String value;
zkutil::Stat stat;
/// Если узла нет, значит по всем кворумным INSERT-ам уже был достигнут кворум, и ничего делать не нужно.
while (zookeeper->tryGet(quorum_status_path, value, &stat))
{
ReplicatedMergeTreeQuorumEntry quorum_entry;
quorum_entry.fromString(value);
if (quorum_entry.part_name != part_name)
{
/// Кворум уже был достигнут. Более того, уже начался другой INSERT с кворумом.
break;
}
if (quorum_entry.required_number_of_replicas != quorum)
throw Exception("Logical error: quorum size in log entry is different than quorum size in node /quorum/status",
ErrorCodes::LOGICAL_ERROR);
quorum_entry.replicas.insert(replica_name);
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// Кворум достигнут. Удаляем узел.
auto code = zookeeper->tryRemove(quorum_status_path, stat.version);
if (code == ZNONODE)
{
/// Кворум уже был достигнут.
break;
}
else if (code == ZBADVERSION)
{
/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
continue;
}
else
throw zkutil::KeeperException(code, quorum_status_path);
}
else
{
/// Обновляем узел, прописывая туда на одну реплику больше.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);
if (code == ZNONODE)
{
/// Кворум уже был достигнут.
break;
}
else if (code == ZBADVERSION)
{
/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
continue;
}
else
throw zkutil::KeeperException(code, quorum_status_path);
}
}
}
void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{
auto zookeeper = getZooKeeper();
......@@ -1942,6 +2038,12 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
zookeeper->multi(ops);
transaction.commit();
/** Если для этого куска отслеживается кворум, то надо его обновить.
* TODO Обработка в случае неизвестной ошибки, потери сессии, при перезапуске сервера.
*/
updateQuorum(zookeeper, zookeeper_path, replica_name, part_name, quorum);
merge_selecting_event.set();
for (const auto & removed_part : removed_parts)
......@@ -2636,18 +2738,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
/// Третье - дождемся, пока запись исчезнет из очереди реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
String unused;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.
if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for, unused, nullptr, event))
break;
event->wait();
}
zookeeper->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
}
......@@ -2865,7 +2956,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
{
try
{
fetchPart(part, best_replica_path, true);
fetchPart(part, best_replica_path, true, 0);
}
catch (const DB::Exception & e)
{
......
......@@ -167,6 +167,10 @@ public:
*/
void tryRemoveRecursive(const std::string & path);
/** Подождать, пока нода перестанет существовать или вернуть сразу, если нода не существует.
*/
void waitForDisappear(const std::string & path);
/** Асинхронный интерфейс (реализовано небольшое подмножество операций).
*
......
......@@ -558,6 +558,23 @@ void ZooKeeper::tryRemoveRecursive(const std::string & path)
tryRemove(path);
}
void ZooKeeper::waitForDisappear(const std::string & path)
{
while (true)
{
zkutil::EventPtr event = new Poco::Event;
std::string unused;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.
if (!tryGet(path, unused, nullptr, event))
break;
event->wait();
}
}
ZooKeeper::~ZooKeeper()
{
LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册