提交 5d11118c 编写于 作者: A Azat Khuzhin

Use thread pool (background_distributed_schedule_pool_size) for distributed sends

After #8756 the problem with 1 thread for each (distributed table, disk)
for distributed sends became even worse (since there can be multiple
disks), so use a predefined thread pool for these tasks, which can be
controlled with the background_distributed_schedule_pool_size knob.
上级 201d5d5b
......@@ -82,6 +82,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
M(SettingMilliseconds, distributed_directory_monitor_max_sleep_time_ms, 30000, "Maximum sleep time for StorageDistributed DirectoryMonitors, it limits exponential growth too.", 0) \
......
......@@ -321,6 +321,7 @@ struct ContextShared
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
......@@ -418,6 +419,7 @@ struct ContextShared
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
......@@ -1348,6 +1350,14 @@ BackgroundSchedulePool & Context::getSchedulePool()
return *shared->schedule_pool;
}
/// Returns the schedule pool used for distributed sends, constructing it on first use.
/// The pool is sized from the `background_distributed_schedule_pool_size` setting at the
/// moment of the first call; both the check and the construction happen under the context lock.
BackgroundSchedulePool & Context::getDistributedSchedulePool()
{
    auto lock = getLock();

    auto & distributed_pool = shared->distributed_schedule_pool;
    if (!distributed_pool.has_value())
    {
        /// First request: create the pool with the configured number of threads.
        distributed_pool.emplace(settings.background_distributed_schedule_pool_size);
    }

    return *distributed_pool;
}
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{
auto lock = getLock();
......
......@@ -475,6 +475,7 @@ public:
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;
......
#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ClickHouseRevision.h>
......@@ -78,7 +77,7 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_)
/// It's important to initialize members before `thread` to avoid race.
: storage(storage_)
, pool(std::move(pool_))
......@@ -92,7 +91,10 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, bg_pool(bg_pool_)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
}
......@@ -102,7 +104,7 @@ StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
quit = true;
cond.notify_one();
thread.join();
task_handle->deactivate();
}
}
......@@ -121,7 +123,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
quit = true;
cond.notify_one();
thread.join();
task_handle->deactivate();
}
Poco::File(path).remove(true);
......@@ -130,16 +132,11 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
void StorageDistributedDirectoryMonitor::run()
{
setThreadName("DistrDirMonitor");
std::unique_lock lock{mutex};
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
while (!quit_requested())
while (!quit)
{
auto do_sleep = true;
bool do_sleep = true;
if (!monitor_blocker.isCancelled())
{
try
......@@ -161,15 +158,25 @@ void StorageDistributedDirectoryMonitor::run()
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
if (do_sleep)
cond.wait_for(lock, sleep_time, quit_requested);
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
error_count /= 2;
last_decrease_time = now;
}
if (do_sleep)
break;
}
if (!quit)
{
/// If there is no error, then it will be scheduled by the DistributedBlockOutputStream,
/// so this is just in case, hence it is distributed_directory_monitor_max_sleep_time_ms
if (error_count)
task_handle->scheduleAfter(sleep_time.count());
else
task_handle->scheduleAfter(max_sleep_time.count());
}
}
......@@ -580,6 +587,13 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con
return std::make_shared<DirectoryMonitorBlockInputStream>(file_name);
}
/// Asks the background task to run after `ms` milliseconds.
/// Returns false without scheduling anything when `quit` is already set;
/// otherwise returns whatever the task handle's scheduleAfter() reports.
bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
{
    /// Short-circuit: the task handle is not touched once quit has been requested.
    return !quit && task_handle->scheduleAfter(ms);
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
{
std::unordered_set<UInt64> file_indices_to_skip;
......@@ -708,8 +722,13 @@ std::string StorageDistributedDirectoryMonitor::getLoggerName() const
/// Points the monitor at a new directory and restarts its background task.
///
/// new_path - the new directory path; `current_batch_file_path` is derived from it.
///
/// The task is deactivated BEFORE `mutex` is taken. run() acquires `mutex`, and
/// deactivate() waits for an in-flight execution of the task to finish, so calling
/// deactivate() while holding `mutex` can deadlock: the task blocks on `mutex`
/// while this thread waits for the task.
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path)
{
    /// Stop the task first, without holding `mutex` (see the note above).
    task_handle->deactivate();

    std::lock_guard lock{mutex};
    path = new_path;
    current_batch_file_path = path + "current_batch.txt";

    /// Re-enable the task and schedule it so that the new path gets processed.
    task_handle->activateAndSchedule();
}
}
#pragma once
#include <Storages/StorageDistributed.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <IO/ReadBufferFromFile.h>
......@@ -20,7 +19,7 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_);
~StorageDistributedDirectoryMonitor();
......@@ -33,6 +32,9 @@ public:
void shutdownAndDropAllData();
static BlockInputStreamPtr createStreamFromFile(const String & file_name);
/// For scheduling via DistributedBlockOutputStream
bool scheduleAfter(size_t ms);
private:
void run();
bool processFiles();
......@@ -67,7 +69,9 @@ private:
std::condition_variable cond;
Logger * log;
ActionBlocker & monitor_blocker;
ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder task_handle;
/// Read insert query and insert settings for backward compatible.
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Logger * log);
......
......@@ -589,8 +589,8 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
const std::string path(disk + data_path + dir_name + '/');
/// ensure shard subdirectory creation and notify storage
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(disk, dir_name);
Poco::File(path).createDirectory();
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
......@@ -632,6 +632,9 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
auto sleep_ms = context.getSettingsRef().distributed_directory_monitor_sleep_time_ms;
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
}
if (link(first_file_tmp_path.data(), block_file_path.data()))
......
......@@ -577,15 +577,20 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk)
}
void StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const std::string & disk, const std::string & name)
{
const std::string path(disk + relative_data_path + name);
const std::string key(disk + name);
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(*this, path, node_data.conneciton_pool, monitors_blocker);
if (!node_data.directory_monitor)
{
node_data.conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
*this, path, node_data.conneciton_pool, monitors_blocker, global_context.getDistributedSchedulePool());
}
return *node_data.directory_monitor;
}
size_t StorageDistributed::getShardCount() const
......
......@@ -109,7 +109,7 @@ public:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const std::string & disk);
/// ensure directory monitor thread and connection pool creation by disk and subdirectory name
void requireDirectoryMonitor(const std::string & disk, const std::string & name);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & disk, const std::string & name);
void flushClusterNodesAllData();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册