提交 4a35d916 编写于 作者: A Alexey Milovidov

dbms: merges don't block ALTERs [#METR-18690].

上级 f8e5fed8
......@@ -52,11 +52,12 @@ public:
static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
/** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение.
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll().
* Все новые вызовы будут бросать исключения, пока не будет вызван uncancel().
* Считает количество таких вызовов для поддержки нескольких наложенных друг на друга отмен.
*/
bool cancelAll() { return cancelled.exchange(true, std::memory_order_relaxed); }
bool uncancelAll() { return cancelled.exchange(false, std::memory_order_relaxed); }
bool isCancelled() const { return cancelled.load(std::memory_order_relaxed); }
void cancel() { cancelled.fetch_add(1); }
void uncancel() { cancelled.fetch_sub(1); }
bool isCancelled() const { return cancelled.load() > 0; }
private:
MergeTreeData & data;
......@@ -66,24 +67,28 @@ private:
/// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто).
time_t disk_space_warning_time = 0;
std::atomic<bool> cancelled{false};
std::atomic<int> cancelled {0};
};
/** Временно приостанавливает мерджи.
*/
class MergeTreeMergeBlocker
{
public:
MergeTreeMergeBlocker(MergeTreeDataMerger & merger)
: merger(merger), was_cancelled{!merger.cancelAll()} {}
: merger(merger)
{
merger.cancel();
}
~MergeTreeMergeBlocker()
{
if (was_cancelled)
merger.uncancelAll();
merger.uncancel();
}
private:
MergeTreeDataMerger & merger;
const bool was_cancelled;
};
}
......@@ -169,10 +169,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.shutdown_called = false;
storage.shutdown_event.reset();
storage.merger.uncancelAll();
if (storage.unreplicated_merger)
storage.unreplicated_merger->uncancelAll();
storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage);
storage.cleanup_thread.reset(new ReplicatedMergeTreeCleanupThread(storage));
storage.alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, &storage);
......@@ -181,6 +177,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
std::bind(&StorageReplicatedMergeTree::queueTask, &storage, std::placeholders::_1));
storage.queue_task_handle->wake();
storage.merger.uncancel();
if (storage.unreplicated_merger)
storage.unreplicated_merger->uncancel();
return true;
}
catch (...)
......@@ -328,9 +328,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.parts_to_check_event.set();
storage.replica_is_active_node = nullptr;
storage.merger.cancelAll();
storage.merger.cancel();
if (storage.unreplicated_merger)
storage.unreplicated_merger->cancelAll();
storage.unreplicated_merger->cancel();
LOG_TRACE(log, "Waiting for threads to finish");
if (storage.is_leader_node)
......
......@@ -100,7 +100,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelAll();
merger.cancel();
background_pool.removeTask(merge_task_handle);
}
......
......@@ -1801,139 +1801,153 @@ void StorageReplicatedMergeTree::alterThread()
bool changed_version = (stat.version != columns_version);
MergeTreeData::DataParts parts;
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
{
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = lockStructureForAlter();
/// Если потребуется блокировать структуру таблицы, то приостановим мерджи.
std::unique_ptr<MergeTreeMergeBlocker> merge_blocker;
std::unique_ptr<MergeTreeMergeBlocker> unreplicated_merge_blocker;
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
if (changed_version || force_recheck_parts)
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
merge_blocker = std::make_unique<MergeTreeMergeBlocker>(merger);
if (unreplicated_merger)
unreplicated_merge_blocker = std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger);
}
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
MergeTreeData::DataParts parts;
if (columns_changed)
{
data.setColumnsList(columns);
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed_version)
{
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
}
auto table_lock = lockStructureForAlter();
if (materialized_columns_changed)
{
this->materialized_columns = materialized_columns;
data.materialized_columns = std::move(materialized_columns);
}
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
const auto alias_columns_changed = alias_columns != data.alias_columns;
const auto column_defaults_changed = column_defaults != data.column_defaults;
if (alias_columns_changed)
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
column_defaults_changed)
{
this->alias_columns = alias_columns;
data.alias_columns = std::move(alias_columns);
}
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
if (column_defaults_changed)
{
this->column_defaults = column_defaults;
data.column_defaults = std::move(column_defaults);
}
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
materialized_columns, alias_columns, column_defaults, context);
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
if (columns_changed)
{
data.setColumnsList(columns);
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
}
columns_version = stat.version;
}
if (materialized_columns_changed)
{
this->materialized_columns = materialized_columns;
data.materialized_columns = std::move(materialized_columns);
}
/// Обновим куски.
if (changed_version || force_recheck_parts)
{
auto table_lock = lockStructure(false);
if (alias_columns_changed)
{
this->alias_columns = alias_columns;
data.alias_columns = std::move(alias_columns);
}
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
if (column_defaults_changed)
{
this->column_defaults = column_defaults;
data.column_defaults = std::move(column_defaults);
}
int changed_parts = 0;
LOG_INFO(log, "Applied changes to table.");
}
else
{
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
if (!changed_version)
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
const auto columns_plus_materialized = data.getColumnsList();
columns_version = stat.version;
}
for (const MergeTreeData::DataPartPtr & part : parts)
/// Обновим куски.
if (changed_version || force_recheck_parts)
{
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns_plus_materialized);
auto table_lock = lockStructure(false);
if (!transaction)
continue;
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
++changed_parts;
int changed_parts = 0;
/// Обновим метаданные куска в ZooKeeper.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
zookeeper->multi(ops);
/// Применим изменения файлов.
transaction->commit();
}
if (!changed_version)
parts = data.getDataParts();
/// То же самое для нереплицируемых данных.
if (unreplicated_data)
{
parts = unreplicated_data->getDataParts();
const auto columns_plus_materialized = data.getColumnsList();
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
/// Обновим кусок и запишем результат во временные файлы.
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
/// нода /flags/force_alter.
auto transaction = data.alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
/// Обновим метаданные куска в ZooKeeper.
zkutil::Ops ops;
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1));
ops.push_back(new zkutil::Op::SetData(
replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1));
zookeeper->multi(ops);
/// Применим изменения файлов.
transaction->commit();
}
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns_str);
/// То же самое для нереплицируемых данных.
if (unreplicated_data)
{
parts = unreplicated_data->getDataParts();
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
for (const MergeTreeData::DataPartPtr & part : parts)
{
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
if (!transaction)
continue;
++changed_parts;
transaction->commit();
}
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
force_recheck_parts = false;
}
force_recheck_parts = false;
/// Важно, что уничтожается parts и merge_blocker перед wait-ом.
}
parts.clear();
alter_thread_event->wait();
}
catch (...)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册