未验证 提交 636a3817 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #9552 from ClickHouse/fix-race-condition-queue-task

Fixed race condition on queue_task_handle
......@@ -60,19 +60,27 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_,
}
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task)
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::createTask(const Task & task)
{
TaskHandle res = std::make_shared<TaskInfo>(*this, task);
return std::make_shared<TaskInfo>(*this, task);
}
void BackgroundProcessingPool::startTask(const TaskHandle & task)
{
Poco::Timestamp current_time;
{
std::unique_lock lock(tasks_mutex);
res->iterator = tasks.emplace(current_time, res);
task->iterator = tasks.emplace(current_time, task);
}
wake_event.notify_all();
}
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task)
{
TaskHandle res = createTask(task);
startTask(res);
return res;
}
......
......@@ -82,9 +82,14 @@ public:
return size;
}
/// The task is started immediately.
/// Create task and start it.
TaskHandle addTask(const Task & task);
/// Create task but not start it.
TaskHandle createTask(const Task & task);
/// Start the task that was created but not started. Precondition: task was not started.
void startTask(const TaskHandle & task);
void removeTask(const TaskHandle & task);
~BackgroundProcessingPool();
......
......@@ -2902,9 +2902,20 @@ void StorageReplicatedMergeTree::startup()
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it
startup_event.wait();
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
/// If we don't separate create/start steps, race condition will happen
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
{
auto & pool = global_context.getBackgroundPool();
queue_task_handle = pool.createTask([this] { return queueTask(); });
pool.startTask(queue_task_handle);
}
if (areBackgroundMovesNeeded())
move_parts_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
{
auto & pool = global_context.getBackgroundMovePool();
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
pool.startTask(move_parts_task_handle);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册