diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index ed2030204d418a7ceab3f4ab1b0bbde2d9ad2480..c21e87af1f33977dbb1f2df166e41b436418dafe 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -390,10 +390,7 @@ namespace ErrorCodes extern const int CANNOT_SET_SIGNAL_HANDLER = 413; extern const int CANNOT_READLINE = 414; extern const int ALL_REPLICAS_LOST = 415; - extern const int CAN_NOT_CLONE_REPLICA = 416; - extern const int REPLICA_IS_ACTIVE = 417; - extern const int SOURCE_REPLICA_IS_LOST = 418; - extern const int UPDATE_CH = 419; + extern const int REPLICA_STATUS_CHANGED = 416; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 8c725ce40e76a173f653564f92772fc63e44d76d..e06dfe6bb9a4eb1d2c8c8859aa30d8c2b0fa4305 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -13,7 +13,7 @@ namespace ErrorCodes { extern const int NOT_FOUND_NODE; extern const int ALL_REPLICAS_LOST; - extern const int REPLICA_IS_ACTIVE; + extern const int REPLICA_STATUS_CHANGED; } @@ -81,7 +81,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() return; Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); - UInt64 min_pointer_active_replica = std::numeric_limits::max(); + UInt64 min_saved_log_pointer = std::numeric_limits::max(); Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log"); @@ -93,66 +93,63 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() String min_saved_record_log_str = entries[entries.size() > storage.data.settings.max_replicated_logs_to_keep.value ? 
entries.size() - storage.data.settings.max_replicated_logs_to_keep.value : 0]; - String min_pointer_inactive_replica_str; - std::unordered_map hosts_version; - std::unordered_map log_pointers_lost_replicas; + std::unordered_map host_versions_inactive_replicas; + std::unordered_map log_pointers_inactive_replicas; for (const String & replica : replicas) { - zkutil::Stat host_stat; - zkutil::Stat log_pointer_stat; + zkutil::Stat host_stat, log_pointer_stat; zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat); String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", &log_pointer_stat); if (pointer.empty()) return; UInt32 log_pointer = parse(pointer); - String log_pointer_str = "log-" + padIndex(log_pointer); - + /// Check status of replica (active or not). /// If replica was not active, we could check when it's log_pointer locates. - if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active") || !zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_lost")) - min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer); + if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active")) + min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); else { - /// We can not mark lost replicas twice. 
- if (zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/is_lost") == "0") - { - hosts_version[replica] = host_stat.version; - log_pointers_lost_replicas[replica] = log_pointer_str; - - if (log_pointer_str >= min_saved_record_log_str) + String res; + if (!zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res)) + min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); + else + if (res == "0") { - if (min_pointer_inactive_replica_str != "" && min_pointer_inactive_replica_str >= log_pointer_str) - min_pointer_inactive_replica_str = log_pointer_str; - else if (min_pointer_inactive_replica_str == "") - min_pointer_inactive_replica_str = log_pointer_str; + String log_pointer_str = "log-" + padIndex(log_pointer); + if (log_pointer_str >= min_saved_record_log_str) + min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); + else + { + host_versions_inactive_replicas[replica] = host_stat.version; + log_pointers_inactive_replicas[replica] = log_pointer_str; + } } - } + else + host_versions_inactive_replicas[replica] = host_stat.version; } } - String min_pointer_active_replica_str = "log-" + padIndex(min_pointer_active_replica); - - String min_pointer_replica_str = min_pointer_inactive_replica_str == "" - ? min_pointer_active_replica_str - : std::min(min_pointer_inactive_replica_str, min_pointer_active_replica_str); - /// We will not touch the last `min_replicated_logs_to_keep` records. entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end()); /// We will not touch records that are no less than `min_pointer_active_replica`. 
- entries.erase(std::lower_bound(entries.begin(), entries.end(), min_pointer_replica_str), entries.end()); + entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end()); if (entries.empty()) return; - markLostReplicas(hosts_version, log_pointers_lost_replicas, entries.back(), zookeeper); + markLostReplicas(host_versions_inactive_replicas, log_pointers_inactive_replicas, replicas.size(), entries.back(), zookeeper); zkutil::Requests ops; for (size_t i = 0; i < entries.size(); ++i) { ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1)); + + for (const auto & host_version : host_versions_inactive_replicas) + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + host_version.first + "/host", host_version.second)); if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size()) { @@ -167,39 +164,39 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() } -void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map & hosts_version, - const std::unordered_map & log_pointers_lost_replicas, - const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper) +void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map & host_versions_inactive_replicas, + const std::unordered_map & log_pointers_inactive_replicas, + size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper) { std::vector requests; std::vector futures; - for (auto pair : log_pointers_lost_replicas) + for (auto pair : log_pointers_inactive_replicas) { String replica = pair.first; if (pair.second <= remove_border) { zkutil::Requests ops; /// If host changed version we can not mark replicas, because replica started to be active. 
- ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", hosts_version.at(replica))); + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica))); ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1)); requests.push_back(ops); } } - if (requests.size() == (zookeeper->getChildren(storage.zookeeper_path + "/replicas")).size()) - throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST); + if (requests.size() == replicas_count) + throw Exception("All replicas will be lost", ErrorCodes::ALL_REPLICAS_LOST); for (auto & req : requests) futures.push_back(zookeeper->tryAsyncMulti(req)); - for (auto & future : futures) + for (size_t i = 0; i < futures.size(); ++i) { - auto res = future.get(); - if (res.error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) - throw Exception("One of the replicas became active, when we clear log", ErrorCodes::REPLICA_IS_ACTIVE); - else if (res.error != ZooKeeperImpl::ZooKeeper::ZOK) - throw; + auto multi_responses = futures[i].get(); + if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) + throw Exception("One of the replicas became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED); + else + zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 09b25cbe3aa3e61dbe5eea9a1f4d730259b74c0a..fab93e84b4d4306810d92b6eab6ef6f9417fe9a0 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -47,9 +47,9 @@ private: void clearOldLogs(); /// Mark lost 
replicas. - void markLostReplicas(const std::unordered_map & hosts_version, - const std::unordered_map & log_pointers_lost_replicas, - const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper); + void markLostReplicas(const std::unordered_map & host_versions_inactive_replicas, + const std::unordered_map & log_pointers_inactive_replicas, + size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper); /// Remove old block hashes from ZooKeeper. This is done by the leader replica. void clearOldBlocks(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index fdf25cc7c8fb105b74921fcb878013d44d2e6743..3fc6517a881350fcb9341fc5566488ce3cf71c08 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -107,9 +107,7 @@ namespace ErrorCodes extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int KEEPER_EXCEPTION; extern const int ALL_REPLICAS_LOST; - extern const int CAN_NOT_CLONE_REPLICA; - extern const int SOURCE_REPLICA_IS_LOST; - extern const int UPDATE_CH; + extern const int REPLICA_STATUS_CHANGED; } namespace ActionLocks @@ -614,13 +612,14 @@ void StorageReplicatedMergeTree::createReplica() ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent)); /// Check version of /replicas to see if there are any replicas. 
- ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/replicas", replicas_stat.version)); - ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, -1)); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version)); code = zookeeper->tryMulti(ops, resps); if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST); - } while (code != ZooKeeperImpl::ZooKeeper::ZOK); + else if (code != ZooKeeperImpl::ZooKeeper::ZBADVERSION) + zkutil::KeeperMultiException::check(code, ops, resps); + } while (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION); } @@ -1949,13 +1948,13 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) } -void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper) +void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper) { LOG_INFO(log, "Will mimic " << source_replica); String source_path = zookeeper_path + "/replicas/" + source_replica; - /** That check will be delete (It is only for old version of CH server). + /** TODO: it will be deleted! (It is only for old version of CH server). * If the reference/master replica is not yet fully created, let's wait. * NOTE: If something went wrong while creating it, we can hang around forever. * You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned. 
@@ -1983,23 +1982,23 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku zkutil::Requests ops; ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1)); - if (is_lost_stat.version == -1) + if (source_is_lost_stat.version == -1) { ops.push_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0", zkutil::CreateMode::PersistentSequential)); ops.push_back(zkutil::makeRemoveRequest(replica_path + "/is_lost", -1)); } else - ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version)); + ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version)); zkutil::Responses resp; auto error = zookeeper->tryMulti(ops, resp); if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) - throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::SOURCE_REPLICA_IS_LOST); + throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::REPLICA_STATUS_CHANGED); else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) - throw Exception("ClickHouse server updated to new version", ErrorCodes::UPDATE_CH); - else if (error != ZooKeeperImpl::ZooKeeper::ZOK) - throw ("cloneReplica() failed"); + throw Exception("ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED); + else + zkutil::KeeperMultiException::check(error, ops, resp); /// Let's remember the queue of the reference/master replica. @@ -2052,14 +2051,14 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke } else { - /// If old replica is_active, and we must create is_lost node (for old version of CH). + /// We must update old version(Replica doesn't have /is_lost) of replica. 
zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent); return; } String source_replica; - zkutil::Stat is_lost_stat; - is_lost_stat.version = -1; + zkutil::Stat source_is_lost_stat; + source_is_lost_stat.version = -1; for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas")) { @@ -2068,7 +2067,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke if (source_replica_path != replica_path) { String resp; - if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &is_lost_stat) || resp == "0") + if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &source_is_lost_stat) || resp == "0") source_replica = replica_name; } } @@ -2076,7 +2075,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke if (source_replica == "") throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST); - cloneReplica(source_replica, is_lost_stat, zookeeper); + cloneReplica(source_replica, source_is_lost_stat, zookeeper); zookeeper->set(replica_path + "/is_lost", "0"); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 82f4029874819d6206abc982a7281b83b0da93e6..519859062bd310ad7d8ad04c20d0a3b86f34cafe 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -404,7 +404,7 @@ private: * return true, if replica wil be cloned * else return false (when source_replica was lost or log was empty). */ - void cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); + void cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper); /// Clone replica if it is lost. void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);