提交 6d50fb4b 编写于 作者: A Alexey Milovidov

dbms: fixed quorum inserts; improved performance of ordinary inserts [#METR-16779].

上级 40995442
......@@ -171,6 +171,7 @@ struct Settings
\
/** Для запросов INSERT в реплицируемую таблицу, ждать записи на указанное число реплик и лианеризовать добавление данных. 0 - отключено. */ \
M(SettingUInt64, insert_quorum, 0) \
M(SettingMilliseconds, insert_quorum_timeout, 600000) \
/** Для запросов SELECT из реплицируемой таблицы, кидать исключение, если на реплике нет куска, записанного с кворумом; \
* не читать куски, которые ещё не были записаны с кворумом. */ \
M(SettingUInt64, select_sequential_consistency, 0) \
......
......@@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/** Примитив синхронизации. Работает следующим образом:
* При создании создает неэфемерную инкрементную ноду и помечает ее как заблокированную (LOCKED).
* unlock() разблокирует ее (UNLOCKED).
......
......@@ -12,7 +12,8 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_);
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_, const String & insert_id_,
size_t quorum_, size_t quorum_timeout_ms_);
void writePrefix() override;
void write(const Block & block) override;
......@@ -21,6 +22,7 @@ private:
StorageReplicatedMergeTree & storage;
String insert_id;
size_t quorum;
size_t quorum_timeout_ms;
size_t block_index = 0;
Logger * log;
......
......@@ -358,6 +358,12 @@ private:
*/
void checkPartAndAddToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = "");
/** Исходит из допущения, что такого куска ещё нигде нет (Это обеспечено, если номер куска выделен с помощью AbandonableLock).
* Кладет в ops действия, добавляющие данные о куске в ZooKeeper.
* Вызывать под TableStructureLock.
*/
void addNewPartToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String name_override = "");
/// Кладет в ops действия, удаляющие кусок из ZooKeeper.
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
......@@ -446,8 +452,14 @@ private:
*/
void releasePartitionMergeLock(const std::string & partition_name);
/// Проверить наличие узла в ZK. Если он есть - запомнить эту информацию, и затем сразу отвечать true.
std::unordered_set<std::string> existing_nodes_cache;
std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path);
/// Перешардирование.
private:
struct ReplicaSpaceInfo
{
long double factor = 0.0;
......@@ -469,7 +481,6 @@ private:
using PartitionToMergeLock = std::map<std::string, PartitionMergeLockInfo>;
private:
/** Проверяет, что структуры локальной и реплицируемых таблиц совпадают.
*/
void enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths);
......@@ -483,7 +494,6 @@ private:
*/
bool checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info, size_t partition_size) const;
private:
std::mutex mutex_partition_to_merge_lock;
PartitionToMergeLock partition_to_merge_lock;
};
......
......@@ -323,6 +323,7 @@ namespace ErrorCodes
extern const int RESHARDING_INVALID_PARAMETERS = 317;
extern const int INVALID_SHARD_WEIGHT = 318;
extern const int SHARD_DOESNT_REFERENCE_TABLE = 319;
extern const int UNKNOWN_STATUS_OF_INSERT = 320;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -16,12 +16,14 @@ namespace ErrorCodes
extern const int CHECKSUM_DOESNT_MATCH;
extern const int UNEXPECTED_ZOOKEEPER_ERROR;
extern const int NO_ZOOKEEPER;
extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT;
}
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_)
: storage(storage_), insert_id(insert_id_), quorum(quorum_),
StorageReplicatedMergeTree & storage_, const String & insert_id_, size_t quorum_, size_t quorum_timeout_ms_)
: storage(storage_), insert_id(insert_id_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_),
log(&Logger::get(storage.data.getLogName() + " (Replicated OutputStream)"))
{
/// Значение кворума 1 имеет такой же смысл, как если он отключён.
......@@ -55,17 +57,28 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
assertSessionIsNotExpired(zookeeper);
/** Если запись с кворумом, то проверим, что требуемое количество реплик сейчас живо,
* а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут.
*/
* а также что для всех предыдущих кусков, для которых требуется кворум, этот кворум достигнут.
* А также будем проверять, что во время вставки, реплика не была переинициализирована или выключена (по значению узла is_active).
* TODO Слишком сложная логика, можно сделать лучше.
*/
String quorum_status_path = storage.zookeeper_path + "/quorum/status";
String is_active_node_value;
int is_active_node_version = -1;
int host_node_version = -1;
if (quorum)
{
zkutil::ZooKeeper::TryGetFuture quorum_status_future = zookeeper->asyncTryGet(quorum_status_path);
zkutil::ZooKeeper::TryGetFuture is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
zkutil::ZooKeeper::TryGetFuture host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");
/// Список живых реплик. Все они регистрируют эфемерную ноду для leader_election.
auto live_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election");
if (live_replicas.size() < quorum)
zkutil::Stat leader_election_stat;
zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);
if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
throw Exception("Number of alive replicas ("
+ toString(live_replicas.size()) + ") is less than requested quorum (" + toString(quorum) + ").",
+ toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
ErrorCodes::TOO_LESS_LIVE_REPLICAS);
/** Достигнут ли кворум для последнего куска, для которого нужен кворум?
......@@ -76,13 +89,21 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
* Если кворум достигнут, то нода удаляется.
*/
String quorum_status;
bool quorum_unsatisfied = zookeeper->tryGet(quorum_status_path, quorum_status);
if (quorum_unsatisfied)
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
auto quorum_status = quorum_status_future.get();
if (quorum_status.exists)
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.value, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
/// Обе проверки неявно делаются и позже (иначе был бы race condition).
auto is_active = is_active_future.get();
auto host = host_future.get();
if (!is_active.exists || !host.exists)
throw Exception("Replica is not active right now", ErrorCodes::READONLY);
is_active_node_value = is_active.value;
is_active_node_version = is_active.stat.version;
host_node_version = host.stat.version;
}
auto part_blocks = storage.writer.splitBlockIntoParts(block);
......@@ -95,7 +116,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
String block_id = insert_id.empty() ? "" : insert_id + "__" + toString(block_index);
String month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(current_block.min_date)) / 100);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); /// 2 RTT
Int64 part_number = block_number_lock.getNumber();
......@@ -161,7 +182,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
zkutil::CreateMode::Persistent));
/// Информация о куске, в данных реплики.
storage.checkPartAndAddToZooKeeper(part, ops, part_name);
storage.addNewPartToZooKeeper(part, ops, part_name);
/// Лог репликации.
ops.push_back(new zkutil::Op::Create(
......@@ -197,6 +218,20 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
quorum_entry.toString(),
acl,
zkutil::CreateMode::Persistent));
/// Удостоверяемся, что за время вставки, реплика не была переинициализирована или выключена (при завершении сервера).
ops.push_back(
new zkutil::Op::Check(
storage.replica_path + "/is_active",
is_active_node_version));
/// К сожалению, одной лишь проверки выше недостаточно, потому что узел is_active может удалиться и появиться заново с той же версией.
/// Но тогда изменится значение узла host. Будем проверять это.
/// Замечательно, что эти два узла меняются в одной транзакции (см. MergeTreeRestartingThread).
ops.push_back(
new zkutil::Op::Check(
storage.replica_path + "/host",
host_node_version));
}
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
......@@ -260,6 +295,9 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
transaction.commit();
storage.enqueuePartForCheck(part->name);
/// Мы не знаем, были или не были вставлены данные.
throw Exception("Unknown status, client must retry. Reason: " + e.displayText(), ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
throw;
......@@ -267,9 +305,44 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
{
/// Дожидаемся достижения кворума. TODO Настраиваемый таймаут.
/// Дожидаемся достижения кворума.
LOG_TRACE(log, "Waiting for quorum");
zookeeper->waitForDisappear(quorum_status_path);
try
{
while (true)
{
zkutil::EventPtr event = new Poco::Event;
std::string value;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.
if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
break;
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// Если нода успела исчезнуть, а потом появиться снова уже для следующей вставки.
if (quorum_entry.part_name != part_name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum");
}
/// А вдруг возможно, что текущая реплика в это время перестала быть активной и кворум помечен как неудавшийся, и удалён?
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|| value != is_active_node_value)
throw Exception("Replica become inactive while waiting for quorum");
}
catch (...)
{
/// Мы не знаем, были или не были вставлены данные
/// - успели или не успели другие реплики скачать кусок и пометить кворум как выполненный.
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
LOG_TRACE(log, "Quorum satisfied");
}
}
......
......@@ -860,6 +860,38 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
}
void StorageReplicatedMergeTree::addNewPartToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name)
{
auto zookeeper = getZooKeeper();
if (part_name.empty())
part_name = part->name;
check(part->columns);
auto acl = zookeeper->getDefaultACL();
ops.push_back(new zkutil::Op::Check(
zookeeper_path + "/columns",
columns_version));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name,
"",
acl,
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name + "/columns",
part->columns.toString(),
acl,
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part_name + "/checksums",
part->checksums.toString(),
acl,
zkutil::CreateMode::Persistent));
}
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
if (queue.pullLogsToQueue(getZooKeeper(), next_update_event))
......@@ -2453,7 +2485,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Setti
if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*query))
insert_id = insert->insert_id;
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id, settings.insert_quorum);
return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id,
settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds());
}
......@@ -2975,12 +3008,33 @@ void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const Str
}
bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
{
{
std::lock_guard<std::mutex> lock(existing_nodes_cache_mutex);
if (existing_nodes_cache.count(path))
return true;
}
auto zookeeper = getZooKeeper();
bool res = zookeeper->exists(path);
if (res)
{
std::lock_guard<std::mutex> lock(existing_nodes_cache_mutex);
existing_nodes_cache.insert(path);
}
return res;
}
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name)
{
auto zookeeper = getZooKeeper();
String month_path = zookeeper_path + "/block_numbers/" + month_name;
if (!zookeeper->exists(month_path))
if (!existsNodeCached(month_path))
{
/// Создадим в block_numbers ноду для месяца и пропустим в ней N=RESERVED_BLOCK_NUMBERS значений инкремента.
/// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало.
......
......@@ -6,6 +6,7 @@
#include <unordered_set>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <common/logger_useful.h>
......@@ -345,7 +346,7 @@ private:
std::string task_queue_path;
int32_t session_timeout_ms;
Poco::FastMutex mutex;
std::mutex mutex;
ACLPtr default_acl;
zhandle_t * impl;
......
......@@ -53,7 +53,7 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
if (type != ZOO_SESSION_EVENT)
{
{
Poco::ScopedLock<Poco::FastMutex> lock(watch->zk.mutex);
std::lock_guard<std::mutex> lock(watch->zk.mutex);
watch->zk.watch_store.erase(watch);
}
delete watch;
......@@ -140,7 +140,7 @@ void * ZooKeeper::watchForEvent(EventPtr event)
{
WatchWithEvent * res = new WatchWithEvent(*this, event);
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::lock_guard<std::mutex> lock(mutex);
watch_store.insert(res);
if (watch_store.size() % 10000 == 0)
{
......@@ -621,13 +621,13 @@ Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr
ACLPtr ZooKeeper::getDefaultACL()
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::lock_guard<std::mutex> lock(mutex);
return default_acl;
}
void ZooKeeper::setDefaultACL(ACLPtr new_acl)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::lock_guard<std::mutex> lock(mutex);
default_acl = new_acl;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册