提交 c547c5a7 编写于 作者: S Silviu Caragea

Merge remote-tracking branch 'origin/background-schedule-pool-fix'

# Conflicts:
#	dbms/src/Common/ZooKeeper/LeaderElection.h
#	dbms/src/Common/ZooKeeper/ZooKeeper.cpp
#	dbms/src/Storages/MergeTree/ReplicatedMergeTreeAlterThread.cpp
#	dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
#	dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
#	dbms/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
#	dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
#	dbms/src/Storages/StorageReplicatedMergeTree.cpp
#include <Common/BackgroundSchedulePool.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>
#include <chrono>
namespace CurrentMetrics
{
extern const Metric BackgroundSchedulePoolTask;
extern const Metric MemoryTrackingInBackgroundSchedulePool;
}
namespace DB
{
// TaskNotification
class TaskNotification final : public Poco::Notification
{
public:
explicit TaskNotification(const BackgroundSchedulePool::TaskHandle & task) : task(task) {}
void execute() { task->execute(); }
private:
BackgroundSchedulePool::TaskHandle task;
};
// BackgroundSchedulePool::TaskInfo
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function):
name(name),
pool(pool),
function(function)
{
}
bool BackgroundSchedulePool::TaskInfo::schedule()
{
std::lock_guard lock(schedule_mutex);
if (deactivated || scheduled)
return false;
scheduled = true;
if(!executing)
{
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock);
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
return true;
}
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
{
std::lock_guard lock(schedule_mutex);
if (deactivated || scheduled)
return false;
pool.scheduleDelayedTask(shared_from_this(), ms, lock);
return true;
}
void BackgroundSchedulePool::TaskInfo::deactivate()
{
std::lock_guard lock_exec(exec_mutex);
std::lock_guard lock_schedule(schedule_mutex);
if (deactivated)
return;
deactivated = true;
scheduled = false;
if (delayed)
pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}
void BackgroundSchedulePool::TaskInfo::activate()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
}
void BackgroundSchedulePool::TaskInfo::execute()
{
std::lock_guard lock_exec(exec_mutex);
{
std::lock_guard lock_schedule(schedule_mutex);
if (deactivated)
return;
scheduled = false;
executing = true;
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
Stopwatch watch;
function();
UInt64 milliseconds = watch.elapsedMilliseconds();
/// If the task is executed longer than specified time, it will be logged.
static const int32_t slow_execution_threshold_ms = 50;
if (milliseconds >= slow_execution_threshold_ms)
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Executing " << name << " took " << milliseconds << " ms.");
{
std::lock_guard lock_schedule(schedule_mutex);
executing = false;
/// In case was scheduled while executing (including a scheduleAfter which expired) we schedule the task
/// on the queue. We don't call the function again here because this way all tasks
/// will have their chance to execute
if(scheduled)
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
}
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t=shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &) {
t->schedule();
};
}
// BackgroundSchedulePool
BackgroundSchedulePool::BackgroundSchedulePool(size_t size)
: size(size)
{
LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
threads.resize(size);
for (auto & thread : threads)
thread = std::thread([this] { threadFunction(); });
delayed_thread = std::thread([this] { delayExecutionThreadFunction(); });
}
BackgroundSchedulePool::~BackgroundSchedulePool()
{
try
{
{
std::unique_lock lock(delayed_tasks_lock);
shutdown = true;
wakeup_cond.notify_all();
}
queue.wakeUpAll();
delayed_thread.join();
LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
for (std::thread & thread : threads)
thread.join();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
BackgroundSchedulePool::TaskHandle BackgroundSchedulePool::addTask(const std::string & name, const Task & task)
{
return std::make_shared<TaskInfo>(*this, name, task);
}
void BackgroundSchedulePool::removeTask(const TaskHandle & task)
{
task->deactivate();
}
void BackgroundSchedulePool::scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
{
Poco::Timestamp current_time;
{
std::lock_guard lock(delayed_tasks_lock);
if (task->delayed)
delayed_tasks.erase(task->iterator);
task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task);
task->delayed = true;
}
wakeup_cond.notify_all();
}
void BackgroundSchedulePool::cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> & /* schedule_mutex_lock */)
{
{
std::lock_guard lock(delayed_tasks_lock);
delayed_tasks.erase(task->iterator);
task->delayed = false;
}
wakeup_cond.notify_all();
}
void BackgroundSchedulePool::threadFunction()
{
setThreadName("BackgrSchedPool");
MemoryTracker memory_tracker;
memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
current_memory_tracker = &memory_tracker;
while (!shutdown)
{
if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
{
TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
task_notification.execute();
}
}
current_memory_tracker = nullptr;
}
void BackgroundSchedulePool::delayExecutionThreadFunction()
{
setThreadName("BckSchPoolDelay");
while (!shutdown)
{
TaskHandle task;
bool found = false;
{
std::unique_lock lock(delayed_tasks_lock);
while(!shutdown)
{
Poco::Timestamp min_time;
if (!delayed_tasks.empty())
{
auto t = delayed_tasks.begin();
min_time = t->first;
task = t->second;
}
if (!task)
{
wakeup_cond.wait(lock);
continue;
}
Poco::Timestamp current_time;
if (min_time > current_time)
{
wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
continue;
}
else
{
/// We have a task ready for execution
found = true;
break;
}
}
}
if(found)
task->schedule();
}
}
}
#pragma once
#include <Poco/Notification.h>
#include <Poco/NotificationQueue.h>
#include <Poco/Timestamp.h>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <map>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Common/ZooKeeper/Types.h>
namespace DB
{
class TaskNotification;
/** Executes functions scheduled at a specific point in time.
* Basically all tasks are added in a queue and precessed by worker threads.
*
* The most important difference between this and BackgroundProcessingPool
* is that we have the guarantee that the same function is not executed from many workers in the same time.
*
* The usage scenario: instead starting a separate thread for each task,
* register a task in BackgroundSchedulePool and when you need to run the task,
* call schedule or scheduleAfter(duration) method.
*/
class BackgroundSchedulePool
{
public:
class TaskInfo;
using TaskHandle = std::shared_ptr<TaskInfo>;
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>;
using Task = std::function<void()>;
class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
{
public:
TaskInfo(BackgroundSchedulePool & pool, const std::string & name, const Task & function);
/// All these methods waits for current execution of task.
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op.
void deactivate();
void activate();
/// get zkutil::WatchCallback needed for zookeeper callbacks.
zkutil::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
void execute();
std::mutex schedule_mutex;
std::mutex exec_mutex;
std::string name;
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
BackgroundSchedulePool & pool;
Task function;
/// If the task is scheduled with delay, points to element of delayed_tasks.
Tasks::iterator iterator;
};
BackgroundSchedulePool(size_t size);
~BackgroundSchedulePool();
TaskHandle addTask(const std::string & name, const Task & task);
void removeTask(const TaskHandle & task);
size_t getNumberOfThreads() const { return size; }
private:
using Threads = std::vector<std::thread>;
void threadFunction();
void delayExecutionThreadFunction();
/// Schedule task for execution after specified delay from now.
void scheduleDelayedTask(const TaskHandle & task, size_t ms, std::lock_guard<std::mutex> &);
/// Remove task, that was scheduled with delay, from schedule.
void cancelDelayedTask(const TaskHandle & task, std::lock_guard<std::mutex> &);
/// Number for worker threads.
const size_t size;
std::atomic<bool> shutdown {false};
Threads threads;
Poco::NotificationQueue queue;
/// Delayed notifications.
std::condition_variable wakeup_cond;
std::mutex delayed_tasks_lock;
/// Thread waiting for next delayed task.
std::thread delayed_thread;
/// Tasks ordered by scheduled time.
Tasks delayed_tasks;
};
using BackgroundSchedulePoolPtr = std::shared_ptr<BackgroundSchedulePool>;
}
......@@ -9,6 +9,7 @@
M(ReplicatedSend) \
M(ReplicatedChecks) \
M(BackgroundPoolTask) \
M(BackgroundSchedulePoolTask) \
M(DiskSpaceReservedForMerge) \
M(DistributedSend) \
M(QueryPreempted) \
......@@ -25,6 +26,7 @@
M(LeaderReplica) \
M(MemoryTracking) \
M(MemoryTrackingInBackgroundProcessingPool) \
M(MemoryTrackingInBackgroundSchedulePool) \
M(MemoryTrackingForMerges) \
M(LeaderElection) \
M(EphemeralNode) \
......
......@@ -5,6 +5,7 @@
#include <memory>
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/BackgroundSchedulePool.h>
namespace ProfileEvents
......@@ -35,9 +36,10 @@ public:
* It means that different participants of leader election have different identifiers
* and existence of more than one ephemeral node with same identifier indicates an error.
*/
LeaderElection(const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
LeaderElection(DB::BackgroundSchedulePool & pool_, const std::string & path_, ZooKeeper & zookeeper_, LeadershipHandler handler_, const std::string & identifier_ = "")
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
{
task_handle = pool.addTask("LeaderElection", [this] { threadFunction(); });
createNode();
}
......@@ -47,17 +49,18 @@ public:
return;
shutdown_called = true;
event->set();
if (thread.joinable())
thread.join();
task_handle->deactivate();
}
~LeaderElection()
{
releaseNode();
pool.removeTask(task_handle);
}
private:
DB::BackgroundSchedulePool & pool;
DB::BackgroundSchedulePool::TaskHandle task_handle;
std::string path;
ZooKeeper & zookeeper;
LeadershipHandler handler;
......@@ -66,9 +69,7 @@ private:
EphemeralNodeHolderPtr node;
std::string node_name;
std::thread thread;
std::atomic<bool> shutdown_called {false};
zkutil::EventPtr event = std::make_shared<Poco::Event>();
CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
......@@ -80,7 +81,8 @@ private:
std::string node_path = node->getPath();
node_name = node_path.substr(node_path.find_last_of('/') + 1);
thread = std::thread(&LeaderElection::threadFunction, this);
task_handle->activate();
task_handle->schedule();
}
void releaseNode()
......@@ -91,38 +93,35 @@ private:
void threadFunction()
{
while (!shutdown_called)
bool success = false;
try
{
bool success = false;
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.begin(), children.end(), node_name);
if (it == children.end() || *it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");
try
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.begin(), children.end(), node_name);
if (it == children.end() || *it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");
if (it == children.begin())
{
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
handler();
return;
}
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
event->wait();
success = true;
}
catch (...)
if (it == children.begin())
{
DB::tryLogCurrentException("LeaderElection");
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
handler();
return;
}
if (!success)
event->tryWait(10 * 1000);
if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task_handle->getWatchCallback()))
task_handle->schedule();
success = true;
}
catch (...)
{
DB::tryLogCurrentException("LeaderElection");
}
if (!success)
task_handle->scheduleAfter(10 * 1000);
}
};
......
......@@ -367,6 +367,16 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, const EventPtr
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
}
std::string ZooKeeper::getWatch(const std::string & path, Stat * stat, WatchCallback watch_callback)
{
int32_t code = 0;
std::string res;
if (tryGetWatch(path, res, stat, watch_callback, &code))
return res;
else
throw KeeperException("Can't get data for node " + path + ": node doesn't exist", code);
}
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat, const EventPtr & watch, int * return_code)
{
return tryGetWatch(path, res, stat, callbackForEvent(watch), return_code);
......
......@@ -113,6 +113,7 @@ public:
bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case.
......
......@@ -15,6 +15,7 @@
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/BackgroundSchedulePool.h>
#include <DataStreams/FormatFactory.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
......@@ -130,6 +131,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
BackgroundSchedulePoolPtr schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::shared_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
......@@ -1304,6 +1306,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool;
}
BackgroundSchedulePool & Context::getSchedulePool()
{
auto lock = getLock();
if (!shared->schedule_pool)
shared->schedule_pool = std::make_shared<BackgroundSchedulePool>(settings.background_schedule_pool_size);
return *shared->schedule_pool;
}
void Context::setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker)
{
auto lock = getLock();
......
......@@ -40,6 +40,7 @@ class ExternalDictionaries;
class ExternalModels;
class InterserverIOHandler;
class BackgroundProcessingPool;
class BackgroundSchedulePool;
class MergeList;
class Cluster;
class Compiler;
......@@ -323,6 +324,7 @@ public:
void dropCaches() const;
BackgroundProcessingPool & getBackgroundPool();
BackgroundSchedulePool & getSchedulePool();
void setDDLWorker(std::shared_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;
......
......@@ -49,6 +49,7 @@ struct Settings
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.") \
M(SettingUInt64, background_schedule_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.") \
\
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS, "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \
\
......
......@@ -14,191 +14,189 @@ namespace DB
static const auto ALTER_ERROR_SLEEP_MS = 10 * 1000;
ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)")),
thread([this] { run(); }) {}
ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_) :
storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, AlterThread)"))
{
task_handle = storage_.context.getSchedulePool().addTask("ReplicatedMergeTreeAlterThread", [this]{run();});
task_handle->schedule();
}
ReplicatedMergeTreeAlterThread::~ReplicatedMergeTreeAlterThread()
{
storage.context.getSchedulePool().removeTask(task_handle);
}
void ReplicatedMergeTreeAlterThread::run()
{
setThreadName("ReplMTAlter");
bool force_recheck_parts = true;
while (!need_stop)
try
{
try
{
/** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
* as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
*
* If these descriptions are different - you need to do ALTER.
*
* If stored version of the node (columns_version) differs from the version in ZK,
* then the description of the columns in ZK does not necessarily differ from the local
* - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
* In this case, you need to update the stored version number,
* and also check the structure of parts, and, if necessary, make ALTER.
*
* Recorded version number needs to be updated after updating the metadata, under lock.
* This version number is checked against the current one for INSERT.
* That is, we make sure to insert blocks with the correct structure.
*
* When the server starts, previous ALTER might not have been completed.
* Therefore, for the first time, regardless of the changes, we check the structure of all parts,
* (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
* and do ALTER if necessary.
*
* TODO: Too complicated, rewrite everything.
*/
auto zookeeper = storage.getZooKeeper();
zkutil::Stat stat;
const String columns_str = zookeeper->get(storage.zookeeper_path + "/columns", &stat, wakeup_event);
auto columns_in_zk = ColumnsDescription::parse(columns_str);
bool changed_version = (stat.version != storage.columns_version);
/** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
* as well as a description of columns in local file with metadata (storage.data.getColumnsList()).
*
* If these descriptions are different - you need to do ALTER.
*
* If stored version of the node (columns_version) differs from the version in ZK,
* then the description of the columns in ZK does not necessarily differ from the local
* - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
* In this case, you need to update the stored version number,
* and also check the structure of parts, and, if necessary, make ALTER.
*
* Recorded version number needs to be updated after updating the metadata, under lock.
* This version number is checked against the current one for INSERT.
* That is, we make sure to insert blocks with the correct structure.
*
* When the server starts, previous ALTER might not have been completed.
* Therefore, for the first time, regardless of the changes, we check the structure of all parts,
* (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
* and do ALTER if necessary.
*
* TODO: Too complicated, rewrite everything.
*/
auto zookeeper = storage.getZooKeeper();
zkutil::Stat stat;
const String columns_str = zookeeper->getWatch(storage.zookeeper_path + "/columns", &stat, task_handle->getWatchCallback());
auto columns_in_zk = ColumnsDescription::parse(columns_str);
bool changed_version = (stat.version != storage.columns_version);
{
/// If you need to lock table structure, then suspend merges.
ActionBlocker::LockHolder merge_blocker;
{
/// If you need to lock table structure, then suspend merges.
ActionBlocker::LockHolder merge_blocker;
if (changed_version || force_recheck_parts)
merge_blocker = storage.merger.merges_blocker.cancel();
if (changed_version || force_recheck_parts)
merge_blocker = storage.merger.merges_blocker.cancel();
MergeTreeData::DataParts parts;
MergeTreeData::DataParts parts;
/// If columns description has changed, we will update table structure locally.
if (changed_version)
{
/// Temporarily cancel part checks to avoid locking for long time.
auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
/// If columns description has changed, we will update table structure locally.
if (changed_version)
{
/// Temporarily cancel part checks to avoid locking for long time.
auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
/// Temporarily cancel parts sending
ActionBlocker::LockHolder data_parts_exchange_blocker;
if (storage.data_parts_exchange_endpoint_holder)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
/// Temporarily cancel parts sending
ActionBlocker::LockHolder data_parts_exchange_blocker;
if (storage.data_parts_exchange_endpoint_holder)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
/// Temporarily cancel part fetches
auto fetches_blocker = storage.fetcher.blocker.cancel();
/// Temporarily cancel part fetches
auto fetches_blocker = storage.fetcher.blocker.cancel();
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);
if (columns_in_zk != storage.getColumns())
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
if (columns_in_zk != storage.getColumns())
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
storage.context.getDatabase(storage.database_name)->alterTable(
storage.context, storage.table_name, columns_in_zk, {});
storage.setColumns(std::move(columns_in_zk));
storage.context.getDatabase(storage.database_name)->alterTable(
storage.context, storage.table_name, columns_in_zk, {});
storage.setColumns(std::move(columns_in_zk));
/// Reinitialize primary key because primary key column types might have changed.
storage.data.initPrimaryKey();
/// Reinitialize primary key because primary key column types might have changed.
storage.data.initPrimaryKey();
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
/// You need to get a list of parts under table lock to avoid race condition with merge.
parts = storage.data.getDataParts();
/// You need to get a list of parts under table lock to avoid race condition with merge.
parts = storage.data.getDataParts();
storage.columns_version = stat.version;
}
storage.columns_version = stat.version;
}
/// Update parts.
if (changed_version || force_recheck_parts)
{
auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
/// Update parts.
if (changed_version || force_recheck_parts)
{
auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
int changed_parts = 0;
int changed_parts = 0;
if (!changed_version)
parts = storage.data.getDataParts();
if (!changed_version)
parts = storage.data.getDataParts();
const auto columns_for_parts = storage.getColumns().getAllPhysical();
const auto columns_for_parts = storage.getColumns().getAllPhysical();
for (const MergeTreeData::DataPartPtr & part : parts)
for (const MergeTreeData::DataPartPtr & part : parts)
{
/// Update the part and write result to temporary files.
/// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
/// node /flags/force_alter.
auto transaction = storage.data.alterDataPart(
part, columns_for_parts, storage.data.primary_expr_ast, false);
if (!transaction)
continue;
++changed_parts;
/// Update part metadata in ZooKeeper.
zkutil::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/checksums",
storage.getChecksumsForZooKeeper(transaction->getNewChecksums()),
-1));
try
{
/// Update the part and write result to temporary files.
/// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
/// node /flags/force_alter.
auto transaction = storage.data.alterDataPart(
part, columns_for_parts, storage.data.primary_expr_ast, false);
if (!transaction)
continue;
++changed_parts;
/// Update part metadata in ZooKeeper.
zkutil::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.emplace_back(zkutil::makeSetRequest(
storage.replica_path + "/parts/" + part->name + "/checksums",
storage.getChecksumsForZooKeeper(transaction->getNewChecksums()),
-1));
try
{
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & e)
{
/// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
if (e.code == ZooKeeperImpl::ZooKeeper::ZNONODE)
storage.enqueuePartForCheck(part->name);
throw;
}
/// Apply file changes.
transaction->commit();
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & e)
{
/// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
if (e.code == ZooKeeperImpl::ZooKeeper::ZNONODE)
storage.enqueuePartForCheck(part->name);
/// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
storage.data.recalculateColumnSizes();
throw;
}
/// List of columns for a specific replica.
zookeeper->set(storage.replica_path + "/columns", columns_str);
/// Apply file changes.
transaction->commit();
}
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
/// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
storage.data.recalculateColumnSizes();
/// List of columns for a specific replica.
zookeeper->set(storage.replica_path + "/columns", columns_str);
force_recheck_parts = false;
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
/// It's important that parts and merge_blocker are destroyed before the wait.
force_recheck_parts = false;
}
wakeup_event->wait();
/// It's important that parts and merge_blocker are destroyed before the wait.
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
force_recheck_parts = true;
force_recheck_parts = true;
wakeup_event->tryWait(ALTER_ERROR_SLEEP_MS);
}
task_handle->scheduleAfter(ALTER_ERROR_SLEEP_MS);
}
LOG_DEBUG(log, "Alter thread finished");
}
}
#pragma once
#include <thread>
#include <Common/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/Types.h>
#include <Core/Types.h>
#include <common/logger_useful.h>
......@@ -21,25 +22,14 @@ class ReplicatedMergeTreeAlterThread
{
public:
ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeAlterThread()
{
need_stop = true;
wakeup_event->set();
if (thread.joinable())
thread.join();
}
~ReplicatedMergeTreeAlterThread();
private:
void run();
StorageReplicatedMergeTree & storage;
Logger * log;
zkutil::EventPtr wakeup_event { std::make_shared<Poco::Event>() };
std::atomic<bool> need_stop { false };
std::thread thread;
BackgroundSchedulePool::TaskHandle task_handle;
};
}
......@@ -362,7 +362,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
if (multi_code == ZooKeeperImpl::ZooKeeper::ZOK)
{
transaction.commit();
storage.merge_selecting_event.set();
storage.merge_selecting_task_handle->schedule();
/// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock.assumeUnlocked();
......
......@@ -17,34 +17,33 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
thread([this] { run(); })
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)"))
{
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeCleanupThread", [this]{ run(); });
task_handle->schedule();
}
ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
{
storage.context.getSchedulePool().removeTask(task_handle);
}
void ReplicatedMergeTreeCleanupThread::run()
{
setThreadName("ReplMTCleanup");
const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000
+ std::uniform_int_distribution<UInt64>(0, storage.data.settings.cleanup_delay_period_random_add * 1000)(rng);
while (!storage.shutdown_called)
try
{
try
{
iterate();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
storage.cleanup_thread_event.tryWait(CLEANUP_SLEEP_MS);
iterate();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Cleanup thread finished");
task_handle->scheduleAfter(CLEANUP_SLEEP_MS);
}
......@@ -236,11 +235,4 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}
ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
{
if (thread.joinable())
thread.join();
}
}
......@@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <thread>
#include <map>
......@@ -25,10 +26,12 @@ public:
~ReplicatedMergeTreeCleanupThread();
void schedule() { task_handle->schedule(); }
private:
StorageReplicatedMergeTree & storage;
Logger * log;
std::thread thread;
BackgroundSchedulePool::TaskHandle task_handle;
pcg64 rng;
void run();
......
......@@ -21,34 +21,34 @@ ReplicatedMergeTreePartCheckThread::ReplicatedMergeTreePartCheckThread(StorageRe
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, PartCheckThread)"))
{
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreePartCheckThread", [this] { run(); });
task_handle->schedule();
}
ReplicatedMergeTreePartCheckThread::~ReplicatedMergeTreePartCheckThread()
{
stop();
storage.context.getSchedulePool().removeTask(task_handle);
}
void ReplicatedMergeTreePartCheckThread::start()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
if (need_stop)
need_stop = false;
else
thread = std::thread([this] { run(); });
need_stop = false;
task_handle->activate();
task_handle->schedule();
}
void ReplicatedMergeTreePartCheckThread::stop()
{
std::lock_guard<std::mutex> lock(start_stop_mutex);
//based on discussion on https://github.com/yandex/ClickHouse/pull/1489#issuecomment-344756259
//using the schedule pool there is no problem in case stop is called two time in row and the start multiple times
std::lock_guard<std::mutex> lock(start_stop_mutex);
need_stop = true;
if (thread.joinable())
{
wakeup_event.set();
thread.join();
need_stop = false;
}
task_handle->deactivate();
}
void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t delay_to_check_seconds)
{
std::lock_guard<std::mutex> lock(parts_mutex);
......@@ -58,7 +58,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
parts_set.insert(name);
wakeup_event.set();
task_handle->schedule();
}
......@@ -309,86 +309,74 @@ void ReplicatedMergeTreePartCheckThread::checkPart(const String & part_name)
void ReplicatedMergeTreePartCheckThread::run()
{
setThreadName("ReplMTPartCheck");
if (need_stop)
return;
while (!need_stop)
try
{
try
{
time_t current_time = time(nullptr);
time_t current_time = time(nullptr);
/// Take part from the queue for verification.
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated
time_t min_check_time = std::numeric_limits<time_t>::max();
/// Take part from the queue for verification.
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated
time_t min_check_time = std::numeric_limits<time_t>::max();
{
std::lock_guard<std::mutex> lock(parts_mutex);
{
std::lock_guard<std::mutex> lock(parts_mutex);
if (parts_queue.empty())
if (parts_queue.empty())
{
if (!parts_set.empty())
{
if (!parts_set.empty())
{
LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
parts_set.clear();
}
LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
parts_set.clear();
}
else
}
else
{
for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
{
for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
if (it->second <= current_time)
{
if (it->second <= current_time)
{
selected = it;
break;
}
if (it->second < min_check_time)
min_check_time = it->second;
selected = it;
break;
}
if (it->second < min_check_time)
min_check_time = it->second;
}
}
}
if (selected == parts_queue.end())
{
/// Poco::Event is triggered immediately if `signal` was before the `wait` call.
/// We can wait a little more than we need due to the use of the old `current_time`.
if (min_check_time != std::numeric_limits<time_t>::max() && min_check_time > current_time)
wakeup_event.tryWait(1000 * (min_check_time - current_time));
else
wakeup_event.wait();
if (selected == parts_queue.end())
return;
continue;
}
checkPart(selected->first);
checkPart(selected->first);
if (need_stop)
return;
if (need_stop)
break;
/// Remove the part from check queue.
{
std::lock_guard<std::mutex> lock(parts_mutex);
/// Remove the part from check queue.
if (parts_queue.empty())
{
std::lock_guard<std::mutex> lock(parts_mutex);
if (parts_queue.empty())
{
LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug.");
}
else
{
parts_set.erase(selected->first);
parts_queue.erase(selected);
}
LOG_ERROR(log, "Someone erased cheking part from parts_queue. This is a bug.");
}
else
{
parts_set.erase(selected->first);
parts_queue.erase(selected);
}
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
wakeup_event.tryWait(PART_CHECK_ERROR_SLEEP_MS);
}
}
LOG_DEBUG(log, "Part check thread finished");
task_handle->schedule();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
task_handle->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
}
}
}
......@@ -10,7 +10,7 @@
#include <Poco/Event.h>
#include <Core/Types.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
namespace DB
{
......@@ -29,6 +29,7 @@ class ReplicatedMergeTreePartCheckThread
{
public:
ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreePartCheckThread();
/// Processing of the queue to be checked is done in the background thread, which you must first start.
void start();
......@@ -65,10 +66,7 @@ public:
/// Get the number of parts in the queue for check.
size_t size() const;
~ReplicatedMergeTreePartCheckThread()
{
stop();
}
private:
void run();
......@@ -91,11 +89,10 @@ private:
mutable std::mutex parts_mutex;
StringSet parts_set;
PartsToCheckQueue parts_queue;
Poco::Event wakeup_event;
std::mutex start_stop_mutex;
std::atomic<bool> need_stop { false };
std::thread thread;
BackgroundSchedulePool::TaskHandle task_handle;
};
}
......@@ -259,7 +259,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
}
bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_task_handle)
{
std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);
......@@ -388,10 +388,10 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
}
}
if (next_update_event)
if (next_update_task_handle)
{
if (zookeeper->exists(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_event))
next_update_event->set();
if (zookeeper->existsWatch(zookeeper_path + "/log/log-" + padIndex(index), nullptr, next_update_task_handle->getWatchCallback()))
next_update_task_handle->schedule();
}
return !log_entries.empty();
......
......@@ -7,6 +7,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/BackgroundSchedulePool.h>
namespace DB
......@@ -156,10 +157,10 @@ public:
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If next_update_event != nullptr, will call this event when new entries appear in the log.
* If next_update_task_handle != nullptr, will schedule this task when new entries appear in the log.
* Returns true if new entries have been.
*/
bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event);
bool pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, BackgroundSchedulePool::TaskHandle next_update_task_handle);
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
* And also wait for the completion of their execution, if they are now being executed.
......
......@@ -28,6 +28,10 @@ namespace ErrorCodes
extern const int REPLICA_IS_ALREADY_ACTIVE;
}
namespace
{
constexpr auto retry_period_ms = 10 * 1000;
}
/// Used to check whether it's us who set node `is_active`, or not.
static String generateActiveNodeIdentifier()
......@@ -35,134 +39,140 @@ static String generateActiveNodeIdentifier()
return "pid: " + toString(getpid()) + ", random: " + toString(randomSeed());
}
ReplicatedMergeTreeRestartingThread::ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, RestartingThread)")),
active_node_identifier(generateActiveNodeIdentifier()),
thread([this] { run(); })
active_node_identifier(generateActiveNodeIdentifier())
{
check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
storage.queue_updating_task_handle = storage.context.getSchedulePool().addTask("StorageReplicatedMergeTree::queueUpdatingThread", [this]{ storage.queueUpdatingThread(); });
storage.queue_updating_task_handle->deactivate();
task_handle = storage.context.getSchedulePool().addTask("ReplicatedMergeTreeRestartingThread", [this]{ run(); });
task_handle->schedule();
}
ReplicatedMergeTreeRestartingThread::~ReplicatedMergeTreeRestartingThread()
{
storage.context.getSchedulePool().removeTask(task_handle);
completeShutdown();
storage.context.getSchedulePool().removeTask(storage.queue_updating_task_handle);
}
void ReplicatedMergeTreeRestartingThread::run()
{
constexpr auto retry_period_ms = 10 * 1000;
if (need_stop)
return;
/// The frequency of checking expiration of session in ZK.
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
try
{
if (first_time || storage.getZooKeeper()->expired())
{
startup_completed = false;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
check_period_ms = storage.data.settings.check_delay_period * 1000;
if (first_time)
{
LOG_DEBUG(log, "Activating replica.");
}
else
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
setThreadName("ReplMTRestart");
bool old_val = false;
if (storage.is_readonly.compare_exchange_strong(old_val, true))
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
bool first_time = true; /// Activate replica for the first time.
time_t prev_time_of_check_delay = 0;
partialShutdown();
}
/// Starts the replica when the server starts/creates a table. Restart the replica when session expires with ZK.
while (!need_stop)
{
try
{
if (first_time || storage.getZooKeeper()->expired())
if (!startup_completed)
{
if (first_time)
try
{
LOG_DEBUG(log, "Activating replica.");
storage.setZooKeeper(storage.context.getZooKeeper());
}
else
catch (const zkutil::KeeperException & e)
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
bool old_val = false;
if (storage.is_readonly.compare_exchange_strong(old_val, true))
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
/// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
tryLogCurrentException(log, __PRETTY_FUNCTION__);
partialShutdown();
if (first_time)
storage.startup_event.set();
task_handle->scheduleAfter(retry_period_ms);
return;
}
while (!need_stop)
if (!need_stop && !tryStartup())
{
try
{
storage.setZooKeeper(storage.context.getZooKeeper());
}
catch (const zkutil::KeeperException & e)
{
/// The exception when you try to zookeeper_init usually happens if DNS does not work. We will try to do it again.
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (first_time)
storage.startup_event.set();
wakeup_event.tryWait(retry_period_ms);
continue;
}
if (!need_stop && !tryStartup())
{
if (first_time)
storage.startup_event.set();
wakeup_event.tryWait(retry_period_ms);
continue;
}
if (first_time)
storage.startup_event.set();
break;
task_handle->scheduleAfter(retry_period_ms);
return;
}
if (need_stop)
break;
bool old_val = true;
if (storage.is_readonly.compare_exchange_strong(old_val, false))
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
if (first_time)
storage.startup_event.set();
first_time = false;
startup_completed = true;
}
time_t current_time = time(nullptr);
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
{
/// Find out lag of replicas.
time_t absolute_delay = 0;
time_t relative_delay = 0;
if (need_stop)
return;
storage.getReplicaDelays(absolute_delay, relative_delay);
bool old_val = true;
if (storage.is_readonly.compare_exchange_strong(old_val, false))
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
if (absolute_delay)
LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
first_time = false;
}
prev_time_of_check_delay = current_time;
time_t current_time = time(nullptr);
if (current_time >= prev_time_of_check_delay + static_cast<time_t>(storage.data.settings.check_delay_period))
{
/// Find out lag of replicas.
time_t absolute_delay = 0;
time_t relative_delay = 0;
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
<< storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
storage.getReplicaDelays(absolute_delay, relative_delay);
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
if (absolute_delay)
LOG_TRACE(log, "Absolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".");
storage.exitLeaderElection();
/// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
/// This is bad because we can end up without a leader on any replica.
/// In this case we rely on the fact that the session will expire and we will reconnect.
storage.enterLeaderElection();
}
prev_time_of_check_delay = current_time;
/// We give up leadership if the relative lag is greater than threshold.
if (storage.is_leader
&& relative_delay > static_cast<time_t>(storage.data.settings.min_relative_delay_to_yield_leadership))
{
LOG_INFO(log, "Relative replica delay (" << relative_delay << " seconds) is bigger than threshold ("
<< storage.data.settings.min_relative_delay_to_yield_leadership << "). Will yield leadership.");
ProfileEvents::increment(ProfileEvents::ReplicaYieldLeadership);
storage.exitLeaderElection();
/// NOTE: enterLeaderElection() can throw if node creation in ZK fails.
/// This is bad because we can end up without a leader on any replica.
/// In this case we rely on the fact that the session will expire and we will reconnect.
storage.enterLeaderElection();
}
}
catch (...)
{
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
wakeup_event.tryWait(check_period_ms);
}
catch (...)
{
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
task_handle->scheduleAfter(check_period_ms);
}
void ReplicatedMergeTreeRestartingThread::completeShutdown()
{
try
{
storage.data_parts_exchange_endpoint_holder->cancelForever();
......@@ -182,8 +192,6 @@ void ReplicatedMergeTreeRestartingThread::run()
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Restarting thread finished");
}
......@@ -204,7 +212,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.shutdown_called = false;
storage.shutdown_event.reset();
storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
storage.queue_updating_task_handle->activate();
storage.queue_updating_task_handle->schedule();
storage.part_check_thread.start();
storage.alter_thread = std::make_unique<ReplicatedMergeTreeAlterThread>(storage);
storage.cleanup_thread = std::make_unique<ReplicatedMergeTreeCleanupThread>(storage);
......@@ -354,18 +363,14 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.shutdown_called = true;
storage.shutdown_event.set();
storage.merge_selecting_event.set();
storage.queue_updating_event->set();
storage.alter_query_event->set();
storage.cleanup_thread_event.set();
storage.replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
storage.exitLeaderElection();
if (storage.queue_updating_thread.joinable())
storage.queue_updating_thread.join();
storage.queue_updating_task_handle->deactivate();
storage.cleanup_thread.reset();
storage.alter_thread.reset();
......
......@@ -2,6 +2,7 @@
#include <Poco/Event.h>
#include <common/logger_useful.h>
#include <Common/BackgroundSchedulePool.h>
#include <Core/Types.h>
#include <thread>
#include <atomic>
......@@ -22,16 +23,12 @@ class ReplicatedMergeTreeRestartingThread
{
public:
ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeRestartingThread()
{
if (thread.joinable())
thread.join();
}
~ReplicatedMergeTreeRestartingThread();
void wakeup()
{
wakeup_event.set();
task_handle->schedule();
}
Poco::Event & getWakeupEvent()
......@@ -42,7 +39,7 @@ public:
void stop()
{
need_stop = true;
wakeup();
wakeup_event.set();
}
private:
......@@ -54,9 +51,14 @@ private:
/// The random data we wrote into `/replicas/me/is_active`.
String active_node_identifier;
std::thread thread;
BackgroundSchedulePool::TaskHandle task_handle;
Int64 check_period_ms; /// The frequency of checking expiration of session in ZK.
bool first_time = true; /// Activate replica for the first time.
time_t prev_time_of_check_delay = 0;
bool startup_completed = false;
void run();
void completeShutdown();
/// Start or stop background threads. Used for partial reinitialization when re-creating a session in ZooKeeper.
bool tryStartup(); /// Returns false if ZooKeeper is not available.
......
......@@ -108,6 +108,26 @@ namespace ErrorCodes
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
template <typename Key> struct CachedMergingPredicate;
class ReplicatedMergeTreeMergeSelectingThread
{
public:
ReplicatedMergeTreeMergeSelectingThread(StorageReplicatedMergeTree* storage_);
void clearState();
bool deduplicate;
std::chrono::steady_clock::time_point now;
std::function<bool(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &, String *)> can_merge;
private:
StorageReplicatedMergeTree* storage;
std::function<bool(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> uncached_merging_predicate;
std::function<std::pair<String, String>(const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &)> merging_predicate_args_to_key;
std::unique_ptr<CachedMergingPredicate<std::pair<std::string, std::string>> > cached_merging_predicate;
};
/** There are three places for each part, where it should be
* 1. In the RAM, MergeTreeData::data_parts, all_data_parts.
......@@ -216,6 +236,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;
merge_sel_state.reset(new ReplicatedMergeTreeMergeSelectingThread(this));
merge_selecting_task_handle = context_.getSchedulePool().addTask("StorageReplicatedMergeTree::mergeSelectingThread", [this] { mergeSelectingThread(); });
bool skip_sanity_checks = false;
try
......@@ -1015,9 +1038,9 @@ String StorageReplicatedMergeTree::getChecksumsForZooKeeper(const MergeTreeDataP
}
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
void StorageReplicatedMergeTree::pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_task_handle)
{
if (queue.pullLogsToQueue(getZooKeeper(), next_update_event))
if (queue.pullLogsToQueue(getZooKeeper(), next_update_task_handle))
{
if (queue_task_handle)
queue_task_handle->wake();
......@@ -1277,7 +1300,7 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre
/** With `ZCONNECTIONLOSS` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem, because in this case the merge will remain in the queue, and we will try again.
*/
merge_selecting_event.set();
merge_selecting_task_handle->schedule();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
......@@ -1593,31 +1616,26 @@ void StorageReplicatedMergeTree::executeClearColumnInPartition(const LogEntry &
void StorageReplicatedMergeTree::queueUpdatingThread()
{
setThreadName("ReplMTQueueUpd");
//most probably this check is not relevant
if (shutdown_called)
return;
bool update_in_progress = false;
while (!shutdown_called)
if (!queue_update_in_progress)
{
if (!update_in_progress)
{
last_queue_update_start_time.store(time(nullptr));
update_in_progress = true;
}
try
{
pullLogsToQueue(queue_updating_event);
last_queue_update_finish_time.store(time(nullptr));
update_in_progress = false;
queue_updating_event->wait();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
last_queue_update_start_time.store(time(nullptr));
queue_update_in_progress = true;
}
try
{
pullLogsToQueue(queue_updating_task_handle);
last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
queue_updating_task_handle->scheduleAfter(QUEUE_UPDATE_ERROR_SLEEP_MS);
}
LOG_DEBUG(log, "Queue updating thread finished");
}
......@@ -1755,7 +1773,7 @@ namespace
return true;
}
}
/// If any of the parts is already going to be merged into a larger one, do not agree to merge it.
bool partsWillNotBeMergedOrDisabled(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
......@@ -1858,94 +1876,67 @@ namespace
template <typename Key> constexpr CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::min_delay;
template <typename Key> constexpr CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::max_delay;
template <typename Key> constexpr double CachedMergingPredicate<Key>::Expiration::exponent_base;
}
void StorageReplicatedMergeTree::mergeSelectingThread()
{
setThreadName("ReplMTMergeSel");
LOG_DEBUG(log, "Merge selecting thread started");
bool deduplicate = false; /// TODO: read deduplicate option from table config
if (!is_leader)
return;
auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data);
};
bool success = false;
auto merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
try
{
return std::make_pair(left->name, right->name);
};
CachedMergingPredicate<std::pair<std::string, std::string>> cached_merging_predicate;
/// Will be updated below.
std::chrono::steady_clock::time_point now;
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return partsWillNotBeMergedOrDisabled(left, right, queue)
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
/// You need to load new entries into the queue before you select parts to merge.
/// (so we know which parts are already going to be merged).
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
if (merge_selecting_logs_pulling_is_required)
{
pullLogsToQueue();
merge_selecting_logs_pulling_is_required = false;
}
while (is_leader)
{
bool success = false;
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
/// and in the same time, many small parts could be created and won't be merged.
size_t merges_queued = queue.countMerges();
try
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges (" << merges_queued
<< ") is greater than max_replicated_merges_in_queue ("
<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
}
else
{
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
MergeTreeDataMerger::FuturePart future_merged_part;
/// You need to load new entries into the queue before you select parts to merge.
/// (so we know which parts are already going to be merged).
/// We must select parts for merge under the mutex because other threads (OPTIMIZE queries) could push new merges.
if (merge_selecting_logs_pulling_is_required)
{
pullLogsToQueue();
merge_selecting_logs_pulling_is_required = false;
}
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
/// and in the same time, many small parts could be created and won't be merged.
size_t merges_queued = queue.countMerges();
merge_sel_state->now = std::chrono::steady_clock::now();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, merge_sel_state->can_merge))
{
LOG_TRACE(log, "Number of queued merges (" << merges_queued
<< ") is greater than max_replicated_merges_in_queue ("
<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
merge_selecting_logs_pulling_is_required = true;
success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, merge_sel_state->deduplicate);
}
else
{
MergeTreeDataMerger::FuturePart future_merged_part;
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
now = std::chrono::steady_clock::now();
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge))
{
merge_selecting_logs_pulling_is_required = true;
success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate);
}
}
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
if (!is_leader)
break;
if (!is_leader)
return;
if (!success)
merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
}
if (!success)
merge_selecting_task_handle->scheduleAfter(MERGE_SELECTING_SLEEP_MS);
else
merge_selecting_task_handle->schedule();
LOG_DEBUG(log, "Merge selecting thread finished");
}
......@@ -2051,12 +2042,15 @@ void StorageReplicatedMergeTree::enterLeaderElection()
LOG_INFO(log, "Became leader");
is_leader = true;
merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
merge_sel_state->clearState();
merge_selecting_task_handle->activate();
merge_selecting_task_handle->schedule();
};
try
{
leader_election = std::make_shared<zkutil::LeaderElection>(
context.getSchedulePool(),
zookeeper_path + "/leader_election",
*current_zookeeper, /// current_zookeeper lives for the lifetime of leader_election,
/// since before changing `current_zookeeper`, `leader_election` object is destroyed in `partialShutdown` method.
......@@ -2085,8 +2079,7 @@ void StorageReplicatedMergeTree::exitLeaderElection()
LOG_INFO(log, "Stopped being leader");
is_leader = false;
merge_selecting_event.set();
merge_selecting_thread.join();
merge_selecting_task_handle->deactivate();
}
/// Delete the node in ZK only after we have stopped the merge_selecting_thread - so that only one
......@@ -2265,7 +2258,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
{
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
cleanup_thread_event.set();
cleanup_thread->schedule();
return false;
}
......@@ -2360,7 +2353,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
if (quorum)
updateQuorum(part_name);
merge_selecting_event.set();
merge_selecting_task_handle->schedule();
for (const auto & replaced_part : replaced_parts)
{
......@@ -2446,6 +2439,8 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
context.getSchedulePool().removeTask(merge_selecting_task_handle);
}
......@@ -3780,4 +3775,37 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
LOG_TRACE(log, "Deleted " << to_delete_futures.size() << " deduplication block IDs in partition ID " << partition_id);
}
ReplicatedMergeTreeMergeSelectingThread::ReplicatedMergeTreeMergeSelectingThread(StorageReplicatedMergeTree* storage_) :
storage(storage_)
{
clearState();
}
void ReplicatedMergeTreeMergeSelectingThread::clearState()
{
deduplicate = false; /// TODO: read deduplicate option from table config
uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return canMergePartsAccordingToZooKeeperInfo(left, right, storage->getZooKeeper(), storage->zookeeper_path, storage->data);
};
merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return std::make_pair(left->name, right->name);
};
cached_merging_predicate.reset(new CachedMergingPredicate<std::pair<std::string, std::string>>());
/// Will be updated below.
now = std::chrono::steady_clock::time_point();
can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return partsWillNotBeMergedOrDisabled(left, right, storage->queue)
&& cached_merging_predicate->get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
}
}
......@@ -21,11 +21,14 @@
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/LeaderElection.h>
#include <Common/BackgroundSchedulePool.h>
namespace DB
{
class ReplicatedMergeTreeMergeSelectingThread;
/** The engine that uses the merge tree (see MergeTreeData) and replicated through ZooKeeper.
*
* ZooKeeper is used for the following things:
......@@ -185,6 +188,7 @@ private:
friend class ReplicatedMergeTreeRestartingThread;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeMergeSelectingThread;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
......@@ -252,16 +256,19 @@ private:
/// Threads.
/// A thread that keeps track of the updates in the logs of all replicas and loads them into the queue.
std::thread queue_updating_thread;
zkutil::EventPtr queue_updating_event = std::make_shared<Poco::Event>();
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHandle queue_updating_task_handle;
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// A thread that selects parts to merge.
std::thread merge_selecting_thread;
Poco::Event merge_selecting_event;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHandle merge_selecting_task_handle;
/// State for merge selecting thread
std::unique_ptr<ReplicatedMergeTreeMergeSelectingThread> merge_sel_state;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
/// If true then new entries might added to the queue, so we must pull logs before selecting parts for merge.
......@@ -270,8 +277,6 @@ private:
/// A thread that removes old parts, log entries, and blocks.
std::unique_ptr<ReplicatedMergeTreeCleanupThread> cleanup_thread;
/// Is used to wakeup cleanup_thread
Poco::Event cleanup_thread_event;
/// A thread that processes reconnection to ZooKeeper when the session expires.
std::unique_ptr<ReplicatedMergeTreeRestartingThread> restarting_thread;
......@@ -287,8 +292,6 @@ private:
Logger * log;
/// Initialization.
/** Creates the minimum set of nodes in ZooKeeper.
*/
void createTableIfNotExists();
......@@ -341,9 +344,9 @@ private:
/// Running jobs from the queue.
/** Copies the new entries from the logs of all replicas to the queue of this replica.
* If next_update_event != nullptr, calls this event when new entries appear in the log.
* If next_update_task_handle != nullptr, schedules this task when new entries appear in the log.
*/
void pullLogsToQueue(zkutil::EventPtr next_update_event = nullptr);
void pullLogsToQueue(BackgroundSchedulePool::TaskHandle next_update_task_handle = nullptr);
/** Execute the action from the queue. Throws an exception if something is wrong.
* Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册