提交 2a85a1d2 编写于 作者: A Alexey Milovidov

Added setting 'replicated_alter_columns_timeout' [#METR-20816].

上级 11df87ea
...@@ -80,6 +80,8 @@ struct Settings ...@@ -80,6 +80,8 @@ struct Settings
\ \
/** Ожидать выполнения действий по манипуляции с партициями. 0 - не ждать, 1 - ждать выполнения только у себя, 2 - ждать всех. */ \ /** Ожидать выполнения действий по манипуляции с партициями. 0 - не ждать, 1 - ждать выполнения только у себя, 2 - ждать всех. */ \
M(SettingUInt64, replication_alter_partitions_sync, 1) \ M(SettingUInt64, replication_alter_partitions_sync, 1) \
/** Ожидать выполнения действий по изменению структуры таблицы в течение указанного количества секунд. 0 - ждать неограниченное время. */ \
M(SettingUInt64, replication_alter_columns_timeout, 60) \
\ \
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \ M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\ \
......
...@@ -344,6 +344,7 @@ namespace ErrorCodes ...@@ -344,6 +344,7 @@ namespace ErrorCodes
extern const int NO_SUCH_BARRIER = 338; extern const int NO_SUCH_BARRIER = 338;
extern const int RESHARDING_ILL_FORMED_LOG = 339; extern const int RESHARDING_ILL_FORMED_LOG = 339;
extern const int NO_ZOOKEEPER_ACCESSOR = 340; extern const int NO_ZOOKEEPER_ACCESSOR = 340;
extern const int UNFINISHED = 341;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
......
...@@ -85,6 +85,7 @@ namespace ErrorCodes ...@@ -85,6 +85,7 @@ namespace ErrorCodes
extern const int NO_SUCH_BARRIER; extern const int NO_SUCH_BARRIER;
extern const int CHECKSUM_DOESNT_MATCH; extern const int CHECKSUM_DOESNT_MATCH;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int UNFINISHED;
} }
...@@ -2189,12 +2190,28 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, ...@@ -2189,12 +2190,28 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
} }
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::set<String> inactive_replicas;
std::set<String> timed_out_replicas;
time_t replication_alter_columns_timeout = context.getSettingsRef().replication_alter_columns_timeout;
for (const String & replica : replicas) for (const String & replica : replicas)
{ {
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
while (!shutdown_called) while (!shutdown_called)
{ {
/// Реплика может быть неактивной.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
" ALTER will be done asynchronously when replica becomes active.");
inactive_replicas.emplace(replica);
break;
}
String replica_columns_str; String replica_columns_str;
/// Реплику могли успеть удалить. /// Реплику могли успеть удалить.
...@@ -2228,11 +2245,58 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params, ...@@ -2228,11 +2245,58 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
if (stat.version != replica_columns_version) if (stat.version != replica_columns_version)
continue; continue;
if (!replication_alter_columns_timeout)
{
alter_query_event->wait(); alter_query_event->wait();
/// Всё Ок.
}
else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
{
/// Всё Ок.
}
else
{
LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
" ALTER will be done asynchronously.");
timed_out_replicas.emplace(replica);
}
} }
if (shutdown_called) if (shutdown_called)
break; throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
ErrorCodes::UNFINISHED);
if (!inactive_replicas.empty() || !timed_out_replicas.empty())
{
std::stringstream exception_message;
exception_message << "Alter is not finished because";
if (!inactive_replicas.empty())
{
exception_message << " some replicas are inactive right now";
for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
}
if (!timed_out_replicas.empty() && !inactive_replicas.empty())
exception_message << " and";
if (!timed_out_replicas.empty())
{
exception_message << " timeout when waiting for some replicas";
for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;
exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
}
exception_message << ". Alter will be done asynchronously.";
throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
}
} }
LOG_DEBUG(log, "ALTER finished"); LOG_DEBUG(log, "ALTER finished");
...@@ -3034,6 +3098,7 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const ...@@ -3034,6 +3098,7 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
unreplicated_data->freezePartition(prefix); unreplicated_data->freezePartition(prefix);
} }
void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & database_name, void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & database_name,
const Field & first_partition, const Field & last_partition, const Field & first_partition, const Field & last_partition,
const WeightedZooKeeperPaths & weighted_zookeeper_paths, const WeightedZooKeeperPaths & weighted_zookeeper_paths,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册