From 6d50fb4b4448e095debfe32188d5abaf394ca41d Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 24 Jan 2016 08:00:24 +0300 Subject: [PATCH] dbms: fixed quorum inserts; improved performance of ordinary inserts [#METR-16779]. --- dbms/include/DB/Interpreters/Settings.h | 1 + .../MergeTree/AbandonableLockInZooKeeper.h | 5 + .../ReplicatedMergeTreeBlockOutputStream.h | 4 +- .../DB/Storages/StorageReplicatedMergeTree.h | 16 ++- dbms/src/Core/ErrorCodes.cpp | 1 + .../ReplicatedMergeTreeBlockOutputStream.cpp | 105 +++++++++++++++--- .../Storages/StorageReplicatedMergeTree.cpp | 58 +++++++++- libs/libzkutil/include/zkutil/ZooKeeper.h | 3 +- libs/libzkutil/src/ZooKeeper.cpp | 8 +- 9 files changed, 174 insertions(+), 27 deletions(-) diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 4606eeecfe..a3904ce625 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -171,6 +171,7 @@ struct Settings \ /** Для запросов INSERT в реплицируемую таблицу, ждать записи на указанное число реплик и лианеризовать добавление данных. 0 - отключено. */ \ M(SettingUInt64, insert_quorum, 0) \ + M(SettingMilliseconds, insert_quorum_timeout, 600000) \ /** Для запросов SELECT из реплицируемой таблицы, кидать исключение, если на реплике нет куска, записанного с кворумом; \ * не читать куски, которые ещё не были записаны с кворумом. */ \ M(SettingUInt64, select_sequential_consistency, 0) \ diff --git a/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h index e38d768ea8..bfe67caa9c 100644 --- a/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h +++ b/dbms/include/DB/Storages/MergeTree/AbandonableLockInZooKeeper.h @@ -8,6 +8,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + /** Примитив синхронизации. Работает следующим образом: * При создании создает неэфемерную инкрементную ноду и помечает ее как заблокированную (LOCKED). * unlock() разблокирует ее (UNLOCKED). diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index f90df18e55..978ffe935e 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -12,7 +12,8 @@ class StorageReplicatedMergeTree; class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream { public: - ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_); + ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, + size_t quorum_, size_t quorum_timeout_ms_); void writePrefix() override; void write(const Block & block) override; @@ -21,6 +22,7 @@ private: StorageReplicatedMergeTree & storage; String insert_id; size_t quorum; + size_t quorum_timeout_ms; size_t block_index = 0; Logger * log; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index d2d0a5fd25..5b37efadf1 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -358,6 +358,12 @@ private: */ void checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = ""); + /** Исходит из допущения, что такого куска ещё нигде нет (Это обеспечено, если номер куска выделен с помощью AbandonableLock). + * Кладет в ops действия, добавляющие данные о куске в ZooKeeper. + * Вызывать под TableStructureLock. + */ + void addNewPartToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = ""); + /// Кладет в ops действия, удаляющие кусок из ZooKeeper. void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops); @@ -446,8 +452,14 @@ private: */ void releasePartitionMergeLock(const std::string & partition_name); + + /// Проверить наличие узла в ZK. Если он есть - запомнить эту информацию, и затем сразу отвечать true. + std::unordered_set existing_nodes_cache; + std::mutex existing_nodes_cache_mutex; + bool existsNodeCached(const std::string & path); + + /// Перешардирование. -private: struct ReplicaSpaceInfo { long double factor = 0.0; @@ -469,7 +481,6 @@ private: using PartitionToMergeLock = std::map; -private: /** Проверяет, что структуры локальной и реплицируемых таблиц совпадают. */ void enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths); @@ -483,7 +494,6 @@ private: */ bool checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info, size_t partition_size) const; -private: std::mutex mutex_partition_to_merge_lock; PartitionToMergeLock partition_to_merge_lock; }; diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index a71b22531b..d261b50028 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -323,6 +323,7 @@ namespace ErrorCodes extern const int RESHARDING_INVALID_PARAMETERS = 317; extern const int INVALID_SHARD_WEIGHT = 318; extern const int SHARD_DOESNT_REFERENCE_TABLE = 319; + extern const int UNKNOWN_STATUS_OF_INSERT = 320; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index f47c0e061d..2c2f3f351c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -16,12 +16,14 @@ namespace ErrorCodes extern const int CHECKSUM_DOESNT_MATCH; extern const int UNEXPECTED_ZOOKEEPER_ERROR; extern const int NO_ZOOKEEPER; + extern const int READONLY; + extern const int UNKNOWN_STATUS_OF_INSERT; } ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( - StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_) - : storage(storage_), insert_id(insert_id_), quorum(quorum_), + StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_, size_t quorum_timeout_ms_) + : storage(storage_), insert_id(insert_id_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)")) { /// Значение кворума 1 имеет такой же смысл, как если он отключён. @@ -55,17 +57,28 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) assertSessionIsNotExpired(zookeeper); /** Если запись с кворумом, то проверим, что требуемое количество реплик сейчас живо, - * а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут. - */ + * а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут. + * А также будем проверять, что во время вставки, реплика не была переинициализирована или выключена (по значению узла is_active). + * TODO Слишком сложная логика, можно сделать лучше. + */ String quorum_status_path = storage.zookeeper_path + "/quorum/status"; + String is_active_node_value; + int is_active_node_version = -1; + int host_node_version = -1; if (quorum) { + zkutil::ZooKeeper::TryGetFuture quorum_status_future = zookeeper->asyncTryGet(quorum_status_path); + zkutil::ZooKeeper::TryGetFuture is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active"); + zkutil::ZooKeeper::TryGetFuture host_future = zookeeper->asyncTryGet(storage.replica_path + "/host"); + /// Список живых реплик. Все они регистрируют эфемерную ноду для leader_election. - auto live_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election"); - if (live_replicas.size() < quorum) + zkutil::Stat leader_election_stat; + zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat); + + if (leader_election_stat.numChildren < static_cast(quorum)) throw Exception("Number of alive replicas (" - + toString(live_replicas.size()) + ") is less than requested quorum (" + toString(quorum) + ").", + + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").", ErrorCodes::TOO_LESS_LIVE_REPLICAS); /** Достигнут ли кворум для последнего куска, для которого нужен кворум? @@ -76,13 +89,21 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) * Если кворум достигнут, то нода удаляется. */ - String quorum_status; - bool quorum_unsatisfied = zookeeper->tryGet(quorum_status_path, quorum_status); - - if (quorum_unsatisfied) - throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); + auto quorum_status = quorum_status_future.get(); + if (quorum_status.exists) + throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.value, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); /// Обе проверки неявно делаются и позже (иначе был бы race condition). + + auto is_active = is_active_future.get(); + auto host = host_future.get(); + + if (!is_active.exists || !host.exists) + throw Exception("Replica is not active right now", ErrorCodes::READONLY); + + is_active_node_value = is_active.value; + is_active_node_version = is_active.stat.version; + host_node_version = host.stat.version; } auto part_blocks = storage.writer.splitBlockIntoParts(block); @@ -95,7 +116,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index); String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100); - AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); + AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); /// 2 RTT Int64 part_number = block_number_lock.getNumber(); @@ -161,7 +182,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) zkutil::CreateMode::Persistent)); /// Информация о куске, в данных реплики. - storage.checkPartAndAddToZooKeeper(part, ops, part_name); + storage.addNewPartToZooKeeper(part, ops, part_name); /// Лог репликации. ops.push_back(new zkutil::Op::Create( @@ -197,6 +218,20 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) quorum_entry.toString(), acl, zkutil::CreateMode::Persistent)); + + /// Удостоверяемся, что за время вставки, реплика не была переинициализирована или выключена (при завершении сервера). + ops.push_back( + new zkutil::Op::Check( + storage.replica_path + "/is_active", + is_active_node_version)); + + /// К сожалению, одной лишь проверки выше недостаточно, потому что узел is_active может удалиться и появиться заново с той же версией. + /// Но тогда изменится значение узла host. Будем проверять это. + /// Замечательно, что эти два узла меняются в одной транзакции (см. MergeTreeRestartingThread). + ops.push_back( + new zkutil::Op::Check( + storage.replica_path + "/host", + host_node_version)); } MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора. @@ -260,6 +295,9 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) { transaction.commit(); storage.enqueuePartForCheck(part->name); + + /// Мы не знаем, были или не были вставлены данные. + throw Exception("Unknown status, client must retry. Reason: " + e.displayText(), ErrorCodes::UNKNOWN_STATUS_OF_INSERT); } throw; @@ -267,9 +305,44 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) if (quorum) { - /// Дожидаемся достижения кворума. TODO Настраиваемый таймаут. + /// Дожидаемся достижения кворума. LOG_TRACE(log, "Waiting for quorum"); - zookeeper->waitForDisappear(quorum_status_path); + + try + { + while (true) + { + zkutil::EventPtr event = new Poco::Event; + + std::string value; + /// get вместо exists, чтобы не утек watch, если ноды уже нет. + if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event)) + break; + + ReplicatedMergeTreeQuorumEntry quorum_entry(value); + + /// Если нода успела исчезнуть, а потом появиться снова уже для следующей вставки. + if (quorum_entry.part_name != part_name) + break; + + if (!event->tryWait(quorum_timeout_ms)) + throw Exception("Timeout while waiting for quorum"); + } + + /// А вдруг возможно, что текущая реплика в это время перестала быть активной и кворум помечен как неудавшийся, и удалён? + String value; + if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr) + || value != is_active_node_value) + throw Exception("Replica become inactive while waiting for quorum"); + } + catch (...) + { + /// Мы не знаем, были или не были вставлены данные + /// - успели или не успели другие реплики скачать кусок и пометить кворум как выполненный. + throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false), + ErrorCodes::UNKNOWN_STATUS_OF_INSERT); + } + LOG_TRACE(log, "Quorum satisfied"); } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index e562ff36b2..83e95eb7d0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -860,6 +860,38 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData: } +void StorageReplicatedMergeTree::addNewPartToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name) +{ + auto zookeeper = getZooKeeper(); + + if (part_name.empty()) + part_name = part->name; + + check(part->columns); + + auto acl = zookeeper->getDefaultACL(); + + ops.push_back(new zkutil::Op::Check( + zookeeper_path + "/columns", + columns_version)); + ops.push_back(new zkutil::Op::Create( + replica_path + "/parts/" + part_name, + "", + acl, + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + replica_path + "/parts/" + part_name + "/columns", + part->columns.toString(), + acl, + zkutil::CreateMode::Persistent)); + ops.push_back(new zkutil::Op::Create( + replica_path + "/parts/" + part_name + "/checksums", + part->checksums.toString(), + acl, + zkutil::CreateMode::Persistent)); +} + + void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event) { if (queue.pullLogsToQueue(getZooKeeper(), next_update_event)) @@ -2453,7 +2485,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Setti if (ASTInsertQuery * insert = typeid_cast(&*query)) insert_id = insert->insert_id; - return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id, settings.insert_quorum); + return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id, + settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds()); } @@ -2975,12 +3008,33 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str } +bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path) +{ + { + std::lock_guard lock(existing_nodes_cache_mutex); + if (existing_nodes_cache.count(path)) + return true; + } + + auto zookeeper = getZooKeeper(); + bool res = zookeeper->exists(path); + + if (res) + { + std::lock_guard lock(existing_nodes_cache_mutex); + existing_nodes_cache.insert(path); + } + + return res; +} + + AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name) { auto zookeeper = getZooKeeper(); String month_path = zookeeper_path + "/block_numbers/" + month_name; - if (!zookeeper->exists(month_path)) + if (!existsNodeCached(month_path)) { /// Создадим в block_numbers ноду для месяца и пропустим в ней N=RESERVED_BLOCK_NUMBERS значений инкремента. /// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало. diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h index 3fb971ba69..36aa867969 100644 --- a/libs/libzkutil/include/zkutil/ZooKeeper.h +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -345,7 +346,7 @@ private: std::string task_queue_path; int32_t session_timeout_ms; - Poco::FastMutex mutex; + std::mutex mutex; ACLPtr default_acl; zhandle_t * impl; diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp index 029e389429..2d4bcb8da6 100644 --- a/libs/libzkutil/src/ZooKeeper.cpp +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -53,7 +53,7 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p if (type != ZOO_SESSION_EVENT) { { - Poco::ScopedLock lock(watch->zk.mutex); + std::lock_guard lock(watch->zk.mutex); watch->zk.watch_store.erase(watch); } delete watch; @@ -140,7 +140,7 @@ void * ZooKeeper::watchForEvent(EventPtr event) { WatchWithEvent * res = new WatchWithEvent(*this, event); { - Poco::ScopedLock lock(mutex); + std::lock_guard lock(mutex); watch_store.insert(res); if (watch_store.size() % 10000 == 0) { @@ -621,13 +621,13 @@ Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr ACLPtr ZooKeeper::getDefaultACL() { - Poco::ScopedLock lock(mutex); + std::lock_guard lock(mutex); return default_acl; } void ZooKeeper::setDefaultACL(ACLPtr new_acl) { - Poco::ScopedLock lock(mutex); + std::lock_guard lock(mutex); default_acl = new_acl; } -- GitLab