提交 b4ab7d4b 编写于 作者: V Vitaliy Lyudvichenko

Make early deduplication with block number creation. [#CLICKHOUSE-3545]

上级 61e5b047
......@@ -41,6 +41,16 @@ inline bool isHardwareErrorCode(int32_t zk_return_code)
return isUnrecoverableErrorCode(zk_return_code) || isTemporaryErrorCode(zk_return_code);
}
/// Valid errors sent from server
inline bool isUserError(int32_t zk_return_code)
{
return zk_return_code == ZNONODE
|| zk_return_code == ZBADVERSION
|| zk_return_code == ZNOCHILDRENFOREPHEMERALS
|| zk_return_code == ZNODEEXISTS
|| zk_return_code == ZNOTEMPTY;
}
class KeeperException : public DB::Exception
{
......
......@@ -21,7 +21,9 @@ public:
virtual std::unique_ptr<Op> clone() const = 0;
virtual std::string describe() = 0;
virtual std::string getPath() const = 0;
virtual std::string describe() const = 0;
std::unique_ptr<zoo_op_t> data;
......@@ -44,7 +46,9 @@ struct Op::Remove : public Op
return std::unique_ptr<zkutil::Op>(new Remove(path, version));
}
std::string describe() override { return "command: remove, path: " + path; }
std::string getPath() const override { return path; }
std::string describe() const override { return "command: remove, path: " + path; }
private:
std::string path;
......@@ -53,27 +57,26 @@ private:
struct Op::Create : public Op
{
Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_);
Create(const std::string & path_pattern_, const std::string & value_, ACLPtr acl_, int32_t flags_);
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Create(path, value, acl, flags));
return std::unique_ptr<zkutil::Op>(new Create(path_pattern, value, acl, flags));
}
std::string getPathCreated()
{
return created_path.data();
}
std::string getPathCreated() { return created_path.data(); }
std::string getPath() const override { return path_pattern; }
std::string describe() override
std::string describe() const override
{
return "command: create"
", path: " + path +
", path: " + path_pattern +
", value: " + value;
}
private:
std::string path;
std::string path_pattern;
std::string value;
ACLPtr acl;
int32_t flags;
......@@ -93,7 +96,9 @@ struct Op::SetData : public Op
return std::unique_ptr<zkutil::Op>(new SetData(path, value, version));
}
std::string describe() override
std::string getPath() const override { return path; }
std::string describe() const override
{
return
"command: set"
......@@ -122,7 +127,9 @@ struct Op::Check : public Op
return std::unique_ptr<zkutil::Op>(new Check(path, version));
}
std::string describe() override { return "command: check, path: " + path; }
std::string getPath() const override { return path; }
std::string describe() const override { return "command: check, path: " + path; }
private:
std::string path;
......@@ -162,4 +169,11 @@ class ZooKeeper;
/// path - znode path to which the change happened. if event == ZOO_SESSION_EVENT it is either NULL or empty string.
using WatchCallback = std::function<void(ZooKeeper & zookeeper, int type, int state, const char * path)>;
/// Returns first op which code != ZOK or throws an exception
/// ZooKeeper client sets correct OP codes if the transaction fails because of logical (user) errors like ZNODEEXISTS
/// If it is failed because of network error, for example, OP codes is not set.
/// Therefore you should make zkutil::isUserError() check before the function invocation.
size_t getFailedOpIndex(const OpResultsPtr & op_results, int32_t transaction_return_code);
}
......@@ -29,6 +29,15 @@ namespace CurrentMetrics
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace zkutil
{
......@@ -581,6 +590,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
/// Copy the struct containing pointers with default copy-constructor.
/// It is safe because it hasn't got a destructor.
std::vector<zoo_op_t> ops;
ops.reserve(ops_.size());
for (const auto & op : ops_)
ops.push_back(*(op->data));
......@@ -605,7 +615,7 @@ OpResultsPtr ZooKeeper::multi(const Ops & ops)
for (size_t i = 0; i < ops.size(); ++i)
{
if (results->at(i).err == code)
throw KeeperException("multi() failed at op #" + std::to_string(i) + ", " + ops[i]->describe(), code);
throw KeeperException("Transaction failed at op #" + std::to_string(i) + ": " + ops[i]->describe(), code);
}
}
......@@ -619,15 +629,22 @@ int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
int32_t code = multiImpl(ops_, out_results_);
if (!(code == ZOK ||
code == ZNONODE ||
code == ZNODEEXISTS ||
code == ZNOCHILDRENFOREPHEMERALS ||
code == ZBADVERSION ||
code == ZNOTEMPTY))
code == ZNONODE ||
code == ZNODEEXISTS ||
code == ZNOCHILDRENFOREPHEMERALS ||
code == ZBADVERSION ||
code == ZNOTEMPTY))
throw KeeperException(code);
return code;
}
int32_t ZooKeeper::tryMultiUnsafe(const Ops & ops, MultiTransactionInfo & info)
{
info.code = multiImpl(ops, &info.op_results);
info.ops = &ops;
return info.code;
}
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt)
{
int32_t code = retry(std::bind(&ZooKeeper::multiImpl, this, std::ref(ops), out_results), attempt);
......@@ -739,10 +756,10 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)
: path(path_), value(value_), acl(acl_), flags(flags_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
Op::Create::Create(const std::string & path_pattern_, const std::string & value_, ACLPtr acl_, int32_t flags_)
: path_pattern(path_pattern_), value(value_), acl(acl_), flags(flags_), created_path(path_pattern_.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
zoo_create_op_init(data.get(), path_pattern.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
}
ACLPtr ZooKeeper::getDefaultACL()
......@@ -1029,4 +1046,26 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
return asyncMultiImpl(ops, true);
}
size_t getFailedOpIndex(const OpResultsPtr & op_results, int32_t transaction_return_code)
{
if (op_results == nullptr || op_results->empty())
throw DB::Exception("OpResults is empty", DB::ErrorCodes::LOGICAL_ERROR);
for (size_t index = 0; index < op_results->size(); ++index)
{
if ((*op_results)[index].err != ZOK)
return index;
}
if (!isUserError(transaction_return_code))
{
throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(transaction_return_code) + "' is not valid response code for that",
DB::ErrorCodes::LOGICAL_ERROR);
}
throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
}
}
......@@ -36,7 +36,7 @@ const UInt32 BIG_SESSION_TIMEOUT = 600000;
constexpr size_t MULTI_BATCH_SIZE = 100;
struct WatchContext;
struct MultiTransactionInfo;
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
///
......@@ -200,6 +200,8 @@ public:
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
int32_t tryMulti(const Ops & ops, OpResultsPtr * out_results = nullptr);
/// Like previous one, but does not throw any ZooKeeper exceptions
int32_t tryMultiUnsafe(const Ops & ops, MultiTransactionInfo & info);
/// Use only with read-only operations.
int32_t tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results = nullptr, size_t * attempt = nullptr);
......@@ -429,6 +431,7 @@ private:
bool is_dirty = false;
};
using ZooKeeperPtr = ZooKeeper::Ptr;
......@@ -490,4 +493,41 @@ private:
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
/// Simple structure to handle transaction execution results
struct MultiTransactionInfo
{
MultiTransactionInfo() = default;
const Ops * ops = nullptr;
int32_t code = ZOK;
OpResultsPtr op_results;
bool empty() const
{
return ops == nullptr;
}
bool hasFailedOp() const
{
return zkutil::isUserError(code);
}
const Op & getFailedOp() const
{
return *ops->at(getFailedOpIndex(op_results, code));
}
KeeperException getException() const
{
if (hasFailedOp())
{
size_t i = getFailedOpIndex(op_results, code);
return KeeperException("Transaction failed at op #" + std::to_string(i) + ": " + ops->at(i)->describe(), code);
}
else
return KeeperException(code);
}
};
}
......@@ -15,6 +15,3 @@ target_link_libraries(zkutil_zookeeper_holder clickhouse_common_zookeeper)
add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect clickhouse_common_zookeeper clickhouse_common_configprocessor)
add_executable (zkutil_test_multi_exception zkutil_test_multi_exception.cpp)
target_link_libraries (zkutil_test_multi_exception clickhouse_common_zookeeper gtest_main)
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <iostream>
#include <chrono>
......@@ -113,3 +113,32 @@ TEST(zkutil, multi_async)
ASSERT_EQ(res.ops_ptr->size(), 2);
}
}
TEST(zkutil, multi_create_sequential)
{
try
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
String base_path = "/clickhouse_test/zkutil/multi_create_sequential";
zookeeper->tryRemoveRecursive(base_path);
zookeeper->createAncestors(base_path + "/");
String entry_path = base_path + "/queue-";
ops.emplace_back(new zkutil::Op::Create(entry_path, "", acl, zkutil::CreateMode::EphemeralSequential));
zkutil::OpResultsPtr results = zookeeper->multi(ops);
zkutil::OpResult & result = results->at(0);
EXPECT_TRUE(result.value != nullptr);
EXPECT_TRUE(startsWith(result.value, entry_path));
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false);
throw;
}
}
......@@ -8,3 +8,8 @@
sudo iptables -A OUTPUT -p tcp --dport 2181 -j DROP
sudo ip6tables -A OUTPUT -p tcp --dport 2181 -j DROP
# You could also test random drops:
#sudo iptables -A OUTPUT -p tcp --dport 2181 -j REJECT --reject-with tcp-reset -m statistic --mode random --probability 0.1
#sudo ip6tables -A OUTPUT -p tcp --dport 2181 -j REJECT --reject-with tcp-reset -m statistic --mode random --probability 0.1
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Poco/ConsoleChannel.h>
#include <Common/Exception.h>
/// Проверяет, какие ошибки выдает ZooKeeper при попытке сделать какую-нибудь операцию через разное время после истечения сессии.
......@@ -36,17 +37,20 @@ int main(int argc, char ** argv)
ops.emplace_back(std::make_unique<zkutil::Op::Create>("/test/zk_expiration_test", "hello", zk.getDefaultACL(), zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>("/test/zk_expiration_test", -1));
int code;
zkutil::MultiTransactionInfo info;
zk.tryMultiUnsafe(ops, info);
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(info.code) << std::endl;
try
{
code = zk.tryMulti(ops);
if (info.code != ZOK)
std::cout << "Path: " << info.getFailedOp().getPath() << std::endl;
}
catch (zkutil::KeeperException & e)
catch (...)
{
code = e.code;
std::cout << DB::getCurrentExceptionMessage(false) << std::endl;
}
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(code) << std::endl;
}
sleep(1);
......@@ -54,12 +58,12 @@ int main(int argc, char ** argv)
}
catch (zkutil::KeeperException & e)
{
std::cerr << "KeeperException: " << e.displayText() << std::endl;
std::cerr << "KeeperException: " << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
}
catch (...)
{
std::cerr << "Some exception" << std::endl;
std::cerr << "Some exception: " << DB::getCurrentExceptionMessage(true) << std::endl;
return 2;
}
......
......@@ -624,7 +624,7 @@ struct ReplaceRegexpImpl
re2_st::StringPiece matches[max_captures];
size_t start_pos = 0;
while (start_pos < input.length())
while (start_pos < static_cast<size_t>(input.length()))
{
/// If no more replacements possible for current string
bool can_finish_current_string = false;
......
......@@ -30,11 +30,26 @@ public:
};
AbandonableLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_)
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Ops * precheck_ops = nullptr)
: zookeeper(zookeeper_), path_prefix(path_prefix_)
{
String abandonable_path = temp_path + "/abandonable_lock-";
/// Let's create an secondary ephemeral node.
holder_path = zookeeper.create(temp_path + "/abandonable_lock-", "", zkutil::CreateMode::EphemeralSequential);
if (!precheck_ops || precheck_ops->empty())
{
holder_path = zookeeper.create(abandonable_path, "", zkutil::CreateMode::EphemeralSequential);
}
else
{
precheck_ops->emplace_back(std::make_unique<zkutil::Op::Create>(
abandonable_path, "", zookeeper.getDefaultACL(), zkutil::CreateMode::EphemeralSequential));
if (zookeeper.tryMultiUnsafe(*precheck_ops, precheck_info) != ZOK)
return;
holder_path = precheck_info.op_results->back().value;
}
/// Write the path to the secondary node in the main node.
path = zookeeper.create(path_prefix, holder_path, zkutil::CreateMode::PersistentSequential);
......@@ -51,19 +66,27 @@ public:
std::swap(holder_path, rhs.holder_path);
}
bool isCreated() const
{
return !holder_path.empty() && !path.empty();
}
String getPath() const
{
checkCreated();
return path;
}
/// Parse the number at the end of the path.
UInt64 getNumber() const
{
checkCreated();
return parse<UInt64>(path.c_str() + path_prefix.size(), path.size() - path_prefix.size());
}
void unlock()
{
checkCreated();
zookeeper.remove(path);
zookeeper.remove(holder_path);
holder_path = "";
......@@ -72,10 +95,17 @@ public:
/// Adds actions equivalent to `unlock()` to the list.
void getUnlockOps(zkutil::Ops & ops)
{
checkCreated();
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(path, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(holder_path, -1));
}
void checkCreated() const
{
if (!isCreated())
throw Exception("AbandonableLock is not created", ErrorCodes::LOGICAL_ERROR);
}
~AbandonableLockInZooKeeper()
{
if (holder_path.empty())
......@@ -127,6 +157,11 @@ private:
String path_prefix;
String path;
String holder_path;
public:
/// Contains information about execution of passed precheck ops and block-creation payload
/// TODO: quite ugly interface to handle precheck ops
zkutil::MultiTransactionInfo precheck_info;
};
}
......@@ -177,8 +177,48 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
auto acl = zookeeper->getDefaultACL();
/// Deduplication stuff
bool deduplicate_block = !block_id.empty();
String block_id_path;
zkutil::Ops deduplication_check_ops;
zkutil::Ops * deduplication_check_ops_ptr = nullptr;
if (deduplicate_block)
{
block_id_path = storage.zookeeper_path + "/blocks/" + block_id;
/// Lets check for duplicates in advance, to avoid superflous block numbers allocation
deduplication_check_ops.emplace_back(std::make_unique<zkutil::Op::Create>(block_id_path, "", acl, zkutil::CreateMode::Persistent));
deduplication_check_ops.emplace_back(std::make_unique<zkutil::Op::Remove>(block_id_path, -1));
deduplication_check_ops_ptr = &deduplication_check_ops;
}
/// 2 RTT
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, deduplication_check_ops_ptr);
if (!block_number_lock.isCreated())
{
zkutil::MultiTransactionInfo & info = block_number_lock.precheck_info;
if (!info.empty() && info.code != ZOK)
{
if (deduplicate_block && info.code == ZNODEEXISTS && info.getFailedOp().getPath() == block_id_path)
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (skip the insertion)");
part->is_duplicate = true;
last_block_is_duplicate = true;
return;
}
throw Exception("Cannot allocate block number in ZooKeeper: " + info.getException().displayText(), ErrorCodes::KEEPER_EXCEPTION);
}
throw Exception("Cannot allocate block number in ZooKeeper", ErrorCodes::KEEPER_EXCEPTION);
}
AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper); /// 2 RTT
Int64 block_number = block_number_lock.getNumber();
/// Set part attributes according to part_number. Prepare an entry for log.
......@@ -207,15 +247,17 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Information about the part.
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
if (!block_id.empty())
if (deduplicate_block)
{
/// Make final duplicate check and commit block_id
ops.emplace_back(
std::make_unique<zkutil::Op::Create>(
storage.zookeeper_path + "/blocks/" + block_id,
block_id_path,
toString(block_number), /// We will able to know original part number for duplicate blocks, if we want.
acl,
zkutil::CreateMode::Persistent));
}
/// Information about the part, in the replica data.
......@@ -291,62 +333,66 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
MergeTreeData::Transaction transaction; /// If you can not add a part to ZK, we'll remove it back from the working set.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
try
zkutil::MultiTransactionInfo info;
zookeeper->tryMultiUnsafe(ops, info); /// 1 RTT
if (info.code == ZOK)
{
auto code = zookeeper->tryMulti(ops); /// 1 RTT
if (code == ZOK)
transaction.commit();
storage.merge_selecting_event.set();
}
else if (zkutil::isUserError(info.code))
{
String failed_op_path = info.getFailedOp().getPath();
if (info.code == ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
{
transaction.commit();
storage.merge_selecting_event.set();
/// Block with the same id have just appeared in table (or other replica), rollback thee insertion.
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
part->is_duplicate = true;
transaction.rollback();
last_block_is_duplicate = true;
}
else if (code == ZNODEEXISTS)
else if (info.code == ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{
/// If the block with such ID already exists in the table, rollback its insertion.
if (!block_id.empty() && zookeeper->exists(storage.zookeeper_path + "/blocks/" + block_id))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
part->is_duplicate = true;
transaction.rollback();
last_block_is_duplicate = true;
}
else if (zookeeper->exists(quorum_info.status_path))
{
transaction.rollback();
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else
{
/// if the node with the quorum existed, but was quickly removed.
transaction.rollback();
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else
{
throw Exception("Unexpected error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
/// NOTE: We could be here if the node with the quorum existed, but was quickly removed.
transaction.rollback();
throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(info.code) + ", path " + failed_op_path,
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
catch (const zkutil::KeeperException & e)
else if (zkutil::isTemporaryErrorCode(info.code))
{
/** If the connection is lost, and we do not know if the changes were applied, you can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
if (e.code == ZOPERATIONTIMEOUT ||
e.code == ZCONNECTIONLOSS)
{
transaction.commit();
storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
transaction.commit();
storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
/// We do not know whether or not data has been inserted.
throw Exception("Unknown status, client must retry. Reason: " + e.displayText(), ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
throw;
/// We do not know whether or not data has been inserted.
throw Exception("Unknown status, client must retry. Reason: " + zkutil::ZooKeeper::error2string(info.code), ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
else if (zkutil::isUnrecoverableErrorCode(info.code))
{
transaction.rollback();
throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(info.code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
else
{
transaction.rollback();
throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
+ zkutil::ZooKeeper::error2string(info.code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
if (quorum)
{
......
......@@ -2891,7 +2891,8 @@ bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
}
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper)
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
zkutil::Ops * precheck_ops)
{
String partition_path = zookeeper_path + "/block_numbers/" + partition_id;
if (!existsNodeCached(partition_path))
......@@ -2903,7 +2904,7 @@ AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const
return AbandonableLockInZooKeeper(
partition_path + "/block-",
zookeeper_path + "/temp", *zookeeper);
zookeeper_path + "/temp", *zookeeper, precheck_ops);
}
......
......@@ -396,7 +396,9 @@ private:
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name);
AbandonableLockInZooKeeper allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper);
/// Creates new block number and additionally perform precheck_ops while creates 'abandoned node'
AbandonableLockInZooKeeper allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
zkutil::Ops * precheck_ops = nullptr);
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册