提交 10be6ca8 编写于 作者: A Alexey Milovidov

Rewriting ZooKeeper library [#CLICKHOUSE-2]

上级 6b684fc1
......@@ -29,11 +29,11 @@ public:
if (zookeeper->tryGet(path, result_str, &stat))
{
result = std::stol(result_str) + 1;
success = zookeeper->trySet(path, std::to_string(result), stat.version) == ZOK;
success = zookeeper->trySet(path, std::to_string(result), stat.version) == ZooKeeperImpl::ZooKeeper::ZOK;
}
else
{
success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == ZOK;
success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == ZooKeeperImpl::ZooKeeper::ZOK;
}
}
while (!success);
......
......@@ -76,41 +76,9 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
cleanOldEphemeralNodes();
thread = std::thread(&LeaderElection::threadFunction, this);
}
void cleanOldEphemeralNodes()
{
if (identifier.empty())
return;
/** If there are nodes with same identifier, remove them.
* Such nodes could still be alive after failed attempt of removal,
* if it was temporary communication failure, that was continued for more than session timeout,
* but ZK session is still alive for unknown reason, and someone still holds that ZK session.
* See comments in destructor of EphemeralNodeHolder.
*/
Strings brothers = zookeeper.getChildren(path);
for (const auto & brother : brothers)
{
if (brother == node_name)
continue;
std::string brother_path = path + "/" + brother;
std::string brother_identifier = zookeeper.get(brother_path);
if (brother_identifier == identifier)
{
ProfileEvents::increment(ProfileEvents::ObsoleteEphemeralNode);
LOG_WARNING(&Logger::get("LeaderElection"), "Found obsolete ephemeral node for identifier "
+ identifier + ", removing: " + brother_path);
zookeeper.tryRemoveWithRetries(brother_path);
}
}
}
void releaseNode()
{
shutdown = true;
......
......@@ -29,7 +29,31 @@ using EventPtr = std::shared_ptr<Poco::Event>;
/// they must execute as quickly as possible (preferably just set some notification).
using WatchCallback = ZooKeeperImpl::ZooKeeper::WatchCallback;
using RequestPtr = ZooKeeperImpl::ZooKeeper::RequestPtr;
using ResponsePtr = ZooKeeperImpl::ZooKeeper::ResponsePtr;
using Requests = ZooKeeperImpl::ZooKeeper::Requests;
using Responses = ZooKeeperImpl::ZooKeeper::Responses;
using CreateRequest = ZooKeeperImpl::ZooKeeper::CreateRequest;
using RemoveRequest = ZooKeeperImpl::ZooKeeper::RemoveRequest;
using ExistsRequest = ZooKeeperImpl::ZooKeeper::ExistsRequest;
using GetRequest = ZooKeeperImpl::ZooKeeper::GetRequest;
using SetRequest = ZooKeeperImpl::ZooKeeper::SetRequest;
using ListRequest = ZooKeeperImpl::ZooKeeper::ListRequest;
using CheckRequest = ZooKeeperImpl::ZooKeeper::CheckRequest;
using CreateResponse = ZooKeeperImpl::ZooKeeper::CreateResponse;
using RemoveResponse = ZooKeeperImpl::ZooKeeper::RemoveResponse;
using ExistsResponse = ZooKeeperImpl::ZooKeeper::ExistsResponse;
using GetResponse = ZooKeeperImpl::ZooKeeper::GetResponse;
using SetResponse = ZooKeeperImpl::ZooKeeper::SetResponse;
using ListResponse = ZooKeeperImpl::ZooKeeper::ListResponse;
using CheckResponse = ZooKeeperImpl::ZooKeeper::CheckResponse;
RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode);
RequestPtr makeRemoveRequest(const std::string & path, int version);
RequestPtr makeSetRequest(const std::string & path, const std::string & data);
RequestPtr makeCheckRequest(const std::string & path, int version);
}
......@@ -830,4 +830,39 @@ void KeeperMultiException::check(int32_t code, const Requests & requests, const
throw KeeperException(code);
}
RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode)
{
auto request = std::make_shared<CreateRequest>();
request->path = path;
request->data = data;
request->is_ephemeral = create_mode == CreateMode::Ephemeral || create_mode == CreateMode::EphemeralSequential;
request->is_sequential = create_mode == CreateMode::PersistentSequential || create_mode == CreateMode::EphemeralSequential;
return request;
}
RequestPtr makeRemoveRequest(const std::string & path, int version)
{
auto request = std::make_shared<RemoveRequest>();
request->path = path;
request->version = version;
return request;
}
RequestPtr makeSetRequest(const std::string & path, const std::string & data)
{
auto request = std::make_shared<SetRequest>();
request->path = path;
request->data = data;
return request;
}
RequestPtr makeCheckRequest(const std::string & path, int version)
{
auto request = std::make_shared<CheckRequest>();
request->path = path;
request->version = version;
return request;
}
}
......@@ -31,25 +31,24 @@ TEST(zkutil, multi_nice_exception_msg)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
zkutil::Requests ops;
ASSERT_NO_THROW(
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi", "_", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "_", zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
);
try
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/c", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test/zkutil_multi/c", -1));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "BadBoy", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/b", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/c", "_", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest("/clickhouse_test/zkutil_multi/c", -1));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "BadBoy", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/b", "_", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "_", zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
FAIL();
......@@ -69,8 +68,7 @@ TEST(zkutil, multi_nice_exception_msg)
TEST(zkutil, multi_async)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
zkutil::Requests ops;
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
......@@ -81,14 +79,14 @@ TEST(zkutil, multi_async)
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "", zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZOK);
ASSERT_TRUE(res.code == ZooKeeperImpl::ZooKeeper::ZOK);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
......@@ -100,11 +98,11 @@ TEST(zkutil, multi_async)
for (size_t i = 0; i < 10000; ++i)
{
ops.clear();
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test/zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test/zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test/zkutil_multi", "xxx", 42));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest("/clickhouse_test/zkutil_multi", -1));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi", "_", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCheckRequest("/clickhouse_test/zkutil_multi", -1));
ops.emplace_back(zkutil::makeSetRequest("/clickhouse_test/zkutil_multi", "xxx", 42));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "_", zkutil::CreateMode::Persistent));
futures.emplace_back(zookeeper->asyncMulti(ops));
}
......@@ -118,14 +116,14 @@ TEST(zkutil, multi_async)
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test/zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi", "_", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest("/clickhouse_test/zkutil_multi/a", "_", zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZNODEEXISTS);
ASSERT_TRUE(res.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
......@@ -135,9 +133,9 @@ TEST(zkutil, multi_async)
TEST(zkutil, multi_async_libzookeeper_segfault)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181", "", 1000);
zkutil::Ops ops;
zkutil::Requests ops;
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test/zkutil_multi", 0));
ops.emplace_back(zkutil::makeCheckRequest("/clickhouse_test/zkutil_multi", 0));
/// Uncomment to test
//auto cmd = ShellCommand::execute("sudo service zookeeper restart");
......@@ -159,15 +157,14 @@ TEST(zkutil, multi_create_sequential)
zookeeper->createAncestors("/clickhouse_test/");
zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181", "", zkutil::DEFAULT_SESSION_TIMEOUT, "/clickhouse_test");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
zkutil::Requests ops;
String base_path = "/zkutil/multi_create_sequential";
zookeeper->tryRemoveRecursive(base_path);
zookeeper->createAncestors(base_path + "/");
String sequential_node_prefix = base_path + "/queue-";
ops.emplace_back(new zkutil::Op::Create(sequential_node_prefix, "", acl, zkutil::CreateMode::EphemeralSequential));
ops.emplace_back(zkutil::makeCreateRequest(sequential_node_prefix, "", zkutil::CreateMode::EphemeralSequential));
zkutil::OpResultsPtr results = zookeeper->multi(ops);
zkutil::OpResult & sequential_node_result_op = results->at(0);
......
......@@ -33,9 +33,9 @@ int main(int argc, char ** argv)
while (true)
{
{
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::Create>("/test/zk_expiration_test", "hello", zk.getDefaultACL(), zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_shared<zkutil::Op::Remove>("/test/zk_expiration_test", -1));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest>("/test/zk_expiration_test", "hello", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest>("/test/zk_expiration_test", -1));
zkutil::MultiTransactionInfo info;
zk.tryMultiNoThrow(ops, nullptr, &info);
......@@ -43,7 +43,7 @@ int main(int argc, char ** argv)
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(info.code) << std::endl;
try
{
if (info.code != ZOK)
if (info.code)
std::cout << "Path: " << info.getFailedOp().getPath() << std::endl;
}
catch (...)
......
......@@ -35,13 +35,13 @@ try
zk.remove("/test");
Ops ops;
ops.emplace_back(std::make_unique<Op::Create>("/test", "multi1", zk.getDefaultACL(), CreateMode::Persistent));
ops.emplace_back(std::make_unique<Op::SetData>("/test", "multi2", -1));
ops.emplace_back(std::make_unique<Op::Remove>("/test", -1));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest("/test", "multi1", CreateMode::Persistent));
ops.emplace_back(zkutil::makeSetRequest("/test", "multi2", -1));
ops.emplace_back(zkutil::makeRemoveRequest("/test", -1));
std::cout << "multi" << std::endl;
OpResultsPtr res = zk.multi(ops);
std::cout << "path created: " << dynamic_cast<Op::Create &>(*ops[0]).getPathCreated() << std::endl;
zkutil::Responses res = zk.multi(ops);
std::cout << "path created: " << dynamic_cast<ZooKeeperImpl::ZooKeeper::CreateResponse &>(*ops[0]).path_created << std::endl;
return 0;
}
......
......@@ -549,16 +549,16 @@ void DDLWorker::processTask(DDLTask & task)
String active_node_path = task.entry_path + "/active/" + task.host_id_str;
String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;
auto code = zookeeper->tryCreateWithRetries(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
if (code == ZOK || code == ZNODEEXISTS)
auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
if (code == ZooKeeperImpl::ZooKeeper::ZOK || code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
{
// Ok
}
else if (code == ZNONODE)
else if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
{
/// There is no parent
createStatusDirs(task.entry_path);
if (ZOK != zookeeper->tryCreateWithRetries(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
if (ZooKeeperImpl::ZooKeeper::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw zkutil::KeeperException(code, active_node_path);
}
else
......@@ -599,10 +599,9 @@ void DDLWorker::processTask(DDLTask & task)
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.
/// Delete active flag and create finish flag
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(active_node_path, -1));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(finished_node_path, task.execution_status.serializeText(),
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
}
......@@ -779,9 +778,9 @@ void DDLWorker::cleanupQueue()
}
/// Remove the lock node and its parent atomically
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(lock_path, -1));
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(node_path, -1));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
zookeeper->multi(ops);
lock->unlockAssumeLockNodeRemovedManually();
......@@ -798,13 +797,19 @@ void DDLWorker::cleanupQueue()
/// Try to create nonexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path)
{
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.emplace_back(std::make_shared<zkutil::Op::Create>(node_path + "/active", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(node_path + "/finished", "", acl, zkutil::CreateMode::Persistent));
zkutil::Requests ops;
{
zkutil::CreateRequest request;
request.path = node_path + "/active";
ops.emplace_back(std::make_shared<zkutil::CreateRequest>(std::move(request)));
}
{
zkutil::CreateRequest request;
request.path = node_path + "/finished";
ops.emplace_back(std::make_shared<zkutil::CreateRequest>(std::move(request)));
}
int code = zookeeper->tryMulti(ops);
if (code != ZOK && code != ZNODEEXISTS)
if (code && code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw zkutil::KeeperException(code);
}
......@@ -1041,7 +1046,7 @@ private:
{
Strings res;
int code = zookeeper->tryGetChildren(node_path, res);
if (code != ZOK && code != ZNONODE)
if (code && code != ZooKeeperImpl::ZooKeeper::ZNONODE)
throw zkutil::KeeperException(code, node_path);
return res;
}
......
......@@ -359,18 +359,26 @@ struct TaskCluster
static zkutil::MultiTransactionInfo checkNoNodeAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
const String & checking_node_path,
zkutil::OpPtr && op)
zkutil::RequestPtr && op)
{
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::Create>(checking_node_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(checking_node_path, -1));
zkutil::Requests ops;
{
zkutil::CreateRequest request;
request.path = checking_node_path;
ops.emplace_back(std::make_shared<zkutil::CreateRequest>(std::move(request)));
}
{
zkutil::RemoveRequest request;
request.path = checking_node_path;
ops.emplace_back(std::make_shared<zkutil::RemoveRequest>(std::move(request)));
}
ops.emplace_back(std::move(op));
zkutil::MultiTransactionInfo info;
zookeeper->tryMultiNoThrow(ops, nullptr, &info);
zkutil::Responses responses;
auto code = zookeeper->tryMultiNoThrow(ops, responses);
if (info.code != ZOK && !zkutil::isUserError(info.code))
throw zkutil::KeeperException(info.code);
if (code && !zkutil::isUserError(code))
throw zkutil::KeeperException(code);
return info;
}
......@@ -896,7 +904,7 @@ public:
int code;
zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
if (code != ZOK)
if (code)
throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
LOG_DEBUG(log, "Loading description, zxid=" << task_descprtion_current_stat.czxid);
......@@ -1050,15 +1058,15 @@ protected:
}
else
{
zkutil::Ops ops;
ops.emplace_back(new zkutil::Op::SetData(workers_version_path, description, version));
ops.emplace_back(new zkutil::Op::Create(current_worker_path, description, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
auto code = zookeeper->tryMulti(ops);
if (code == ZOK || code == ZNODEEXISTS)
if (code == ZooKeeperImpl::ZooKeeper::ZOK || code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
if (code == ZBADVERSION)
if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
{
LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
}
......@@ -1212,7 +1220,7 @@ protected:
}
catch (zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is cleaning now by somebody, sleep");
std::this_thread::sleep_for(default_sleep_time);
......@@ -1447,7 +1455,6 @@ protected:
ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
auto zookeeper = getZooKeeper();
auto acl = zookeeper->getDefaultACL();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String current_task_is_active_path = task_partition.getActiveWorkerPath();
......@@ -1459,7 +1466,7 @@ protected:
auto create_is_dirty_node = [&] ()
{
auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
if (code != ZOK && code != ZNODEEXISTS)
if (code && code != ZNODEEXISTS)
throw zkutil::KeeperException(code, is_dirty_flag_path);
};
......@@ -1510,7 +1517,7 @@ protected:
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
return PartitionTaskStatus::Active;
......@@ -1579,10 +1586,10 @@ protected:
/// Try start processing, create node about it
{
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
auto op_create = std::make_shared<zkutil::Op::Create>(current_task_status_path, start_state, acl, zkutil::CreateMode::Persistent);
auto op_create = zkutil::makeCreateRequest(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
zkutil::MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
if (info.code != ZOK)
if (info.code)
{
if (info.getFailedOp().getPath() == is_dirty_flag_path)
{
......@@ -1721,10 +1728,10 @@ protected:
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto op_set = std::make_shared<zkutil::Op::SetData>(current_task_status_path, state_finished, 0);
auto op_set = zkutil::makeSetRequest(current_task_status_path, state_finished, 0);
zkutil::MultiTransactionInfo info = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
if (info.code != ZOK)
if (info.code)
{
if (info.getFailedOp().getPath() == is_dirty_flag_path)
LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
......
......@@ -30,7 +30,7 @@ public:
};
AbandonableLockInZooKeeper(
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Ops * precheck_ops = nullptr)
const String & path_prefix_, const String & temp_path, zkutil::ZooKeeper & zookeeper_, zkutil::Requests * precheck_ops = nullptr)
: zookeeper(&zookeeper_), path_prefix(path_prefix_)
{
String abandonable_path = temp_path + "/abandonable_lock-";
......@@ -42,12 +42,9 @@ public:
}
else
{
precheck_ops->emplace_back(std::make_shared<zkutil::Op::Create>(
abandonable_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::EphemeralSequential));
zkutil::OpResultsPtr op_results = zookeeper->multi(*precheck_ops);
holder_path = op_results->back().value;
precheck_ops->emplace_back(zkutil::makeCreateRequest(abandonable_path, "", zkutil::CreateMode::EphemeralSequential));
zkutil::Responses op_results = zookeeper->multi(*precheck_ops);
holder_path = dynamic_cast<const zkutil::CreateResponse &>(*op_results.back()).path_created;
}
/// Write the path to the secondary node in the main node.
......@@ -101,11 +98,11 @@ public:
}
/// Adds actions equivalent to `unlock()` to the list.
void getUnlockOps(zkutil::Ops & ops)
void getUnlockOps(zkutil::Requests & ops)
{
checkCreated();
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(path, -1));
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(holder_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(holder_path, -1));
}
/// Do not delete nodes in destructor. You may call this method after 'getUnlockOps' and successful execution of these ops,
......@@ -128,7 +125,7 @@ public:
try
{
zookeeper->tryRemoveEphemeralNodeWithRetries(holder_path);
zookeeper->tryRemove(holder_path);
zookeeper->trySet(path, ""); /// It's not necessary.
}
catch (...)
......@@ -156,7 +153,7 @@ public:
/// If there is no secondary node, you need to test again the existence of the main node,
/// because during this time you might have time to call unlock().
/// At the same time, we will remove the path to the secondary node from there.
if (zookeeper.trySet(path, "") == ZOK)
if (zookeeper.trySet(path, "") == ZooKeeperImpl::ZooKeeper::ZOK)
return ABANDONED;
return UNLOCKED;
......
......@@ -140,10 +140,10 @@ void ReplicatedMergeTreeAlterThread::run()
++changed_parts;
/// Update part metadata in ZooKeeper.
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
zkutil::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/checksums",
storage.getChecksumsForZooKeeper(transaction->getNewChecksums()),
-1));
......@@ -155,7 +155,7 @@ void ReplicatedMergeTreeAlterThread::run()
catch (const zkutil::KeeperException & e)
{
/// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
if (e.code == ZNONODE)
if (e.code == ZooKeeperImpl::ZooKeeper::ZNONODE)
storage.enqueuePartForCheck(part->name);
throw;
......
......@@ -205,21 +205,19 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
auto acl = zookeeper->getDefaultACL();
/// Deduplication stuff
bool deduplicate_block = !block_id.empty();
String block_id_path;
zkutil::Ops deduplication_check_ops;
zkutil::Ops * deduplication_check_ops_ptr = nullptr;
zkutil::Requests deduplication_check_ops;
zkutil::Requests * deduplication_check_ops_ptr = nullptr;
if (deduplicate_block)
{
block_id_path = storage.zookeeper_path + "/blocks/" + block_id;
/// Lets check for duplicates in advance, to avoid superflous block numbers allocation
deduplication_check_ops.emplace_back(std::make_shared<zkutil::Op::Create>(block_id_path, "", acl, zkutil::CreateMode::Persistent));
deduplication_check_ops.emplace_back(std::make_shared<zkutil::Op::Remove>(block_id_path, -1));
deduplication_check_ops.emplace_back(zkutil::makeCreateRequest(block_id_path, "", zkutil::CreateMode::Persistent));
deduplication_check_ops.emplace_back(zkutil::makeRemoveRequest(block_id_path, -1));
deduplication_check_ops_ptr = &deduplication_check_ops;
}
......@@ -233,7 +231,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
{
zkutil::MultiTransactionInfo & info = e.info;
if (deduplicate_block && info.code == ZNODEEXISTS && info.getFailedOp().getPath() == block_id_path)
if (deduplicate_block && info.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS && info.getFailedOp().getPath() == block_id_path)
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (skip the insertion)");
part->is_duplicate = true;
......@@ -276,13 +274,13 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
/// Information about the part.
zkutil::Ops ops;
zkutil::Requests ops;
if (deduplicate_block)
{
/// Make final duplicate check and commit block_id
ops.emplace_back(
std::make_shared<zkutil::Op::Create>(
zkutil::makeCreateRequest(
block_id_path,
toString(block_number), /// We will able to know original part number for duplicate blocks, if we want.
acl,
......@@ -291,30 +289,26 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Information about the part, in the replica data.
ops.emplace_back(std::make_shared<zkutil::Op::Check>(
ops.emplace_back(zkutil::makeCheckRequest(
storage.zookeeper_path + "/columns",
storage.columns_version));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(
ops.emplace_back(zkutil::makeCreateRequest(
storage.replica_path + "/parts/" + part->name,
"",
acl,
zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(
ops.emplace_back(zkutil::makeCreateRequest(
storage.replica_path + "/parts/" + part->name + "/columns",
part->columns.toString(),
acl,
zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(
ops.emplace_back(zkutil::makeCreateRequest(
storage.replica_path + "/parts/" + part->name + "/checksums",
storage.getChecksumsForZooKeeper(part->checksums),
acl,
zkutil::CreateMode::Persistent));
/// Replication log.
ops.emplace_back(std::make_shared<zkutil::Op::Create>(
ops.emplace_back(zkutil::makeCreateRequest(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
acl,
zkutil::CreateMode::PersistentSequential));
/// Deletes the information that the block number is used for writing.
......@@ -339,7 +333,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
*/
ops.emplace_back(
std::make_shared<zkutil::Op::Create>(
zkutil::makeCreateRequest(
quorum_info.status_path,
quorum_entry.toString(),
acl,
......@@ -347,7 +341,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// Make sure that during the insertion time, the replica was not reinitialized or disabled (when the server is finished).
ops.emplace_back(
std::make_shared<zkutil::Op::Check>(
zkutil::makeCheckRequest(
storage.replica_path + "/is_active",
quorum_info.is_active_node_version));
......@@ -355,7 +349,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
/// But then the `host` value will change. We will check this.
/// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
ops.emplace_back(
std::make_shared<zkutil::Op::Check>(
zkutil::makeCheckRequest(
storage.replica_path + "/host",
quorum_info.host_node_version));
}
......@@ -366,7 +360,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
zkutil::MultiTransactionInfo info;
zookeeper->tryMultiNoThrow(ops, nullptr, &info); /// 1 RTT
if (info.code == ZOK)
if (info.code == ZooKeeperImpl::ZooKeeper::ZOK)
{
transaction.commit();
storage.merge_selecting_event.set();
......@@ -378,7 +372,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
{
String failed_op_path = info.getFailedOp().getPath();
if (info.code == ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
if (info.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
{
/// Block with the same id have just appeared in table (or other replica), rollback thee insertion.
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
......@@ -388,7 +382,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
last_block_is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
}
else if (info.code == ZNODEEXISTS && failed_op_path == quorum_info.status_path)
else if (info.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
{
transaction.rollback();
......
......@@ -95,15 +95,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
if (entries.empty())
return;
zkutil::Ops ops;
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.emplace_back(std::make_shared<zkutil::Op::Remove>(storage.zookeeper_path + "/log/" + entries[i], -1));
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(std::make_shared<zkutil::Op::Check>(storage.zookeeper_path + "/replicas", stat.version));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
ops.clear();
}
......@@ -159,7 +159,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
const String & path = pair.first;
int32_t rc = pair.second.get();
if (rc == ZNOTEMPTY)
if (rc == ZooKeeperImpl::ZooKeeper::ZNOTEMPTY)
{
/// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper->removeRecursive(path);
......
......@@ -161,21 +161,21 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
/// because we update times in ZooKeeper with unlocked mutex, while these times may change.
/// Consider it unimportant (for a short time, ZK will have a slightly different time value).
zkutil::Ops ops;
zkutil::Requests ops;
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
ops.emplace_back(zkutil::makeSetRequest(
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
if (max_processed_insert_time_changed)
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
ops.emplace_back(zkutil::makeSetRequest(
replica_path + "/max_processed_insert_time", toString(*max_processed_insert_time_changed), -1));
if (!ops.empty())
{
auto code = zookeeper->tryMulti(ops);
if (code != ZOK)
if (code)
LOG_ERROR(log, "Couldn't set value of nodes for insert times ("
<< replica_path << "/min_unprocessed_insert_time, max_processed_insert_time)" << ": "
<< zkutil::ZooKeeper::error2string(code) + ". This shouldn't happen often.");
......@@ -187,7 +187,7 @@ void ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPt
{
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry->znode_name);
if (code != ZOK)
if (code)
LOG_ERROR(log, "Couldn't remove " << replica_path << "/queue/" << entry->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) << ". This shouldn't happen often.");
......@@ -316,7 +316,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
/// Simultaneously add all new entries to the queue and move the pointer to the log.
zkutil::Ops ops;
zkutil::Requests ops;
std::vector<LogEntryPtr> copied_entries;
copied_entries.reserve(end - begin);
......@@ -327,8 +327,8 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
copied_entries.emplace_back(LogEntry::parse(res.value, res.stat));
ops.emplace_back(std::make_shared<zkutil::Op::Create>(
replica_path + "/queue/queue-", res.value, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(
replica_path + "/queue/queue-", res.value, zkutil::CreateMode::PersistentSequential));
const auto & entry = *copied_entries.back();
if (entry.type == LogEntry::GET_PART)
......@@ -342,14 +342,14 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
}
}
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
ops.emplace_back(zkutil::makeSetRequest(
replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(
ops.emplace_back(zkutil::makeSetRequest(
replica_path + "/min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));
zookeeper->multi(ops);
auto responses = zookeeper->multi(ops);
/// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.
......@@ -359,7 +359,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
for (size_t i = 0, size = copied_entries.size(); i < size; ++i)
{
String path_created = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated();
String path_created = dynamic_cast<const zkutil::CreateResponse &>(*responses[i]).path_created;
copied_entries[i]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
std::optional<time_t> unused = false;
......@@ -451,7 +451,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
if ((*it)->currently_executing)
to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
if (code != ZOK)
if (code)
LOG_INFO(log, "Couldn't remove " << replica_path + "/queue/" + (*it)->znode_name << ": "
<< zkutil::ZooKeeper::error2string(code));
......
......@@ -260,10 +260,10 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
{
LOG_DEBUG(log, "Found part " << part_name << " with failed quorum. Moving to detached. This shouldn't happen often.");
zkutil::Ops ops;
zkutil::Requests ops;
storage.removePartFromZooKeeper(part_name, ops);
auto code = zookeeper->tryMulti(ops);
if (code == ZNONODE)
if (code == ZooKeeperImpl::ZooKeeper::ZNONODE)
LOG_WARNING(log, "Part " << part_name << " with failed quorum is not in ZooKeeper. This shouldn't happen often.");
storage.data.renameAndDetachPart(part, "noquorum");
......@@ -318,20 +318,19 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
{
auto code = zookeeper->tryRemove(is_active_path, stat.version);
if (code == ZBADVERSION)
if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
throw Exception("Another instance of replica " + storage.replica_path + " was created just now."
" You shouldn't run multiple instances of same replica. You need to check configuration files.",
ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
if (code != ZOK && code != ZNONODE)
if (code && code != ZooKeeperImpl::ZooKeeper::ZNONODE)
throw zkutil::KeeperException(code, is_active_path);
}
/// Simultaneously declare that this replica is active, and update the host.
zkutil::Ops ops;
ops.emplace_back(std::make_shared<zkutil::Op::Create>(is_active_path,
active_node_identifier, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
ops.emplace_back(std::make_shared<zkutil::Op::SetData>(storage.replica_path + "/host", address.toString(), -1));
zkutil::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(is_active_path, active_node_identifier, zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeSetRequest(storage.replica_path + "/host", address.toString(), -1));
try
{
......@@ -339,7 +338,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("Replica " + storage.replica_path + " appears to be already active. If you're sure it's not, "
"try again in a minute or remove znode " + storage.replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);
......
......@@ -192,9 +192,14 @@ BlockInputStreams StorageDistributed::read(
size_t num_remote_shards = cluster->getRemoteShardCount();
size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
if (settings.distributed_group_by_no_merge)
processed_stage = QueryProcessingStage::Complete;
else if (settings.distributed_group_by_force_mergeable_state)
processed_stage = QueryProcessingStage::WithMergeableState;
else /// Normal mode.
processed_stage = result_size == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table);
......
......@@ -321,7 +321,7 @@ private:
* Call under TableStructureLock.
*/
void checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const MergeTreeData::DataPartPtr & part,
zkutil::Ops & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
zkutil::Requests & ops, String part_name = "", NameSet * absent_replicas_paths = nullptr);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums);
......@@ -330,7 +330,7 @@ private:
const MergeTreeData::DataPartPtr & part);
/// Adds actions to `ops` that remove a part from ZooKeeper.
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
void removePartFromZooKeeper(const String & part_name, zkutil::Requests & ops);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
......@@ -415,7 +415,7 @@ private:
/// Creates new block number and additionally perform precheck_ops while creates 'abandoned node'
AbandonableLockInZooKeeper allocateBlockNumber(const String & partition_id, zkutil::ZooKeeperPtr & zookeeper,
zkutil::Ops * precheck_ops = nullptr);
zkutil::Requests * precheck_ops = nullptr);
/** Wait until all replicas, including this, execute the specified action from the log.
* If replicas are added at the same time, it can not wait the added replica .
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册