提交 a9b1b369 编写于 作者: A Azat Khuzhin

Use separate pool for buffer flushes (background_buffer_flush_schedule_pool_size)

上级 15dc45b9
......@@ -78,6 +78,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(SettingUInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \
......
......@@ -317,6 +317,7 @@ struct ContextShared
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
......@@ -413,6 +414,7 @@ struct ContextShared
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
background_pool.reset();
background_move_pool.reset();
schedule_pool.reset();
......@@ -1330,6 +1332,14 @@ BackgroundProcessingPool & Context::getBackgroundMovePool()
return *shared->background_move_pool;
}
BackgroundSchedulePool & Context::getBufferFlushSchedulePool()
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
shared->buffer_flush_schedule_pool.emplace(settings.background_buffer_flush_schedule_pool_size);
return *shared->buffer_flush_schedule_pool;
}
BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();
......
......@@ -468,6 +468,7 @@ public:
*/
void dropCaches() const;
BackgroundSchedulePool & getBufferFlushSchedulePool();
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
......
......@@ -13,7 +13,6 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/FieldVisitors.h>
......@@ -76,6 +75,7 @@ StorageBuffer::StorageBuffer(
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
, bg_pool(global_context.getBufferFlushSchedulePool())
{
setColumns(columns_);
setConstraints(constraints_);
......@@ -83,12 +83,7 @@ StorageBuffer::StorageBuffer(
StorageBuffer::~StorageBuffer()
{
// Should not happen if shutdown was called
if (flush_thread.joinable())
{
shutdown_event.set();
flush_thread.join();
}
flush_handle->deactivate();
}
......@@ -397,6 +392,9 @@ public:
least_busy_lock = std::unique_lock(least_busy_buffer->mutex);
}
insertIntoBuffer(block, *least_busy_buffer);
least_busy_lock.unlock();
storage.reschedule();
}
private:
StorageBuffer & storage;
......@@ -458,16 +456,15 @@ void StorageBuffer::startup()
<< " Set appropriate system_profile to fix this.");
}
flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this);
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); });
flush_handle->activateAndSchedule();
}
void StorageBuffer::shutdown()
{
shutdown_event.set();
if (flush_thread.joinable())
flush_thread.join();
flush_handle->deactivate();
try
{
......@@ -595,7 +592,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds " << (check_thresholds ? "(bg)" : "(direct)") << ".");
if (!destination_id)
return;
......@@ -697,21 +694,42 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
}
void StorageBuffer::flushThread()
void StorageBuffer::flushBack()
{
try
{
flushAllBuffers(true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
reschedule();
}
void StorageBuffer::reschedule()
{
setThreadName("BufferFlush");
time_t min_first_write_time = std::numeric_limits<time_t>::max();
time_t rows = 0;
do
for (auto & buffer : buffers)
{
try
{
flushAllBuffers(true);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} while (!shutdown_event.tryWait(1000));
std::lock_guard lock(buffer.mutex);
min_first_write_time = buffer.first_write_time;
rows += buffer.data.rows();
}
/// will be rescheduled via INSERT
if (!rows)
return;
time_t current_time = time(nullptr);
time_t time_passed = current_time - min_first_write_time;
size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min(min, max) * 1000);
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
......
......@@ -4,7 +4,7 @@
#include <thread>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
......@@ -118,10 +118,6 @@ private:
Poco::Logger * log;
Poco::Event shutdown_event;
/// Resets data by timeout.
ThreadFromGlobalPool flush_thread;
void flushAllBuffers(bool check_thresholds = true);
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
......@@ -131,7 +127,11 @@ private:
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
void flushThread();
void flushBack();
void reschedule();
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder flush_handle;
protected:
/** num_shards - the level of internal parallelism (the number of independent buffers)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册