未验证 提交 7a32ca05 编写于 作者: N Nikolai Kochetov 提交者: GitHub

Merge pull request #7670 from excitoon-favorites/move_pool

Separated pool for background moves
......@@ -76,6 +76,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
......
......@@ -140,6 +140,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
......@@ -287,6 +288,7 @@ struct ContextShared
external_dictionaries_loader.reset();
external_models_loader.reset();
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
ddl_worker.reset();
......@@ -1489,6 +1491,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool;
}
BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
shared->background_move_pool.emplace(settings.background_move_pool_size, "BackgroundMovePool", "BgMoveProcPool");
return *shared->background_move_pool;
}
BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();
......
......@@ -450,6 +450,7 @@ public:
void dropCaches() const;
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
......
......@@ -61,9 +61,12 @@ void BackgroundProcessingPoolTaskInfo::wake()
}
BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_)
BackgroundProcessingPool::BackgroundProcessingPool(int size_, const char * log_name, const char * thread_name_)
: size(size_)
, thread_name(thread_name_)
{
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads");
logger = &Logger::get(log_name);
LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");
threads.resize(size);
for (auto & thread : threads)
......@@ -122,7 +125,7 @@ BackgroundProcessingPool::~BackgroundProcessingPool()
void BackgroundProcessingPool::threadFunction()
{
setThreadName("BackgrProcPool");
setThreadName(thread_name);
{
std::lock_guard lock(tasks_mutex);
......
......@@ -46,7 +46,9 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>;
BackgroundProcessingPool(int size_);
BackgroundProcessingPool(int size_,
const char * log_name = "BackgroundProcessingPool",
const char * thread_name_ = "BackgrProcPool");
size_t getNumberOfThreads() const
{
......@@ -67,6 +69,8 @@ protected:
using Threads = std::vector<ThreadFromGlobalPool>;
const size_t size;
const char * thread_name;
Poco::Logger * logger;
Tasks tasks; /// Ordered in priority.
std::mutex tasks_mutex;
......
......@@ -3473,6 +3473,11 @@ bool MergeTreeData::selectPartsAndMove()
return moveParts(std::move(moving_tagger));
}
bool MergeTreeData::areBackgroundMovesNeeded() const
{
return storage_policy->getVolumes().size() > 1;
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space)
{
if (parts_mover.moves_blocker.isCancelled())
......
......@@ -939,6 +939,8 @@ protected:
/// Selects parts for move and moves them, used in background process
bool selectPartsAndMove();
bool areBackgroundMovesNeeded() const;
private:
/// RAII Wrapper for atomic work with currently moving parts
/// Acuire them in constructor and remove them in destructor
......
......@@ -99,7 +99,8 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
moving_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
if (areBackgroundMovesNeeded())
moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
}
......@@ -115,7 +116,7 @@ void StorageMergeTree::shutdown()
global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);
if (moving_task_handle)
global_context.getBackgroundPool().removeTask(moving_task_handle);
global_context.getBackgroundMovePool().removeTask(moving_task_handle);
}
......
......@@ -2878,7 +2878,8 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
if (areBackgroundMovesNeeded())
move_parts_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
/// In this thread replica will be activated.
restarting_thread.start();
......@@ -2902,7 +2903,7 @@ void StorageReplicatedMergeTree::shutdown()
queue_task_handle.reset();
if (move_parts_task_handle)
global_context.getBackgroundPool().removeTask(move_parts_task_handle);
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册