提交 a8974f87 编写于 作者: V VadimPE

CLICKHOUSE-3847 add Coordinator

上级 be2d8216
......@@ -104,7 +104,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
for (const String & replica : replicas)
{
zkutil::Stat host_stat;
Coordination::Stat host_stat;
zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
......@@ -163,7 +163,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
}
if (recovering_replicas.size() != 0)
if (!recovering_replicas.empty())
min_saved_log_pointer = std::min(min_saved_log_pointer, min_inactive_log_pointer);
/// We will not touch the last `min_replicated_logs_to_keep` records.
......@@ -205,17 +205,17 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
struct LostReplicaInfo
{
String name;
zkutil::Requests requests;
Coordination::Requests requests;
};
std::vector<zkutil::Requests> requests;
std::vector<Coordination::Requests> requests;
std::vector<LostReplicaInfo> lost_replicas_info;
std::vector<std::pair<LostReplicaInfo, zkutil::ZooKeeper::FutureMulti>> info_and_future;
for (auto pair : log_pointers_lost_replicas)
{
String replica = pair.first;
zkutil::Requests ops;
Coordination::Requests ops;
/// If host changed version we can not mark replicas, because replica started to be active.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica)));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
......@@ -231,7 +231,7 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
for (auto & pair : info_and_future)
{
auto multi_responses = pair.second.get();
if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
if (multi_responses.responses[0]->error == Coordination::Error::ZBADVERSION)
throw Exception(pair.first.name + " became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
zkutil::KeeperMultiException::check(multi_responses.error, pair.first.requests, multi_responses.responses);
}
......
......@@ -596,13 +596,13 @@ void StorageReplicatedMergeTree::createReplica()
do
{
zkutil::Stat replicas_stat;
Coordination::Stat replicas_stat;
String last_added_replica = zookeeper->get(zookeeper_path + "/replicas", &replicas_stat);
String is_lost_value = last_added_replica == "" ? "0" : "1";
zkutil::Requests ops;
zkutil::Responses resps;
Coordination::Requests ops;
Coordination::Responses resps;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/host", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent));
......@@ -615,13 +615,13 @@ void StorageReplicatedMergeTree::createReplica()
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
code = zookeeper->tryMulti(ops, resps);
if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
if (code == Coordination::Error::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
else if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
else if (code == Coordination::Error::ZBADVERSION)
LOG_ERROR(log, "Retry createReplica(), because some replicas were created");
else
zkutil::KeeperMultiException::check(code, ops, resps);
} while (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION);
} while (code == Coordination::Error::ZBADVERSION);
}
......@@ -1950,7 +1950,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
{
LOG_INFO(log, "Will mimic " << source_replica);
......@@ -1981,7 +1981,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
String raw_log_pointer = zookeeper->get(source_path + "/log_pointer");
zkutil::Requests ops;
Coordination::Requests ops;
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
/// For support old versions CH.
......@@ -1993,12 +1993,12 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
else
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version));
zkutil::Responses resp;
Coordination::Responses resp;
auto error = zookeeper->tryMulti(ops, resp);
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
if (error == Coordination::Error::ZBADVERSION)
throw Exception("Can not clone replica, because a " + source_path + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED);
else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
else if (error == Coordination::Error::ZNODEEXISTS)
throw Exception("Can not clone replica, because the ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
else
zkutil::KeeperMultiException::check(error, ops, resp);
......@@ -2060,7 +2060,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
}
String source_replica;
zkutil::Stat source_is_lost_stat;
Coordination::Stat source_is_lost_stat;
source_is_lost_stat.version = -1;
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
......
......@@ -402,7 +402,7 @@ private:
/** Clone data from another replica.
* If replica can not be cloned throw Exception.
*/
void cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册