未验证 提交 b9c7ecfa 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #8439 from ClickHouse/refactor_alter

[WIP] Refactor alter
......@@ -175,10 +175,7 @@ ASTPtr DatabaseLazy::getCreateDatabaseQuery(const Context & context) const
void DatabaseLazy::alterTable(
const Context & /* context */,
const String & /* table_name */,
const ColumnsDescription & /* columns */,
const IndicesDescription & /* indices */,
const ConstraintsDescription & /* constraints */,
const ASTModifier & /* storage_modifier */)
const StorageInMemoryMetadata & /* metadata */)
{
SCOPE_EXIT({ clearExpiredTables(); });
throw Exception("ALTER query is not supported for Lazy database.", ErrorCodes::UNSUPPORTED_METHOD);
......
......@@ -55,10 +55,7 @@ public:
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(
const Context & context,
......
......@@ -20,9 +20,12 @@
#include <Dictionaries/DictionaryFactory.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/queryToString.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Event.h>
#include <Common/Stopwatch.h>
......@@ -315,10 +318,7 @@ ASTPtr DatabaseOrdinary::getCreateDatabaseQuery(const Context & context) const
void DatabaseOrdinary::alterTable(
const Context & context,
const String & table_name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & storage_modifier)
const StorageInMemoryMetadata & metadata)
{
/// Read the definition of the table and replace the necessary parts with new ones.
......@@ -338,19 +338,30 @@ void DatabaseOrdinary::alterTable(
const auto & ast_create_query = ast->as<ASTCreateQuery &>();
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
statement = getObjectDefinitionFromCreateQuery(ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
statement = getObjectDefinitionFromCreateQuery(ast);
{
WriteBufferFromFile out(table_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);
......
......@@ -51,10 +51,7 @@ public:
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(
const Context & context,
......
......@@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Dictionaries/IDictionary.h>
#include <Common/Exception.h>
......@@ -192,12 +193,9 @@ public:
virtual void alterTable(
const Context & /*context*/,
const String & /*name*/,
const ColumnsDescription & /*columns*/,
const IndicesDescription & /*indices*/,
const ConstraintsDescription & /*constraints*/,
const ASTModifier & /*engine_modifier*/)
const StorageInMemoryMetadata & /*metadata*/)
{
throw Exception(getEngineName() + ": renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(getEngineName() + ": alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns time of table's metadata change, 0 if there is no corresponding metadata file.
......
......@@ -103,7 +103,9 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
alter_commands.validate(*table, context);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata, context);
table->alter(alter_commands, context, table_lock_holder);
}
......
此差异已折叠。
......@@ -2,10 +2,9 @@
#include <optional>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/SettingsChanges.h>
......@@ -31,11 +30,10 @@ struct AlterCommand
ADD_CONSTRAINT,
DROP_CONSTRAINT,
MODIFY_TTL,
UKNOWN_TYPE,
MODIFY_SETTING,
};
Type type = UKNOWN_TYPE;
Type type;
String column_name;
......@@ -43,10 +41,12 @@ struct AlterCommand
String partition_name;
/// For ADD and MODIFY, a new column type.
DataTypePtr data_type;
DataTypePtr data_type = nullptr;
ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
/// For COMMENT column
std::optional<String> comment;
/// For ADD - after which column to add a new one. If an empty string, add to the end. To add to the beginning now it is impossible.
......@@ -59,48 +59,36 @@ struct AlterCommand
bool if_not_exists = false;
/// For MODIFY_ORDER_BY
ASTPtr order_by;
ASTPtr order_by = nullptr;
/// For ADD INDEX
ASTPtr index_decl;
ASTPtr index_decl = nullptr;
String after_index_name;
/// For ADD/DROP INDEX
String index_name;
// For ADD CONSTRAINT
ASTPtr constraint_decl;
ASTPtr constraint_decl = nullptr;
// For ADD/DROP CONSTRAINT
String constraint_name;
/// For MODIFY TTL
ASTPtr ttl;
ASTPtr ttl = nullptr;
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// For ADD and MODIFY
CompressionCodecPtr codec;
CompressionCodecPtr codec = nullptr;
/// For MODIFY SETTING
SettingsChanges settings_changes;
AlterCommand() = default;
AlterCommand(const Type type_, const String & column_name_, const DataTypePtr & data_type_,
const ColumnDefaultKind default_kind_, const ASTPtr & default_expression_,
const String & after_column_, const String & comment_,
const bool if_exists_, const bool if_not_exists_)
: type{type_}, column_name{column_name_}, data_type{data_type_}, default_kind{default_kind_},
default_expression{default_expression_}, comment(comment_), after_column{after_column_},
if_exists(if_exists_), if_not_exists(if_not_exists_)
{}
static std::optional<AlterCommand> parse(const ASTAlterCommand * command);
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
void apply(StorageInMemoryMetadata & metadata) const;
/// Checks that alter query changes data. For MergeTree:
/// * column files (data and marks)
......@@ -110,25 +98,31 @@ struct AlterCommand
/// checks that only settings changed by alter
bool isSettingsAlter() const;
bool isCommentAlter() const;
};
String alterTypeToString(const AlterCommand::Type type);
class Context;
class AlterCommands : public std::vector<AlterCommand>
{
private:
bool prepared = false;
public:
/// Used for primitive table engines, where only columns metadata can be changed
void applyForColumnsOnly(ColumnsDescription & columns_description) const;
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
void apply(StorageInMemoryMetadata & metadata) const;
/// Apply alter commands only for settings. Exception will be thrown if any other part of table structure will be modified.
void applyForSettingsOnly(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
void prepare(const StorageInMemoryMetadata & metadata, const Context & context);
void validate(const StorageInMemoryMetadata & metadata, const Context & context) const;
bool isModifyingData() const;
bool isSettingsAlter() const;
bool isCommentAlter() const;
};
}
#pragma once
#include <Core/Types.h>
#include <optional>
#include <Parsers/IAST.h>
#include <Common/SettingsChanges.h>
namespace DB
{
class AlterMetadataCommand
{
enum Type
{
COMMENT_COLUMN,
MODIFY_ORDER_BY,
ADD_INDEX,
ADD_CONSTRAINT,
DROP_CONSTRAINT,
MODIFY_TTL,
MODIFY_SETTING,
};
Type type;
String column_name;
std::optional<String> comment;
/// For DROP_COLUMN, MODIFY_COLUMN, COMMENT_COLUMN
bool if_exists = false;
/// For MODIFY_ORDER_BY
ASTPtr order_by;
/// For ADD INDEX
ASTPtr index_decl;
String after_index_name;
/// For ADD/DROP INDEX
String index_name;
// For ADD CONSTRAINT
ASTPtr constraint_decl;
// For ADD/DROP CONSTRAINT
String constraint_name;
/// For MODIFY TTL
ASTPtr ttl;
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// For MODIFY SETTING
SettingsChanges settings_changes;
};
class AlterMetadataCommands : public std::vector<AlterMetadataCommand>
{
public:
void apply(
IndicesDescription & indices_description,
ConstraintsDescription & constraints_description,
ASTPtr & order_by_ast,
ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast,
SettingsChanges & changes) const;
/// Apply alter commands only for settings. Exception will be thrown if any other part of table structure will be modified.
void applyForSettings(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
bool isModifyingData() const;
bool isSettingsAlter() const;
};
}
......@@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int SETTINGS_ARE_NOT_SUPPORTED;
extern const int UNKNOWN_SETTING;
extern const int TABLE_IS_DROPPED;
extern const int NOT_IMPLEMENTED;
}
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
......@@ -313,12 +314,6 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}
void IStorage::checkSettingCanBeChanged(const String & /* setting_name */) const
{
if (!supportsSettings())
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockHolder result;
......@@ -373,57 +368,40 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
return result;
}
IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new_changes) const
StorageInMemoryMetadata IStorage::getInMemoryMetadata() const
{
return [&] (IAST & ast)
return
{
if (!new_changes.empty())
{
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
/// Make storage settings unique
for (const auto & change : new_changes)
{
checkSettingCanBeChanged(change.name);
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
}
.columns = getColumns(),
.indices = getIndices(),
.constraints = getConstraints(),
};
}
void IStorage::alter(
const AlterCommands & params,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
TableStructureWriteLockHolder & /*table_lock_holder*/)
{
if (params.isModifyingData())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
checkAlterIsPossible(params, context.getSettingsRef());
const String database_name = getDatabaseName();
const String table_name = getTableName();
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(database_name)->alterTable(context, table_name, metadata);
setColumns(std::move(metadata.columns));
}
if (params.isSettingsAlter())
{
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(database_name)->alterTable(context, table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
else
void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
if (!command.isCommentAlter())
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
......
......@@ -13,6 +13,7 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
......@@ -127,6 +128,10 @@ public: /// thread-unsafe part. lockStructure must be acquired
const ConstraintsDescription & getConstraints() const;
void setConstraints(ConstraintsDescription constraints_);
/// Returns storage metadata copy. Direct modification of
/// result structure doesn't affect storage.
virtual StorageInMemoryMetadata getInMemoryMetadata() const;
/// NOTE: these methods should include virtual columns,
/// but should NOT include ALIAS columns (they are treated separately).
virtual NameAndTypePair getColumn(const String & column_name) const;
......@@ -152,9 +157,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
/// If |need_all| is set, then checks that all the columns of the table are in the block.
void check(const Block & block, bool need_all = false) const;
/// Check storage has setting and setting can be modified.
virtual void checkSettingCanBeChanged(const String & setting_name) const;
protected: /// still thread-unsafe part.
void setIndices(IndicesDescription indices_);
......@@ -162,8 +164,6 @@ protected: /// still thread-unsafe part.
/// Initially reserved virtual column name may be shadowed by real column.
virtual bool isVirtualColumn(const String & column_name) const;
/// Returns modifier of settings in storage definition
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
private:
ColumnsDescription columns; /// combined real and virtual columns
......@@ -316,6 +316,11 @@ public:
*/
virtual void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder);
/** Checks that alter commands can be applied to storage. For example, columns can be modified,
* or primary key can be changes, etc.
*/
virtual void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings);
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.
*/
......
......@@ -418,15 +418,6 @@ bool StorageKafka::streamToViews()
return limits_applied;
}
void StorageKafka::checkSettingCanBeChanged(const String & setting_name) const
{
if (KafkaSettings::findIndex(setting_name) == KafkaSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
......
......@@ -64,8 +64,6 @@ public:
const auto & getSchemaName() const { return schema_name; }
const auto & skipBroken() const { return skip_broken; }
void checkSettingCanBeChanged(const String & setting_name) const override;
protected:
StorageKafka(
const std::string & table_name_,
......
......@@ -114,16 +114,9 @@ MergeTreeData::MergeTreeData(
const String & database_,
const String & table_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> storage_settings_,
bool require_part_metadata_,
......@@ -131,8 +124,9 @@ MergeTreeData::MergeTreeData(
BrokenPartCallback broken_part_callback_)
: global_context(context_)
, merging_params(merging_params_)
, partition_by_ast(partition_by_ast_)
, sample_by_ast(sample_by_ast_)
, partition_by_ast(metadata.partition_by_ast)
, sample_by_ast(metadata.sample_by_ast)
, settings_ast(metadata.settings_ast)
, require_part_metadata(require_part_metadata_)
, database_name(database_)
, table_name(table_)
......@@ -147,7 +141,7 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
{
const auto settings = getSettings();
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
setProperties(metadata);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());
......@@ -189,7 +183,7 @@ MergeTreeData::MergeTreeData(
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
setTTLExpressions(columns_.getColumnTTLs(), ttl_table_ast_);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
// format_file always contained on any data path
String version_file_path;
......@@ -244,6 +238,35 @@ MergeTreeData::MergeTreeData(
}
StorageInMemoryMetadata MergeTreeData::getInMemoryMetadata() const
{
StorageInMemoryMetadata metadata{
.columns = getColumns(),
.indices = getIndices(),
.constraints = getConstraints(),
};
if (partition_by_ast)
metadata.partition_by_ast = partition_by_ast->clone();
if (order_by_ast)
metadata.order_by_ast = order_by_ast->clone();
if (primary_key_ast)
metadata.primary_key_ast = primary_key_ast->clone();
if (ttl_table_ast)
metadata.ttl_for_table_ast = ttl_table_ast->clone();
if (sample_by_ast)
metadata.sample_by_ast = sample_by_ast->clone();
if (settings_ast)
metadata.settings_ast = settings_ast->clone();
return metadata;
}
static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name)
{
for (const ExpressionAction & action : expr.getActions())
......@@ -272,18 +295,14 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
}
}
void MergeTreeData::setProperties(
const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
const ColumnsDescription & new_columns, const IndicesDescription & indices_description,
const ConstraintsDescription & constraints_description, bool only_check)
void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool only_check)
{
if (!new_order_by_ast)
if (!metadata.order_by_ast)
throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
ASTPtr new_sorting_key_expr_list = extractKeyExpressionList(new_order_by_ast);
ASTPtr new_primary_key_expr_list = new_primary_key_ast
? extractKeyExpressionList(new_primary_key_ast) : new_sorting_key_expr_list->clone();
ASTPtr new_sorting_key_expr_list = extractKeyExpressionList(metadata.order_by_ast);
ASTPtr new_primary_key_expr_list = metadata.primary_key_ast
? extractKeyExpressionList(metadata.primary_key_ast) : new_sorting_key_expr_list->clone();
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
new_sorting_key_expr_list->children.push_back(std::make_shared<ASTIdentifier>(merging_params.version_column));
......@@ -315,8 +334,9 @@ void MergeTreeData::setProperties(
}
}
auto all_columns = new_columns.getAllPhysical();
auto all_columns = metadata.columns.getAllPhysical();
/// Order by check AST
if (order_by_ast && only_check)
{
/// This is ALTER, not CREATE/ATTACH TABLE. Let us check that all new columns used in the sorting key
......@@ -352,7 +372,7 @@ void MergeTreeData::setProperties(
"added to the sorting key. You can add expressions that use only the newly added columns",
ErrorCodes::BAD_ARGUMENTS);
if (new_columns.getDefaults().count(col))
if (metadata.columns.getDefaults().count(col))
throw Exception("Newly added column " + col + " has a default expression, so adding "
"expressions that use it to the sorting key is forbidden",
ErrorCodes::BAD_ARGUMENTS);
......@@ -387,11 +407,11 @@ void MergeTreeData::setProperties(
MergeTreeIndices new_indices;
if (!indices_description.indices.empty())
if (!metadata.indices.indices.empty())
{
std::set<String> indices_names;
for (const auto & index_ast : indices_description.indices)
for (const auto & index_ast : metadata.indices.indices)
{
const auto & index_decl = std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast);
......@@ -428,24 +448,24 @@ void MergeTreeData::setProperties(
if (!only_check)
{
setColumns(std::move(new_columns));
setColumns(std::move(metadata.columns));
order_by_ast = new_order_by_ast;
order_by_ast = metadata.order_by_ast;
sorting_key_columns = std::move(new_sorting_key_columns);
sorting_key_expr_ast = std::move(new_sorting_key_expr_list);
sorting_key_expr = std::move(new_sorting_key_expr);
primary_key_ast = new_primary_key_ast;
primary_key_ast = metadata.primary_key_ast;
primary_key_columns = std::move(new_primary_key_columns);
primary_key_expr_ast = std::move(new_primary_key_expr_list);
primary_key_expr = std::move(new_primary_key_expr);
primary_key_sample = std::move(new_primary_key_sample);
primary_key_data_types = std::move(new_primary_key_data_types);
setIndices(indices_description);
setIndices(metadata.indices);
skip_indices = std::move(new_indices);
setConstraints(constraints_description);
setConstraints(metadata.constraints);
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
......@@ -1357,17 +1377,15 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
}
void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /*context*/)
void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const Settings & settings)
{
/// Check that needed transformations can be applied to the list of columns without considering type conversions.
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
commands.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
commands.apply(metadata);
if (getIndices().empty() && !metadata.indices.empty() &&
!settings.allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
/// Set of columns that shouldn't be altered.
NameSet columns_alter_type_forbidden;
......@@ -1436,13 +1454,32 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /
}
}
setProperties(new_order_by_ast, new_primary_key_ast,
new_columns, new_indices, new_constraints, /* only_check = */ true);
setProperties(metadata, /* only_check = */ true);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast, /* only_check = */ true);
for (const auto & setting : new_changes)
checkSettingCanBeChanged(setting.name);
if (settings_ast)
{
const auto & current_changes = settings_ast->as<const ASTSetQuery &>().changes;
for (const auto & changed_setting : metadata.settings_ast->as<const ASTSetQuery &>().changes)
{
if (MergeTreeSettings::findIndex(changed_setting.name) == MergeTreeSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + changed_setting.name + "'",
ErrorCodes::UNKNOWN_SETTING};
auto comparator = [&changed_setting](const auto & change) { return change.name == changed_setting.name; };
auto current_setting_it
= std::find_if(current_changes.begin(), current_changes.end(), comparator);
if ((current_setting_it == current_changes.end() || *current_setting_it != changed_setting)
&& MergeTreeSettings::isReadonlySetting(changed_setting.name))
{
throw Exception{"Setting '" + changed_setting.name + "' is readonly for storage '" + getName() + "'",
ErrorCodes::READONLY_SETTING};
}
}
}
if (commands.isModifyingData())
{
......@@ -1450,8 +1487,8 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(),
getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool);
}
}
......@@ -1767,26 +1804,19 @@ void MergeTreeData::alterDataPart(
}
void MergeTreeData::changeSettings(
const SettingsChanges & new_changes,
const ASTPtr & new_settings,
TableStructureWriteLockHolder & /* table_lock_holder */)
{
if (!new_changes.empty())
if (new_settings)
{
const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
MergeTreeSettings copy = *getSettings();
copy.applyChanges(new_changes);
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
settings_ast = new_settings;
}
}
void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
{
if (MergeTreeSettings::findIndex(setting_name) == MergeTreeSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
if (MergeTreeSettings::isReadonlySetting(setting_name))
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
{
auto & empty_columns = data_part->empty_columns;
......
......@@ -332,22 +332,17 @@ public:
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const String & database_, const String & table_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
StorageInMemoryMetadata getInMemoryMetadata() const override;
ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
......@@ -545,7 +540,7 @@ public:
/// - all type conversions can be done.
/// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
/// If something is wrong, throws an exception.
void checkAlter(const AlterCommands & commands, const Context & context);
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) override;
/// Performs ALTER of the data part, writes the result to temporary files.
/// Returns an object allowing to rename temporary files to permanent files.
......@@ -559,12 +554,9 @@ public:
/// Change MergeTreeSettings
void changeSettings(
const SettingsChanges & new_changes,
const ASTPtr & new_changes,
TableStructureWriteLockHolder & table_lock_holder);
/// All MergeTreeData children have settings.
void checkSettingCanBeChanged(const String & setting_name) const override;
/// Remove columns, that have been marked as empty after zeroing values with expired ttl
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
......@@ -787,6 +779,7 @@ protected:
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
ASTPtr settings_ast;
bool require_part_metadata;
......@@ -899,10 +892,7 @@ protected:
std::mutex clear_old_temporary_directories_mutex;
/// Mutex for settings usage
void setProperties(const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
const ColumnsDescription & new_columns,
const IndicesDescription & indices_description,
const ConstraintsDescription & constraints_description, bool only_check = false);
void setProperties(const StorageInMemoryMetadata & metadata, bool only_check = false);
void initPartitionKey();
......
......@@ -34,6 +34,12 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
/// This code may looks strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
/// Now we have two different entities ORDER BY and it's optional prefix -- PRIMARY KEY.
/// In most cases user doesn't specify PRIMARY KEY and semantically it's equal to ORDER BY.
/// So rules in zookeeper metadata is following:
/// - When we have only ORDER BY, than store it in "primary key:" row of /metadata
/// - When we have both, than store PRIMARY KEY in "primary key:" row and ORDER BY in "sorting key:" row of /metadata
if (!data.primary_key_ast)
primary_key = formattedAST(MergeTreeData::extractKeyExpressionList(data.order_by_ast));
else
......
......@@ -12,6 +12,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
......@@ -573,6 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
ASTPtr settings_ast;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
......@@ -599,12 +601,16 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (args.storage_def->ttl_table)
ttl_table_ast = args.storage_def->ttl_table->ptr();
if (args.query.columns_list && args.query.columns_list->indices)
for (const auto & index : args.query.columns_list->indices->children)
indices_description.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
storage_settings->loadFromQuery(*args.storage_def);
if (args.storage_def->settings)
settings_ast = args.storage_def->settings->ptr();
}
else
{
......@@ -633,18 +639,30 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ErrorCodes::BAD_ARGUMENTS);
}
if (!args.attach && !indices_description.empty() && !args.local_context.getSettingsRef().allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
StorageInMemoryMetadata metadata{
.columns = args.columns,
.indices = indices_description,
.constraints = args.constraints,
.partition_by_ast = partition_by_ast,
.order_by_ast = order_by_ast,
.primary_key_ast = primary_key_ast,
.ttl_for_table_ast = ttl_table_ast,
.sample_by_ast = sample_by_ast,
.settings_ast = settings_ast,
};
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.database_name, args.table_name, args.relative_data_path,
args.columns, indices_description, args.constraints,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
metadata, args.context, date_column_name, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.database_name, args.table_name, args.relative_data_path, args.columns, indices_description,
args.constraints, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.database_name, args.table_name, args.relative_data_path, metadata, args.attach, args.context,
date_column_name, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
}
......
......@@ -699,23 +699,35 @@ void StorageBuffer::flushThread()
} while (!shutdown_event.tryWait(1000));
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlterIsPossible(params, context.getSettingsRef());
const String database_name_ = getDatabaseName();
const String table_name_ = getTableName();
/// So that no blocks of the old structure remain.
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name_)->alterTable(context, table_name_, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(database_name_)->alterTable(context, table_name_, metadata);
setColumns(std::move(metadata.columns));
}
......
......@@ -94,9 +94,10 @@ public:
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
/// The structure of the subordinate table is not checked and does not change.
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
~StorageBuffer() override;
......
......@@ -393,20 +393,33 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
}
void StorageDistributed::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN
&& command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN
&& command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception("Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlterIsPossible(params, context.getSettingsRef());
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
setColumns(std::move(metadata.columns));
}
......
......@@ -84,10 +84,12 @@ public:
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void startup() override;
void shutdown() override;
......
#pragma once
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Structure represent table metadata stored in memory.
/// Only one storage engine support all fields -- MergeTree.
/// Complete table AST can be recreated from this struct.
struct StorageInMemoryMetadata
{
/// Columns of table with their names, types,
/// defaults, comments, etc. All table engines have columns.
ColumnsDescription columns;
/// Table indices. Currently supported for MergeTree only.
IndicesDescription indices;
/// Table constraints. Currently supported for MergeTree only.
ConstraintsDescription constraints;
/// PARTITION BY expression. Currently supported for MergeTree only.
ASTPtr partition_by_ast = nullptr;
/// ORDER BY expression. Required field for all MergeTree tables
/// even in old syntax MergeTree(partition_key, order_by, ...)
ASTPtr order_by_ast = nullptr;
/// PRIMARY KEY expression. If absent, than equal to order_by_ast.
ASTPtr primary_key_ast = nullptr;
/// TTL expression for whole table. Supported for MergeTree only.
ASTPtr ttl_for_table_ast = nullptr;
/// SAMPLE BY expression. Supported for MergeTree only.
ASTPtr sample_by_ast = nullptr;
/// SETTINGS expression. Supported for MergeTree, Buffer and Kafka.
ASTPtr settings_ast = nullptr;
};
}
......@@ -413,17 +413,27 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont
}
void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageMerge::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
setColumns(new_columns);
StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();
params.apply(storage_metadata);
context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata);
setColumns(storage_metadata.columns);
}
Block StorageMerge::getQueryHeader(
......
......@@ -48,10 +48,12 @@ public:
database_name = new_database_name;
}
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
......
......@@ -56,27 +56,27 @@ StorageMergeTree::StorageMergeTree(
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
bool attach,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> storage_settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_, relative_data_path_,
columns_, indices_, constraints_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
std::move(storage_settings_), false, attach),
reader(*this), writer(*this),
merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
: MergeTreeData(
database_name_,
table_name_,
relative_data_path_,
metadata,
context_,
date_column_name,
merging_params_,
std::move(storage_settings_),
false,
attach)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
{
loadDataParts(has_force_restore_data_flag);
......@@ -252,47 +252,21 @@ void StorageMergeTree::alter(
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlter(params, context);
checkAlterIsPossible(params, context.getSettingsRef());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
/// Modifier for storage AST in /metadata/storage_db/storage.sql
IDatabase::ASTModifier storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
if (new_order_by_ast.get() != order_by_ast.get())
storage_ast.set(storage_ast.order_by, new_order_by_ast);
if (new_primary_key_ast.get() != primary_key_ast.get())
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
if (!new_changes.empty())
{
auto settings_modifier = getSettingsModifier(new_changes);
settings_modifier(ast);
}
};
params.apply(metadata);
/// Update metdata in memory
auto update_metadata = [&]()
auto update_metadata = [&metadata, &table_lock_holder, this]()
{
changeSettings(new_changes, table_lock_holder);
changeSettings(metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setProperties(metadata);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
};
/// This alter can be performed at metadata level only
......@@ -300,7 +274,7 @@ void StorageMergeTree::alter(
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
update_metadata();
}
......@@ -308,16 +282,16 @@ void StorageMergeTree::alter(
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
/// Also block moves, because they can replace part with old state.
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
update_metadata();
......@@ -930,25 +904,18 @@ void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, c
std::vector<AlterDataPartTransactionPtr> transactions;
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr ignored_order_by_ast;
ASTPtr ignored_primary_key_ast;
ASTPtr ignored_ttl_table_ast;
SettingsChanges ignored_settings_changes;
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast,
ignored_primary_key_ast, ignored_ttl_table_ast, ignored_settings_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
auto columns_for_parts = new_columns.getAllPhysical();
auto columns_for_parts = metadata.columns.getAllPhysical();
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (transaction->isValid())
transactions.push_back(std::move(transaction));
......
......@@ -67,7 +67,7 @@ public:
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & commands, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkTableCanBeDropped() const override;
......@@ -161,17 +161,10 @@ protected:
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
bool attach,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag);
......
......@@ -30,6 +30,19 @@ void registerStorageNull(StorageFactory & factory)
});
}
void StorageNull::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageNull::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
......@@ -38,12 +51,10 @@ void StorageNull::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndices();
ConstraintsDescription new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
setColumns(std::move(metadata.columns));
}
}
......@@ -44,8 +44,9 @@ public:
database_name = new_database_name;
}
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
private:
String table_name;
......
......@@ -33,6 +33,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
......@@ -194,25 +195,16 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_, relative_data_path_,
columns_, indices_, constraints_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
std::move(settings_), true, attach,
: MergeTreeData(database_name_, table_name_, relative_data_path_, metadata,
context_, date_column_name, merging_params_, std::move(settings_), true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)),
replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)),
reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
......@@ -496,12 +488,10 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
{
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_order_by_ast = order_by_ast;
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_ttl_table_ast = ttl_table_ast;
IDatabase::ASTModifier storage_modifier;
StorageInMemoryMetadata metadata = getInMemoryMetadata();
if (new_columns != metadata.columns)
metadata.columns = new_columns;
if (!metadata_diff.empty())
{
if (metadata_diff.sorting_key_changed)
......@@ -510,59 +500,41 @@ void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_column
auto new_sorting_key_expr_list = parseQuery(parser, metadata_diff.new_sorting_key, 0);
if (new_sorting_key_expr_list->children.size() == 1)
new_order_by_ast = new_sorting_key_expr_list->children[0];
metadata.order_by_ast = new_sorting_key_expr_list->children[0];
else
{
auto tuple = makeASTFunction("tuple");
tuple->arguments->children = new_sorting_key_expr_list->children;
new_order_by_ast = tuple;
metadata.order_by_ast = tuple;
}
if (!primary_key_ast)
{
/// Primary and sorting key become independent after this ALTER so we have to
/// save the old ORDER BY expression as the new primary key.
new_primary_key_ast = order_by_ast->clone();
metadata.primary_key_ast = order_by_ast->clone();
}
}
if (metadata_diff.skip_indices_changed)
new_indices = IndicesDescription::parse(metadata_diff.new_skip_indices);
metadata.indices = IndicesDescription::parse(metadata_diff.new_skip_indices);
if (metadata_diff.constraints_changed)
new_constraints = ConstraintsDescription::parse(metadata_diff.new_constraints);
metadata.constraints = ConstraintsDescription::parse(metadata_diff.new_constraints);
if (metadata_diff.ttl_table_changed)
{
ParserTTLExpressionList parser;
new_ttl_table_ast = parseQuery(parser, metadata_diff.new_ttl_table, 0);
ParserExpression parser;
metadata.ttl_for_table_ast = parseQuery(parser, metadata_diff.new_ttl_table, 0);
}
storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
if (!storage_ast.order_by)
throw Exception(
"ALTER MODIFY ORDER BY of default-partitioned tables is not supported",
ErrorCodes::LOGICAL_ERROR);
if (new_primary_key_ast.get() != primary_key_ast.get())
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
storage_ast.set(storage_ast.order_by, new_order_by_ast);
};
}
global_context.getDatabase(database_name)->alterTable(global_context, table_name, new_columns, new_indices, new_constraints, storage_modifier);
global_context.getDatabase(database_name)->alterTable(global_context, table_name, metadata);
/// Even if the primary/sorting keys didn't change we must reinitialize it
/// because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
setProperties(metadata);
setTTLExpressions(new_columns.getColumnTTLs(), metadata.ttl_for_table_ast);
}
......@@ -1550,18 +1522,12 @@ void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogE
alter_command.index_name = entry.index_name;
}
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr ignored_order_by_ast;
ASTPtr ignored_primary_key_ast;
ASTPtr ignored_ttl_table_ast;
SettingsChanges ignored_changes;
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast, ignored_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
size_t modified_parts = 0;
auto parts = getDataParts();
auto columns_for_parts = new_columns.getAllPhysical();
auto columns_for_parts = metadata.columns.getAllPhysical();
/// Check there are no merges in range again
/// TODO: Currently, there are no guarantees that a merge covering entry_part_info will happen during the execution.
......@@ -1581,7 +1547,7 @@ void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogE
LOG_DEBUG(log, "Clearing index " << alter_command.index_name << " in part " << part->name);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (!transaction->isValid())
continue;
......@@ -3232,6 +3198,9 @@ void StorageReplicatedMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
checkAlterIsPossible(params, query_context.getSettingsRef());
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
/// each replica. So we have to wait AlterThread even with lightweight
......@@ -3241,14 +3210,12 @@ void StorageReplicatedMergeTree::alter(
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
LOG_DEBUG(log, "ALTER storage_settings_ptr only");
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
changeSettings(new_changes, table_lock_holder);
changeSettings(metadata.settings_ast, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, metadata);
return;
}
......@@ -3278,6 +3245,13 @@ void StorageReplicatedMergeTree::alter(
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
auto ast_to_str = [](ASTPtr query) -> String
{
if (!query)
return "";
return queryToString(query);
};
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
......@@ -3288,33 +3262,25 @@ void StorageReplicatedMergeTree::alter(
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
checkAlter(params, query_context);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndices();
ConstraintsDescription new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
String new_columns_str = new_columns.toString();
String new_columns_str = metadata.columns.toString();
if (new_columns_str != getColumns().toString())
changed_nodes.emplace_back(zookeeper_path, "columns", new_columns_str);
ReplicatedMergeTreeTableMetadata new_metadata(*this);
if (new_order_by_ast.get() != order_by_ast.get())
new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(new_order_by_ast));
if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(metadata.order_by_ast));
if (new_ttl_table_ast.get() != ttl_table_ast.get())
new_metadata.ttl_table = serializeAST(*new_ttl_table_ast);
if (ast_to_str(metadata.ttl_for_table_ast) != ast_to_str(ttl_table_ast))
new_metadata.ttl_table = serializeAST(*metadata.ttl_for_table_ast);
String new_indices_str = new_indices.toString();
String new_indices_str = metadata.indices.toString();
if (new_indices_str != getIndices().toString())
new_metadata.skip_indices = new_indices_str;
String new_constraints_str = new_constraints.toString();
String new_constraints_str = metadata.constraints.toString();
if (new_constraints_str != getConstraints().toString())
new_metadata.constraints = new_constraints_str;
......@@ -3323,16 +3289,11 @@ void StorageReplicatedMergeTree::alter(
changed_nodes.emplace_back(zookeeper_path, "metadata", new_metadata_str);
/// Perform settings update locally
if (!new_changes.empty())
{
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
changeSettings(new_changes, table_lock_holder);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
auto old_metadata = getInMemoryMetadata();
old_metadata.settings_ast = metadata.settings_ast;
changeSettings(metadata.settings_ast, table_lock_holder);
global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, old_metadata);
/// Modify shared metadata nodes in ZooKeeper.
Coordination::Requests ops;
......
......@@ -545,16 +545,9 @@ protected:
bool attach,
const String & database_name_, const String & name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & table_ttl_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag);
......
......@@ -5,7 +5,7 @@ CREATE TABLE log_for_alter (
Data String
) ENGINE = Log();
ALTER TABLE log_for_alter MODIFY SETTING aaa=123; -- { serverError 471 }
ALTER TABLE log_for_alter MODIFY SETTING aaa=123; -- { serverError 48 }
DROP TABLE IF EXISTS log_for_alter;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册