提交 4064ec0b 编写于 作者: V VadimPE

CLICKHOUSE-3847 add value '1' and '2' in is_lost

上级 0c119d76
......@@ -377,6 +377,8 @@ namespace ErrorCodes
extern const int CANNOT_STAT = 400;
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME = 401;
extern const int CANNOT_IOSETUP = 402;
extern const int ALL_REPLICAS_LOST = 413;
extern const int CAN_NOT_CLONE_REPLICA = 414;
extern const int KEEPER_EXCEPTION = 999;
......
......@@ -185,7 +185,7 @@ bool ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
/// If log pointer and host change version we can not mark replicas, so we check it.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", hosts_version.at(replica)));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", log_pointers_version.at(replica)));
ops.emplace_back(zkutil::makeCreateRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->tryAsyncMulti(ops));
}
}
......
......@@ -600,6 +600,7 @@ void StorageReplicatedMergeTree::createReplica()
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/queue", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/parts", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/flags", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "1", zkutil::CreateMode::Persistent));
try
{
......@@ -612,6 +613,10 @@ void StorageReplicatedMergeTree::createReplica()
throw;
}
/// If replica is first, is lost will be "0".
if ((zookeeper->getChildren(zookeeper_path + "/replicas")).size() == 1)
zookeeper->set(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
/** You need to change the data of nodes/replicas to anything, so that the thread that removes old entries in the log,
* stumbled over this change and does not delete the entries we have not yet read.
......@@ -1943,12 +1948,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper)
{
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
zkutil::Stat is_lost_stat;
if (zookeeper->get(source_path + "/is_lost", &is_lost_stat) == "1")
return false;
/** If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
......@@ -1972,16 +1979,28 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
zkutil::Requests ops;
/// We must set log_pointer atomically in order to cleanupThread can not clear log with our log_pointer.
String raw_log_pointer = zookeeper->get(source_path + "/log_pointer");
zookeeper->set(replica_path + "/log_pointer", raw_log_pointer, -1);
zkutil::Responses resp;
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version));
ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1));
auto error = zookeeper->tryMulti(ops, resp);
if (error != ZooKeeperImpl::ZooKeeper::ZOK)
{
LOG_DEBUG(log, "Can not clone replica, because a source replica is lost");
return false;
}
ops.clear();
resp.clear();
/// Check that log_pointer in entries.
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (entries.empty())
LOG_DEBUG(log, "Can not clone replica, because log is empty");
return false;
return true;
auto min_record = std::min_element(entries.begin(), entries.end());
......@@ -2003,6 +2022,16 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
continue;
source_queue.push_back(entry);
}
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version));
error = zookeeper->tryMulti(ops, resp);
if (error != ZooKeeperImpl::ZooKeeper::ZOK)
{
LOG_DEBUG(log, "Can not clone replica, because a source replica is lost");
return false;
}
ops.clear();
resp.clear();
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts");
......@@ -2017,8 +2046,18 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
log_entry.new_part_name = name;
log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
ops.push_back(zkutil::makeCheckRequest(source_path + "/is_lost", is_lost_stat.version));
ops.push_back(zkutil::makeCreateRequest(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
auto error = zookeeper->tryMulti(ops, resp);
if (error != ZooKeeperImpl::ZooKeeper::ZOK)
{
LOG_DEBUG(log, "Can not clone replica, because a source replica is lost");
return false;
}
ops.clear();
resp.clear();
}
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
/// Add content of the reference/master replica queue to the queue.
......@@ -2035,14 +2074,8 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
{
auto zookeeper = getZooKeeper();
String raw_log_pointer = zookeeper->get(replica_path + "/log_pointer");
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (entries.empty())
return;
if (!raw_log_pointer.empty() && !zookeeper->exists(replica_path + "/is_lost"))
if ((zookeeper->get(replica_path + "/is_lost") == "0"))
return;
String source_replica;
......@@ -2050,7 +2083,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
{
String source_replica_path = zookeeper_path + "/replicas/" + replica_name;
if ((source_replica_path != replica_path) && (!zookeeper->exists(source_replica_path + "/is_lost")))
if ((source_replica_path != replica_path) && (zookeeper->get(source_replica_path + "/is_lost") == "0"))
source_replica = replica_name;
}
......@@ -2058,9 +2091,9 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
if (!cloneReplica(source_replica, zookeeper))
throw Exception("Can not clone replica from" + source_replica, ErrorCodes::CAN_NOT_CLONE_REPLICA);
throw Exception("Can not clone replica from " + source_replica, ErrorCodes::CAN_NOT_CLONE_REPLICA);
zookeeper->remove(replica_path + "/is_lost", -1);
zookeeper->set(replica_path + "/is_lost", "0");
}
......
......@@ -397,7 +397,10 @@ private:
void mutationsUpdatingTask();
/// Clone data from another replica.
/** Clone data from another replica.
* return true, if replica wil be cloned
* else return false (when source_replica was lost or log was empty).
*/
bool cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册