Commit 79d5c899 authored by Alexey Arno

dbms: Fault tolerance improvement. Fixed a race condition. Some cleanup. [#METR-18510]

Parent 4c64c5e1
@@ -30,8 +30,11 @@
 #include <Poco/SharedPtr.h>
 #include <openssl/sha.h>
 #include <future>
 #include <chrono>
+#include <cstdlib>
+#include <ctime>
 
 namespace DB
 {
@@ -582,6 +585,12 @@ void ReshardingWorker::publishShardedPartitions()
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress dest;
         std::string part;
@@ -704,17 +713,24 @@ void ReshardingWorker::publishShardedPartitions()
 
 void ReshardingWorker::applyChanges()
 {
+    /// Note: since this method actually performs a distributed commit (i.e. it
+    /// attaches partitions on various shards), we should implement a two-phase
+    /// commit protocol in a future release in order to get even more safety
+    /// guarantees.
+
     LOG_DEBUG(log, "Attaching new partitions.");
 
     auto & storage = *(current_job.storage);
     auto zookeeper = context.getZooKeeper();
 
     /// Locally drop the initial partition.
     std::string query_str = "ALTER TABLE " + current_job.database_name + "."
         + current_job.table_name + " DROP PARTITION " + current_job.partition;
     (void) executeQuery(query_str, context, true);
 
     /// On each participating shard, attach the corresponding sharded partition to the table.
 
+    /// Description of a task on a replica.
     struct TaskInfo
     {
         TaskInfo(const std::string & replica_path_, const ReplicatedMergeTreeAddress & dest_)
@@ -722,13 +738,52 @@ void ReshardingWorker::applyChanges()
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress dest;
     };
 
-    using TaskInfoList = std::vector<TaskInfo>;
+    /// Description of tasks for each replica of a shard.
+    /// For fault tolerance purposes, some fields are provided
+    /// to perform attempts on more than one replica if needed.
+    struct ShardTaskInfo
+    {
+        ShardTaskInfo()
+        {
+            struct timespec times;
+            if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times))
+                throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);
+
+            (void) srand48_r(reinterpret_cast<intptr_t>(this) ^ times.tv_nsec, &rand_state);
+        }
+
+        ShardTaskInfo(const ShardTaskInfo &) = delete;
+        ShardTaskInfo & operator=(const ShardTaskInfo &) = delete;
+
+        ShardTaskInfo(ShardTaskInfo &&) = default;
+        ShardTaskInfo & operator=(ShardTaskInfo &&) = default;
+
+        /// one task for each replica
+        std::vector<TaskInfo> shard_tasks;
+        /// index to the replica to be used
+        size_t next = 0;
+        /// result of the operation on the current replica
+        bool is_success = false;
+        /// index to the corresponding thread pool entry
+        size_t pool_index;
+        drand48_data rand_state;
+    };
+
+    using TaskInfoList = std::vector<ShardTaskInfo>;
+
     TaskInfoList task_info_list;
 
+    /// Initialize all the possible tasks for each replica of each shard.
     for (const auto & entry : storage.data.per_shard_data_parts)
     {
         size_t shard_no = entry.first;
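The ShardTaskInfo constructor above seeds a per-object reentrant PRNG: srand48_r() initializes a private drand48_data buffer from the object's address XOR-ed with the thread CPU clock, so later draws via lrand48_r() never touch the shared hidden state that rand() or lrand48() would use from several threads at once. A minimal, self-contained sketch of that seeding scheme follows (glibc-specific, for illustration only; PerTaskRandom and pick_index are made-up names, not part of the patch):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <stdexcept>

struct PerTaskRandom
{
    drand48_data state;

    PerTaskRandom()
    {
        timespec ts;
        if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts))
            throw std::runtime_error("clock_gettime failed");
        /// Mix the object address with the nanosecond counter, as the patch does.
        srand48_r(reinterpret_cast<intptr_t>(this) ^ ts.tv_nsec, &state);
    }

    /// Return a pseudo-random index in [0, n); n must be > 0.
    size_t pick_index(size_t n)
    {
        long int value = 0;
        lrand48_r(&state, &value);
        return static_cast<size_t>(value) % n;
    }
};

int main()
{
    PerTaskRandom rng;
    printf("picked replica index: %zu\n", rng.pick_index(3));
    return 0;
}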
@@ -739,55 +794,95 @@ void ReshardingWorker::applyChanges()
         const WeightedZooKeeperPath & weighted_path = current_job.paths[shard_no];
         const std::string & zookeeper_path = weighted_path.first;
 
+        task_info_list.emplace_back();
+        ShardTaskInfo & shard_task_info = task_info_list.back();
+
         auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
         for (const auto & child : children)
         {
             const std::string replica_path = zookeeper_path + "/replicas/" + child;
             auto host = zookeeper->get(replica_path + "/host");
             ReplicatedMergeTreeAddress host_desc(host);
 
-            task_info_list.emplace_back(replica_path, host_desc);
+            shard_task_info.shard_tasks.emplace_back(replica_path, host_desc);
         }
     }
 
-    boost::threadpool::pool pool(task_info_list.size());
-
-    using Tasks = std::vector<std::packaged_task<bool()> >;
-    Tasks tasks(task_info_list.size());
-
-    try
-    {
-        for (size_t i = 0; i < task_info_list.size(); ++i)
-        {
-            const auto & entry = task_info_list[i];
-            const auto & replica_path = entry.replica_path;
-            const auto & dest = entry.dest;
-
-            InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
-
-            std::string query_str = "ALTER TABLE " + dest.database + "."
-                + dest.table + " ATTACH PARTITION " + current_job.partition;
-
-            tasks[i] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
-                &storage.remote_query_executor_client, location, query_str));
-
-            pool.schedule([i, &tasks]{ tasks[i](); });
-        }
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-        pool.wait();
-        throw;
-    }
-
-    pool.wait();
-
-    for (auto & task : tasks)
-    {
-        bool res = task.get_future().get();
-        if (!res)
-            throw Exception("Failed to attach partition on replica",
-                ErrorCodes::PARTITION_ATTACH_FAILED);
+    /// Loop as long as there are ATTACH operations that need to be performed
+    /// on some shards and there remains at least one valid replica on each of
+    /// these shards.
+    size_t remaining_task_count = task_info_list.size();
+
+    while (remaining_task_count > 0)
+    {
+        boost::threadpool::pool pool(remaining_task_count);
+
+        using Tasks = std::vector<std::packaged_task<bool()> >;
+        Tasks tasks(remaining_task_count);
+
+        try
+        {
+            size_t pool_index = 0;
+            for (auto & info : task_info_list)
+            {
+                if (info.is_success)
+                {
+                    /// We have already successfully performed the operation on this shard.
+                    continue;
+                }
+
+                /// Randomly choose a replica on which to perform the operation.
+                long int rand_res;
+                (void) lrand48_r(&info.rand_state, &rand_res);
+                size_t current = info.next + rand_res % (info.shard_tasks.size() - info.next);
+                std::swap(info.shard_tasks[info.next], info.shard_tasks[current]);
+                ++info.next;
+
+                info.pool_index = pool_index;
+
+                TaskInfo & cur_task_info = info.shard_tasks[info.next - 1];
+                const auto & replica_path = cur_task_info.replica_path;
+                const auto & dest = cur_task_info.dest;
+
+                /// Create and register the task.
+
+                InterserverIOEndpointLocation location(replica_path, dest.host, dest.replication_port);
+
+                std::string query_str = "ALTER TABLE " + dest.database + "."
+                    + dest.table + " ATTACH PARTITION " + current_job.partition;
+
+                tasks[pool_index] = Tasks::value_type(std::bind(&RemoteQueryExecutor::Client::executeQuery,
+                    &storage.remote_query_executor_client, location, query_str));
+
+                pool.schedule([pool_index, &tasks]{ tasks[pool_index](); });
+
+                /// Allocate an entry for the next task.
+                ++pool_index;
+            }
+        }
+        catch (...)
+        {
+            pool.wait();
+            throw;
+        }
+
+        pool.wait();
+
+        for (auto & info : task_info_list)
+        {
+            if (info.is_success)
+                continue;
+
+            info.is_success = tasks[info.pool_index].get_future().get();
+            if (info.is_success)
+                --remaining_task_count;
+            else if (info.next == info.shard_tasks.size())
+            {
+                /// No more attempts are possible.
+                throw Exception("Failed to attach partition on shard",
+                    ErrorCodes::PARTITION_ATTACH_FAILED);
+            }
+        }
     }
 }
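Taken together, the rewritten applyChanges() above retries failed ATTACH operations instead of giving up on the first error: each shard keeps its list of replicas, an untried replica is drawn at random (one step of a Fisher-Yates shuffle over the untried tail), and the outer loop runs until every shard has succeeded or some shard has exhausted all its replicas. The sketch below restates that control flow sequentially, without the thread pool or futures, for clarity; attempt_attach() and the other names are placeholders that assume every shard has at least one replica, so this is not the actual ClickHouse code.

#include <cstddef>
#include <random>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

/// Stand-in for the remote "ALTER TABLE ... ATTACH PARTITION" call (hypothetical).
static bool attempt_attach(const std::string & replica)
{
    return !replica.empty();   /// placeholder success criterion
}

struct ShardAttempts
{
    std::vector<std::string> replicas;   /// one entry per replica of the shard
    size_t next = 0;                     /// replicas[0, next) have already been tried
    bool is_success = false;
};

void attach_on_all_shards(std::vector<ShardAttempts> & shards, std::mt19937 & gen)
{
    size_t remaining = shards.size();
    while (remaining > 0)
    {
        for (auto & shard : shards)
        {
            if (shard.is_success)
                continue;

            /// Randomly pick one of the replicas that have not been tried yet.
            std::uniform_int_distribution<size_t> dist(shard.next, shard.replicas.size() - 1);
            std::swap(shard.replicas[shard.next], shard.replicas[dist(gen)]);
            const std::string & replica = shard.replicas[shard.next];
            ++shard.next;

            shard.is_success = attempt_attach(replica);
            if (shard.is_success)
                --remaining;
            else if (shard.next == shard.replicas.size())
                throw std::runtime_error("Failed to attach partition on shard");
        }
    }
}

int main()
{
    std::mt19937 gen(std::random_device{}());
    std::vector<ShardAttempts> shards{
        {{"shard0_replica0", "shard0_replica1"}},
        {{"shard1_replica0"}}};
    attach_on_all_shards(shards, gen);
    return 0;
}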
@@ -863,8 +958,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
         "", zkutil::CreateMode::Persistent);
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/status",
-        "", zkutil::CreateMode::Persistent);
-    setStatus(coordinator_id, STATUS_OK);
+        toString(static_cast<UInt64>(STATUS_OK)), zkutil::CreateMode::Persistent);
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/partitions",
         "", zkutil::CreateMode::Persistent);
@@ -995,9 +1089,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
         throw zkutil::KeeperException(code);
     zookeeper->create(getCoordinatorPath(coordinator_id) + "/status/"
-        + current_host, "", zkutil::CreateMode::Persistent);
-    setStatus(coordinator_id, current_host, STATUS_OK);
+        + current_host, toString(static_cast<UInt64>(STATUS_OK)), zkutil::CreateMode::Persistent);
     /// Assign a unique block number to the current node. We will use it in order
     /// to avoid any possible conflict when uploading resharded partitions.
...
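The two hunks above look like the race-condition fix announced in the commit message: previously the coordinator status znode was created empty and only afterwards set to STATUS_OK via setStatus(), so another node could momentarily observe an empty status; now the znode is created with its initial value in a single call. The toy program below illustrates the difference with an in-memory map standing in for ZooKeeper; Store, create, set and read are invented names used only for this sketch, not the zkutil API.

#include <map>
#include <mutex>
#include <optional>
#include <string>

struct Store
{
    std::map<std::string, std::string> nodes;
    std::mutex mutex;

    void create(const std::string & path, const std::string & value)
    {
        std::lock_guard<std::mutex> lock(mutex);
        nodes.emplace(path, value);
    }

    void set(const std::string & path, const std::string & value)
    {
        std::lock_guard<std::mutex> lock(mutex);
        nodes[path] = value;
    }

    std::optional<std::string> read(const std::string & path)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = nodes.find(path);
        if (it == nodes.end())
            return std::nullopt;
        return it->second;
    }
};

int main()
{
    Store store;

    /// Racy pattern: between these two calls a concurrent reader can see ""
    /// and misinterpret the coordinator status.
    store.create("/coordinator/status", "");
    store.set("/coordinator/status", "STATUS_OK");

    /// Fixed pattern (what the patch switches to): the node is never observable
    /// without its initial status.
    store.create("/coordinator/status_fixed", "STATUS_OK");

    return 0;
}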
@@ -3761,6 +3761,12 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
         {
         }
 
+        TaskInfo(const TaskInfo &) = delete;
+        TaskInfo & operator=(const TaskInfo &) = delete;
+
+        TaskInfo(TaskInfo &&) = default;
+        TaskInfo & operator=(TaskInfo &&) = default;
+
         std::string replica_path;
         ReplicatedMergeTreeAddress address;
     };
...