提交 fdd34069 编写于 作者: A Alexey Milovidov

dbms: probably fixed error in cyclic ALTERs [#METR-12509].

上级 ba0191d0
......@@ -1530,59 +1530,76 @@ void StorageReplicatedMergeTree::alterThread()
{
try
{
/** Имеем описание столбцов в ZooKeeper, общее для всех реплик (Пример: /clickhouse/tables/02-06/visits/columns),
* а также описание столбцов в локальном файле с метаданными (data.getColumnsList()).
*
* Если эти описания отличаются - нужно сделать ALTER.
*
* Если запомненная версия ноды (columns_version) отличается от версии в ZK,
* то описание столбцов в ZK не обязательно отличается от локального
* - такое может быть при цикле из ALTER-ов, который в целом, ничего не меняет.
* В этом случае, надо обновить запомненный номер версии,
* а также всё-равно проверить структуру кусков, и, при необходимости, сделать ALTER.
*
* Запомненный номер версии нужно обновить после обновления метаданных, под блокировкой.
* Этот номер версии проверяется на соответствие актуальному при INSERT-е.
* То есть, так добиваемся, чтобы вставлялись блоки с правильной структурой.
*
* При старте сервера, мог быть не завершён предыдущий ALTER.
* Поэтому, в первый раз, независимо от изменений, проверяем структуру всех part-ов,
* (Пример: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
* и делаем ALTER, если необходимо.
*
* TODO: Слишком сложно, всё переделать.
*/
zkutil::Stat stat;
String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_str, context.getDataTypeFactory());
bool changed = false;
/// Проверим, что описание столбцов изменилось.
/// Чтобы не останавливать лишний раз все запросы в таблицу, проверим сначала под локом на чтение.
{
auto table_lock = lockStructure(false);
if (columns != data.getColumnsList())
changed = true;
}
bool changed_version = (stat.version != columns_version);
MergeTreeData::DataParts parts;
/// Если описание столбцов изменилось, обновим структуру таблицы локально.
if (changed)
if (changed_version)
{
auto table_lock = lockStructureForAlter();
if (columns != data.getColumnsList())
{
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, context);
data.setColumnsList(columns);
if (unreplicated_data)
unreplicated_data->setColumnsList(columns);
columns_version = stat.version;
LOG_INFO(log, "Applied changes to table.");
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
}
else
{
changed = false;
columns_version = stat.version;
LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs.");
}
/// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем.
parts = data.getDataParts();
columns_version = stat.version;
}
/// Обновим куски.
if (changed || force_recheck_parts)
if (changed_version || force_recheck_parts)
{
if (changed)
auto table_lock = lockStructure(false);
if (changed_version)
LOG_INFO(log, "ALTER-ing parts");
int changed_parts = 0;
if (!changed)
if (!changed_version)
parts = data.getDataParts();
auto table_lock = lockStructure(false);
for (const MergeTreeData::DataPartPtr & part : parts)
{
/// Обновим кусок и запишем результат во временные файлы.
......@@ -1623,10 +1640,17 @@ void StorageReplicatedMergeTree::alterThread()
}
}
/// Список столбцов для конкретной реплики.
zookeeper->set(replica_path + "/columns", columns.toString());
if (changed || changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
if (changed_version)
{
if (changed_parts != 0)
LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
else
LOG_INFO(log, "No parts ALTER-ed");
}
force_recheck_parts = false;
}
......@@ -1642,7 +1666,7 @@ void StorageReplicatedMergeTree::alterThread()
}
}
LOG_DEBUG(log, "alter thread finished");
LOG_DEBUG(log, "Alter thread finished");
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
......@@ -2284,6 +2308,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER.
if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before this ALTER finished; "
......@@ -2314,6 +2339,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != new_columns_version)
{
LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册