提交 9beb8414 编写于 作者: V VadimPE

CLICKHOUSE-3847 fix throw and del log

上级 1f7ec5d5
......@@ -390,10 +390,7 @@ namespace ErrorCodes
extern const int CANNOT_SET_SIGNAL_HANDLER = 413;
extern const int CANNOT_READLINE = 414;
extern const int ALL_REPLICAS_LOST = 415;
extern const int CAN_NOT_CLONE_REPLICA = 416;
extern const int REPLICA_IS_ACTIVE = 417;
extern const int SOURCE_REPLICA_IS_LOST = 418;
extern const int UPDATE_CH = 419;
extern const int REPLICA_STATUS_CHANGED = 416;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -13,7 +13,7 @@ namespace ErrorCodes
{
extern const int NOT_FOUND_NODE;
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_IS_ACTIVE;
extern const int REPLICA_STATUS_CHANGED;
}
......@@ -81,7 +81,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
UInt64 min_pointer_active_replica = std::numeric_limits<UInt64>::max();
UInt64 min_saved_log_pointer = std::numeric_limits<UInt64>::max();
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
......@@ -93,66 +93,63 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
String min_saved_record_log_str = entries[entries.size() > storage.data.settings.max_replicated_logs_to_keep.value
? entries.size() - storage.data.settings.max_replicated_logs_to_keep.value
: 0];
String min_pointer_inactive_replica_str;
std::unordered_map<String, UInt32> hosts_version;
std::unordered_map<String, String> log_pointers_lost_replicas;
std::unordered_map<String, UInt32> host_versions_inactive_replicas;
std::unordered_map<String, String> log_pointers_inactive_replicas;
for (const String & replica : replicas)
{
zkutil::Stat host_stat;
zkutil::Stat log_pointer_stat;
zkutil::Stat host_stat, log_pointer_stat;
zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat);
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", &log_pointer_stat);
if (pointer.empty())
return;
UInt32 log_pointer = parse<UInt64>(pointer);
String log_pointer_str = "log-" + padIndex(log_pointer);
/// Check status of replica (active or not).
/// If replica was not active, we could check when it's log_pointer locates.
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active") || !zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_lost"))
min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer);
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
else
{
/// We can not mark lost replicas twice.
if (zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/is_lost") == "0")
{
hosts_version[replica] = host_stat.version;
log_pointers_lost_replicas[replica] = log_pointer_str;
if (log_pointer_str >= min_saved_record_log_str)
String res;
if (!zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res))
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
else
if (res == "0")
{
if (min_pointer_inactive_replica_str != "" && min_pointer_inactive_replica_str >= log_pointer_str)
min_pointer_inactive_replica_str = log_pointer_str;
else if (min_pointer_inactive_replica_str == "")
min_pointer_inactive_replica_str = log_pointer_str;
String log_pointer_str = "log-" + padIndex(log_pointer);
if (log_pointer_str >= min_saved_record_log_str)
min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer);
else
{
host_versions_inactive_replicas[replica] = host_stat.version;
log_pointers_inactive_replicas[replica] = log_pointer_str;
}
}
}
else
host_versions_inactive_replicas[replica] = host_stat.version;
}
}
String min_pointer_active_replica_str = "log-" + padIndex(min_pointer_active_replica);
String min_pointer_replica_str = min_pointer_inactive_replica_str == ""
? min_pointer_active_replica_str
: std::min(min_pointer_inactive_replica_str, min_pointer_active_replica_str);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer_active_replica`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), min_pointer_replica_str), entries.end());
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_saved_log_pointer)), entries.end());
if (entries.empty())
return;
markLostReplicas(hosts_version, log_pointers_lost_replicas, entries.back(), zookeeper);
markLostReplicas(host_versions_inactive_replicas, log_pointers_inactive_replicas, replicas.size(), entries.back(), zookeeper);
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
for (auto host_version: host_versions_inactive_replicas)
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second));
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
......@@ -167,39 +164,39 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
}
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
const std::unordered_map<String, String> & log_pointers_lost_replicas,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper)
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_inactive_replicas,
const std::unordered_map<String, String> & log_pointers_inactive_replicas,
size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper)
{
std::vector<zkutil::Requests> requests;
std::vector<zkutil::ZooKeeper::FutureMulti> futures;
for (auto pair : log_pointers_lost_replicas)
for (auto pair : log_pointers_inactive_replicas)
{
String replica = pair.first;
if (pair.second <= remove_border)
{
zkutil::Requests ops;
/// If host changed version we can not mark replicas, because replica started to be active.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", hosts_version.at(replica)));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica)));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
requests.push_back(ops);
}
}
if (requests.size() == (zookeeper->getChildren(storage.zookeeper_path + "/replicas")).size())
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
if (requests.size() == replicas_count)
throw Exception("All replicas wiil be lost", ErrorCodes::ALL_REPLICAS_LOST);
for (auto & req : requests)
futures.push_back(zookeeper->tryAsyncMulti(req));
for (auto & future : futures)
for (size_t i = 0; i < futures.size(); ++i)
{
auto res = future.get();
if (res.error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
throw Exception("One of the replicas became active, when we clear log", ErrorCodes::REPLICA_IS_ACTIVE);
else if (res.error != ZooKeeperImpl::ZooKeeper::ZOK)
throw;
auto multi_responses = futures[i].get();
if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
throw Exception("One of the replicas became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED);
else if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZOK)
zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses);
}
}
......
......@@ -47,9 +47,9 @@ private:
void clearOldLogs();
/// Mark lost replicas.
void markLostReplicas(const std::unordered_map<String, UInt32> & hosts_version,
const std::unordered_map<String, String> & log_pointers_lost_replicas,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper);
void markLostReplicas(const std::unordered_map<String, UInt32> & host_versions_inactive_replicas,
const std::unordered_map<String, String> & log_pointers_inactive_replicas,
size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper);
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
......
......@@ -107,9 +107,7 @@ namespace ErrorCodes
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int KEEPER_EXCEPTION;
extern const int ALL_REPLICAS_LOST;
extern const int CAN_NOT_CLONE_REPLICA;
extern const int SOURCE_REPLICA_IS_LOST;
extern const int UPDATE_CH;
extern const int REPLICA_STATUS_CHANGED;
}
namespace ActionLocks
......@@ -614,13 +612,14 @@ void StorageReplicatedMergeTree::createReplica()
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", is_lost_value, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/replicas", replicas_stat.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, -1));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
code = zookeeper->tryMulti(ops, resps);
if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
} while (code != ZooKeeperImpl::ZooKeeper::ZOK);
else if (code != ZooKeeperImpl::ZooKeeper::ZBADVERSION)
zkutil::KeeperMultiException::check(code, ops, resps);
} while (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION);
}
......@@ -1949,13 +1948,13 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
/** That check will be delete (It is only for old version of CH server).
/** TODO: it will be deleted! (It is only for old version of CH server).
* If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
* You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned.
......@@ -1983,23 +1982,23 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
zkutil::Requests ops;
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
if (is_lost_stat.version == -1)
if (source_is_lost_stat.version == -1)
{
ops.push_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0", zkutil::CreateMode::PersistentSequential));
ops.push_back(zkutil::makeRemoveRequest(replica_path + "/is_lost", -1));
}
else
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version));
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", source_is_lost_stat.version));
zkutil::Responses resp;
auto error = zookeeper->tryMulti(ops, resp);
if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::SOURCE_REPLICA_IS_LOST);
throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::REPLICA_STATUS_CHANGED);
else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw Exception("ClickHouse server updated to new version", ErrorCodes::UPDATE_CH);
else if (error != ZooKeeperImpl::ZooKeeper::ZOK)
throw ("cloneReplica() failed");
throw Exception("ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED);
else
zkutil::KeeperMultiException::check(error, ops, resp);
/// Let's remember the queue of the reference/master replica.
......@@ -2052,14 +2051,14 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
}
else
{
/// If old replica is_active, and we must create is_lost node (for old version of CH).
/// We must update old version(Replica doesn't have /is_lost) of replica.
zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
return;
}
String source_replica;
zkutil::Stat is_lost_stat;
is_lost_stat.version = -1;
zkutil::Stat source_is_lost_stat;
source_is_lost_stat.version = -1;
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
{
......@@ -2068,7 +2067,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
if (source_replica_path != replica_path)
{
String resp;
if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &is_lost_stat) || resp == "0")
if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &source_is_lost_stat) || resp == "0")
source_replica = replica_name;
}
}
......@@ -2076,7 +2075,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
if (source_replica == "")
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
cloneReplica(source_replica, is_lost_stat, zookeeper);
cloneReplica(source_replica, source_is_lost_stat, zookeeper);
zookeeper->set(replica_path + "/is_lost", "0");
}
......
......@@ -404,7 +404,7 @@ private:
* return true, if replica wil be cloned
* else return false (when source_replica was lost or log was empty).
*/
void cloneReplica(const String & source_replica, zkutil::Stat is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
void cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册