提交 15d9e39a 编写于 作者: M Michael Kolupaev

Merge

上级 601732a0
......@@ -49,6 +49,7 @@
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD 300 /// каждый период уменьшаем счетчик ошибок в 2 раза
#define DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS 5000 /// Максимальное время ожидания в очереди запросов.
#define DBMS_DEFAULT_BACKGROUND_POOL_SIZE 6
/// Используется в методе reserve, когда известно число строк, но неизвестны их размеры.
#define DBMS_APPROX_STRING_SIZE 64
......
......@@ -17,6 +17,7 @@
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Users.h>
......@@ -95,6 +96,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
String default_replica_name; /// Имя реплики из конфига.
BackgroundProcessingPoolPtr background_pool; /// Пул потоков для фоновой работы, выполняемой таблицами.
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
......@@ -317,6 +319,8 @@ public:
void setMarkCache(size_t cache_size_in_bytes);
MarkCachePtr getMarkCache() const;
BackgroundProcessingPool & getBackgroundPool();
/** Очистить кэши разжатых блоков и засечек.
* Обычно это делается при переименовании таблиц, изменении типа столбцов, удалении таблицы.
* - так как кэши привязаны к именам файлов, и становятся некорректными.
......
......@@ -66,6 +66,9 @@ struct Settings
M(SettingBool, use_splitting_aggregator, false) \
/** Следует ли отменять выполняющийся запрос с таким же id, как новый. */ \
M(SettingBool, replace_running_query, false) \
/** Количество потоков, выполняющих фоновую работу для таблиц (например, слияние в merge tree). \
* TODO: Сейчас применяется только при запуске сервера. Можно сделать изменяемым динамически. */ \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
\
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\
......
......@@ -8,6 +8,7 @@
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
......@@ -48,10 +49,13 @@ public:
typedef std::shared_ptr<void> TaskHandle;
BackgroundProcessingPool() : size(1), sleep_seconds(1), shutdown(false) {}
BackgroundProcessingPool(int size_) : size(size_), sleep_seconds(10), shutdown(false) {}
void setNumberOfThreads(int size_)
{
if (size_ <= 0)
throw Exception("Invalid number of threads: " + toString(size_), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
Poco::ScopedLock<Poco::FastMutex> tlock(threads_mutex);
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
......@@ -257,4 +261,6 @@ private:
}
};
typedef Poco::SharedPtr<BackgroundProcessingPool> BackgroundProcessingPoolPtr;
}
......@@ -79,13 +79,8 @@ struct MergeTreeSettings
/// Во столько раз ночью увеличиваем коэффициент.
size_t merge_parts_at_night_inc = 10;
/// Сколько потоков использовать для объединения кусков (для MergeTree).
/// Пул потоков общий на весь сервер.
size_t merging_threads = 6;
/// Сколько потоков использовать для загрузки кусков с других реплик и объединения кусков (для ReplicatedMergeTree).
/// Пул потоков на каждую таблицу свой.
size_t replication_threads = 4;
/// Сколько заданий на слияние кусков разрешено одновременно иметь в очереди ReplicatedMergeTree.
size_t max_replicated_merges_in_queue = 6;
/// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить.
size_t min_rows_for_concurrent_read = 20 * 8192;
......
......@@ -5,7 +5,6 @@
#include <DB/Storages/MergeTree/MergeTreeDataWriter.h>
#include <DB/Storages/MergeTree/MergeTreeDataMerger.h>
#include <DB/Storages/MergeTree/DiskSpaceMonitor.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
namespace DB
{
......@@ -27,7 +26,7 @@ public:
*/
static StoragePtr create(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
......@@ -84,6 +83,8 @@ private:
String full_path;
Increment increment;
BackgroundProcessingPool & background_pool;
MergeTreeData data;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
......@@ -96,7 +97,6 @@ private:
volatile bool shutdown_called;
static BackgroundProcessingPool merge_pool;
BackgroundProcessingPool::TaskHandle merge_task_handle;
/// Пока существует, помечает части как currently_merging и держит резерв места.
......@@ -143,7 +143,7 @@ private:
StorageMergeTree(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
......
......@@ -156,7 +156,6 @@ private:
typedef std::list<LogEntry> LogEntries;
typedef std::set<String> StringSet;
typedef std::vector<std::thread> Threads;
Context & context;
zkutil::ZooKeeperPtr zookeeper;
......@@ -214,8 +213,8 @@ private:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
/// Потоки, выполняющие действия из очереди.
Threads queue_threads;
/// Задание, выполняющее действия из очереди.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// Поток, выбирающий куски для слияния.
std::thread merge_selecting_thread;
......@@ -321,15 +320,15 @@ private:
/** Выполнить действие из очереди. Бросает исключение, если что-то не так.
*/
void executeLogEntry(const LogEntry & entry);
void executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context);
/** В бесконечном цикле обновляет очередь.
*/
void queueUpdatingThread();
/** В бесконечном цикле выполняет действия из очереди.
/** Выполняет действия из очереди.
*/
void queueThread();
bool queueTask(BackgroundProcessingPool::Context & context);
/// Выбор кусков для слияния.
......
......@@ -544,6 +544,14 @@ MarkCachePtr Context::getMarkCache() const
return shared->mark_cache;
}
BackgroundProcessingPool & Context::getBackgroundPool()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->background_pool)
shared->background_pool = new BackgroundProcessingPool(settings.background_pool_size);
return *shared->background_pool;
}
void Context::resetCaches() const
{
/// Исходим из допущения, что функции setUncompressedCache, setMarkCache, если вызывались, то раньше (при старте сервера). Иначе поставьте mutex.
......
......@@ -6,12 +6,9 @@
namespace DB
{
BackgroundProcessingPool StorageMergeTree::merge_pool;
StorageMergeTree::StorageMergeTree(const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
......@@ -20,6 +17,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
const String & sign_column_,
const MergeTreeSettings & settings_)
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), increment(full_path + "increment.txt"),
background_pool(context_.getBackgroundPool()),
data(full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
index_granularity_,mode_, sign_column_, settings_, database_name_ + "." + name),
reader(data), writer(data), merger(data),
......@@ -34,7 +32,7 @@ StorageMergeTree::StorageMergeTree(const String & path_, const String & database
StoragePtr StorageMergeTree::create(
const String & path_, const String & database_name_, const String & name_,
NamesAndTypesListPtr columns_,
const Context & context_,
Context & context_,
ASTPtr & primary_expr_ast_,
const String & date_column_name_,
const ASTPtr & sampling_expression_,
......@@ -48,9 +46,7 @@ StoragePtr StorageMergeTree::create(
sampling_expression_, index_granularity_, mode_, sign_column_, settings_);
StoragePtr res_ptr = res->thisPtr();
merge_pool.setNumberOfThreads(res->data.settings.merging_threads);
merge_pool.setSleepTime(5);
res->merge_task_handle = merge_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res, std::placeholders::_1));
res->merge_task_handle = res->background_pool.addTask(std::bind(&StorageMergeTree::mergeTask, res, std::placeholders::_1));
return res_ptr;
}
......@@ -61,7 +57,7 @@ void StorageMergeTree::shutdown()
return;
shutdown_called = true;
merger.cancelAll();
merge_pool.removeTask(merge_task_handle);
background_pool.removeTask(merge_task_handle);
}
......@@ -142,8 +138,8 @@ bool StorageMergeTree::merge(bool aggressive, BackgroundProcessingPool::Context
auto can_merge = std::bind(&StorageMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);
/// Если слияние запущено из пула потоков, и хотя бы половина потоков сливает большие куски,
/// не будем сливать большие куски.
int big_merges = merge_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= merge_pool.getNumberOfThreads();
int big_merges = background_pool.getCounter("big merges");
bool only_small = pool_context && big_merges * 2 >= background_pool.getNumberOfThreads();
if (!merger.selectPartsToMerge(parts, merged_name, disk_space, false, aggressive, only_small, can_merge) &&
!merger.selectPartsToMerge(parts, merged_name, disk_space, true, aggressive, only_small, can_merge))
......
......@@ -720,7 +720,7 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
return true;
}
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
{
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS)
......@@ -776,6 +776,17 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
else
{
/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
for (const auto & part : parts)
{
if (part->size * data.index_granularity > 25 * 1024 * 1024)
{
pool_context.incrementCounter("big merges");
pool_context.incrementCounter("replicated big merges");
break;
}
}
MergeTreeData::Transaction transaction;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
......@@ -892,87 +903,73 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
}
}
void StorageReplicatedMergeTree::queueThread()
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
{
while (!shutdown_called)
{
LogEntry entry;
bool have_work = false;
LogEntry entry;
bool have_work = false;
try
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
bool empty = queue.empty();
if (!empty)
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
if (shouldExecuteLogEntry(*it))
{
if (shouldExecuteLogEntry(*it))
{
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!have_work)
{
std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
continue;
}
if (!have_work)
return false;
bool success = false;
bool success = false;
try
{
executeLogEntry(entry);
try
{
executeLogEntry(entry, pool_context);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". There must be a bug somewhere. Ignoring it.");
auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != ZOK)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ZooKeeper::error2string(code) + ". There must be a bug somewhere. Ignoring it.");
success = true;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
success = true;
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
LOG_INFO(log, e.displayText());
else
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (shutdown_called)
break;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (success)
{
std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
}
else
{
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
}
if (!success)
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
return success;
}
void StorageReplicatedMergeTree::mergeSelectingThread()
......@@ -988,8 +985,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
size_t merges_queued = 0;
/// Есть ли в очереди мердж крупных кусков.
/// TODO: Если мердж уже выполняется, его нет в очереди, но здесь нужно все равно как-то о нем узнать.
bool has_big_merge = false;
bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0;
if (!has_big_merge)
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
......@@ -1019,7 +1017,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
do
{
if (merges_queued >= data.settings.merging_threads)
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
break;
MergeTreeData::DataPartsVector parts;
......@@ -1238,9 +1236,8 @@ void StorageReplicatedMergeTree::partialShutdown()
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
queue_threads.clear();
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
}
......@@ -1269,9 +1266,8 @@ void StorageReplicatedMergeTree::goReadOnly()
}
if (queue_updating_thread.joinable())
queue_updating_thread.join();
for (auto & thread : queue_threads)
thread.join();
queue_threads.clear();
context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
LOG_TRACE(log, "Threads finished");
}
......@@ -1289,8 +1285,7 @@ void StorageReplicatedMergeTree::startup()
std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);
queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
for (size_t i = 0; i < data.settings.replication_threads; ++i)
queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
queue_task_handle = context.getBackgroundPool().addTask(std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
}
void StorageReplicatedMergeTree::restartingThread()
......@@ -1413,6 +1408,8 @@ void StorageReplicatedMergeTree::drop()
LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
zookeeper->removeRecursive(zookeeper_path);
}
data.dropAllData();
}
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册