Commit f67fd49a authored by Alexander Tokmakov, committed by alesapin

add review suggestions

Parent a7c595ab
......@@ -210,7 +210,7 @@ try
/// Maybe useless
if (config().has("macros"))
context->setMacros(std::make_unique<Macros>(config(), "macros"));
context->setMacros(std::make_unique<Macros>(config(), "macros", log));
/// Skip networking
......
......@@ -534,7 +534,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros"));
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
/// Initialize main config reloader.
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
......@@ -559,7 +559,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
//setTextLog(global_context->getTextLog());
//buildLoggers(*config, logger());
global_context->setClustersConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros"));
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);
/// Setup protection to avoid accidental DROP for big tables (that are greater than 50 GB by default)
......
......@@ -2,6 +2,7 @@
#include <Common/Macros.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <common/logger_useful.h>
namespace DB
......@@ -12,19 +13,31 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}
Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String & root_key)
Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String & root_key, Poco::Logger * log)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(root_key, keys);
for (const String & key : keys)
{
macros[key] = config.getString(root_key + "." + key);
if (key == "database" || key == "table" || key == "uuid")
{
LOG_WARNING(log,
"Config file contains '{}' macro. This macro has special meaning "
"and it's explicit definition is not recommended. Implicit unfolding for "
"'database', 'table' and 'uuid' macros will be disabled.",
key);
enable_special_macros = false;
}
}
}
String Macros::expand(const String & s,
MacroExpansionInfo & info) const
{
/// Do not allow recursion when expanding only special macros, because it would lead to infinite recursion.
assert(info.level == 0 || !info.expand_special_macros_only);
if (s.find('{') == String::npos)
return s;
......@@ -34,6 +47,10 @@ String Macros::expand(const String & s,
if (info.level >= 10)
throw Exception("Too deep recursion while expanding macros: '" + s + "'", ErrorCodes::SYNTAX_ERROR);
/// If the config file contains an explicit special macro, we do not expand it in this mode.
if (!enable_special_macros && info.expand_special_macros_only)
return s;
String res;
size_t pos = 0;
while (true)
......@@ -59,15 +76,21 @@ String Macros::expand(const String & s,
auto it = macros.find(macro_name);
/// Prefer explicit macros over implicit.
if (it != macros.end())
if (it != macros.end() && !info.expand_special_macros_only)
res += it->second;
else if (macro_name == "database" && !info.database_name.empty())
res += info.database_name;
else if (macro_name == "table" && !info.table_name.empty())
res += info.table_name;
else if (macro_name == "database" && !info.table_id.database_name.empty())
{
res += info.table_id.database_name;
info.expanded_database = true;
}
else if (macro_name == "table" && !info.table_id.table_name.empty())
{
res += info.table_id.table_name;
info.expanded_table = true;
}
else if (macro_name == "uuid")
{
if (info.uuid == UUIDHelpers::Nil)
if (info.table_id.uuid == UUIDHelpers::Nil)
throw Exception("Macro 'uuid' and empty arguments of ReplicatedMergeTree "
"are supported only for ON CLUSTER queries with Atomic database engine",
ErrorCodes::SYNTAX_ERROR);
......@@ -76,12 +99,16 @@ String Macros::expand(const String & s,
/// It becomes impossible to check if {uuid} is contained inside some unknown macro.
if (info.level)
throw Exception("Macro 'uuid' should not be inside another macro", ErrorCodes::SYNTAX_ERROR);
res += toString(info.uuid);
res += toString(info.table_id.uuid);
info.expanded_uuid = true;
}
else if (info.ignore_unknown)
else if (info.ignore_unknown || info.expand_special_macros_only)
{
if (info.expand_special_macros_only)
res += '{';
res += macro_name;
if (info.expand_special_macros_only)
res += '}';
info.has_unknown = true;
}
else
......@@ -93,6 +120,9 @@ String Macros::expand(const String & s,
}
++info.level;
if (info.expand_special_macros_only)
return res;
return expand(res, info);
}
......@@ -113,9 +143,9 @@ String Macros::expand(const String & s) const
String Macros::expand(const String & s, const StorageID & table_id, bool allow_uuid) const
{
MacroExpansionInfo info;
info.database_name = table_id.database_name;
info.table_name = table_id.table_name;
info.uuid = allow_uuid ? table_id.uuid : UUIDHelpers::Nil;
info.table_id = table_id;
if (!allow_uuid)
info.table_id.uuid = UUIDHelpers::Nil;
return expand(s, info);
}
......
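The effect of the new expand_special_macros_only mode is easier to see in isolation: explicitly configured macros are skipped, only {database}, {table} and {uuid} are substituted, unknown macros are re-wrapped in braces so a later pass can expand them, and the pass is not recursive. Below is a self-contained, simplified sketch of that behaviour (an illustration only, not the ClickHouse implementation; the macro names and values are made up):

    #include <cassert>
    #include <iostream>
    #include <map>
    #include <stdexcept>
    #include <string>

    /// Simplified illustration of the two expansion modes; 'configured' stands for the <macros>
    /// config section and 'special' for the implicit {database}/{table}/{uuid} values.
    std::string expandBraces(const std::string & s,
                             const std::map<std::string, std::string> & configured,
                             const std::map<std::string, std::string> & special,
                             bool special_only)
    {
        std::string res;
        size_t pos = 0;
        while (true)
        {
            size_t begin = s.find('{', pos);
            if (begin == std::string::npos)
            {
                res.append(s, pos, std::string::npos);
                break;
            }
            res.append(s, pos, begin - pos);

            size_t end = s.find('}', begin);
            if (end == std::string::npos)
                throw std::runtime_error("Unbalanced '{' in: " + s);
            std::string name = s.substr(begin + 1, end - begin - 1);

            auto cfg = configured.find(name);
            if (cfg != configured.end() && !special_only)
                res += cfg->second;                /// explicit macros win in the normal mode
            else if (auto spec = special.find(name); spec != special.end())
                res += spec->second;               /// {database}, {table}, {uuid}
            else if (special_only)
                res += "{" + name + "}";           /// keep the macro for the second pass
            else
                throw std::runtime_error("No macro '" + name + "' in config");

            pos = end + 1;
        }
        return res;
    }

    int main()
    {
        std::map<std::string, std::string> configured{{"shard", "s1"}, {"replica", "r1"}};
        std::map<std::string, std::string> special{{"database", "default"}, {"table", "rmt"}};

        std::string path = "/clickhouse/tables/{shard}/{database}/{table}";
        /// First pass at CREATE time: only the special macros are unfolded; {shard} survives
        /// and ends up literally in the stored metadata.
        std::string stored = expandBraces(path, configured, special, /*special_only=*/ true);
        assert(stored == "/clickhouse/tables/{shard}/default/rmt");
        /// Second pass: the remaining macros come from the config (this is the only pass on ATTACH).
        std::cout << expandBraces(stored, configured, special, /*special_only=*/ false) << '\n';
    }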
......@@ -13,6 +13,7 @@ namespace Poco
{
class AbstractConfiguration;
}
class Logger;
}
......@@ -25,18 +26,19 @@ class Macros
{
public:
Macros() = default;
Macros(const Poco::Util::AbstractConfiguration & config, const String & key);
Macros(const Poco::Util::AbstractConfiguration & config, const String & key, Poco::Logger * log);
struct MacroExpansionInfo
{
/// Settings
String database_name;
String table_name;
UUID uuid = UUIDHelpers::Nil;
StorageID table_id = StorageID::createEmpty();
bool ignore_unknown = false;
bool expand_special_macros_only = false;
/// Information about macro expansion
size_t level = 0;
bool expanded_database = false;
bool expanded_table = false;
bool expanded_uuid = false;
bool has_unknown = false;
};
......@@ -64,6 +66,7 @@ public:
private:
MacroMap macros;
bool enable_special_macros = true;
};
......
......@@ -207,11 +207,13 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot move dictionary to other database");
StoragePtr table = getTableUnlocked(table_name, db_lock);
table->checkTableCanBeRenamed();
assert_can_move_mat_view(table);
StoragePtr other_table;
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
other_table->checkTableCanBeRenamed();
assert_can_move_mat_view(other_table);
}
......
......@@ -888,7 +888,7 @@ void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, cons
{
String zk_path = create.storage->engine->arguments->children[0]->as<ASTLiteral>()->value.get<String>();
Macros::MacroExpansionInfo info;
info.uuid = create.uuid;
info.table_id.uuid = create.uuid;
info.ignore_unknown = true;
context.getMacros()->expand(zk_path, info);
if (!info.expanded_uuid)
......
......@@ -337,6 +337,8 @@ public:
throw Exception("Truncate is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual void checkTableCanBeRenamed() const {}
/** Rename the table.
* Renaming the name in the metadata file and in the in-memory list of tables is done separately.
* In this function, you need to rename the directory with the data, if any.
......
......@@ -412,26 +412,31 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// For Replicated.
String zookeeper_path;
String replica_name;
bool allow_renaming = true;
if (replicated)
{
bool has_arguments = arg_num + 2 <= arg_cnt;
bool has_valid_arguments = has_arguments && engine_args[arg_num]->as<ASTLiteral>() && engine_args[arg_num + 1]->as<ASTLiteral>();
ASTLiteral * ast_zk_path;
ASTLiteral * ast_replica_name;
if (has_valid_arguments)
{
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
/// Get path and name from engine arguments
ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast_zk_path->value);
else
throw Exception(
"Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
ast_replica_name = engine_args[arg_num]->as<ASTLiteral>();
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast_replica_name->value);
else
throw Exception(
"Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def), ErrorCodes::BAD_ARGUMENTS);
......@@ -448,6 +453,20 @@ static StoragePtr create(const StorageFactory::Arguments & args)
zookeeper_path = args.context.getConfigRef().getString("default_replica_path", "/clickhouse/tables/{uuid}/{shard}");
/// TODO maybe use hostname if {replica} is not defined?
replica_name = args.context.getConfigRef().getString("default_replica_name", "{replica}");
/// Modify the query so that the default values are written to the metadata file
assert(arg_num == 0);
ASTs old_args;
std::swap(engine_args, old_args);
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
ast_zk_path = path_arg.get();
ast_replica_name = name_arg.get();
engine_args.emplace_back(std::move(path_arg));
engine_args.emplace_back(std::move(name_arg));
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
arg_num = 2;
arg_cnt += 2;
}
else
throw Exception("Expected two string literal arguments: zookeper_path and replica_name", ErrorCodes::BAD_ARGUMENTS);
......@@ -455,8 +474,44 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
bool is_on_cluster = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool allow_uuid_macro = is_on_cluster || args.query.attach;
zookeeper_path = args.context.getMacros()->expand(zookeeper_path, args.table_id, allow_uuid_macro);
replica_name = args.context.getMacros()->expand(replica_name, args.table_id, false);
/// Unfold the {database} and {table} macros on table creation, so the table can be renamed afterwards.
/// We also unfold the {uuid} macro, so the path does not break after moving the table from an Atomic to an Ordinary database.
if (!args.attach)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = args.table_id;
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = args.context.getMacros()->expand(zookeeper_path, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = args.context.getMacros()->expand(replica_name, info);
}
ast_zk_path->value = zookeeper_path;
ast_replica_name->value = replica_name;
/// Expand the other macros (such as {shard} and {replica}). We do not expand them in the previous step,
/// so that metadata files can be copied between replicas.
Macros::MacroExpansionInfo info;
info.table_id = args.table_id;
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = args.context.getMacros()->expand(zookeeper_path, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = args.context.getMacros()->expand(replica_name, info);
/// We do not allow renaming a table whose metadata contains these macros, because zookeeper_path would break after RENAME TABLE.
/// NOTE: this may happen if the table was created by an older version of ClickHouse (< 20.10) and the macros were not unfolded on table creation,
/// or if one of these macros is recursively expanded from some other macro.
if (info.expanded_database || info.expanded_table)
allow_renaming = false;
}
/// This merging param maybe used as part of sorting key
......@@ -707,7 +762,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
date_column_name,
merging_params,
std::move(storage_settings),
args.has_force_restore_data_flag);
args.has_force_restore_data_flag,
allow_renaming);
else
return StorageMergeTree::create(
args.table_id,
......
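When ReplicatedMergeTree is declared without arguments, the defaults are not only used but also written back into the query, so the metadata file ends up with explicit literals. The easy-to-miss part is the reordering of engine_args; a minimal sketch of the idiom with plain strings (illustrative only, the real code manipulates ASTLiteral nodes inside the engine AST):

    #include <algorithm>
    #include <iostream>
    #include <iterator>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        /// Stand-in for engine_args of e.g. ReplicatedReplacingMergeTree given only its version column.
        std::vector<std::string> engine_args{"ver"};

        std::vector<std::string> old_args;
        std::swap(engine_args, old_args);          /// engine_args is now empty

        /// The defaults (default_replica_path / default_replica_name in the server config).
        engine_args.emplace_back("/clickhouse/tables/{uuid}/{shard}");
        engine_args.emplace_back("{replica}");
        std::move(old_args.begin(), old_args.end(), std::back_inserter(engine_args));

        for (const auto & arg : engine_args)
            std::cout << arg << '\n';
        /// The two defaults now come first, followed by the original arguments,
        /// which is why arg_num becomes 2 and arg_cnt is increased by 2 in the diff above.
    }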
......@@ -180,7 +180,16 @@ StoragePtr StorageFactory::get(
.has_force_restore_data_flag = has_force_restore_data_flag
};
return storages.at(name).creator_fn(arguments);
auto res = storages.at(name).creator_fn(arguments);
if (!empty_engine_args.empty())
{
/// The storage creator filled in the previously empty arguments list, so we should modify the query as well
assert(storage_def && storage_def->engine && !storage_def->engine->arguments);
storage_def->engine->arguments = std::make_shared<ASTExpressionList>();
storage_def->engine->children.push_back(storage_def->engine->arguments);
storage_def->engine->arguments->children = empty_engine_args;
}
return res;
}
StorageFactory & StorageFactory::instance()
......
......@@ -178,7 +178,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag)
bool has_force_restore_data_flag,
bool allow_renaming_)
: MergeTreeData(table_id_,
relative_data_path_,
metadata_,
......@@ -200,6 +201,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, cleanup_thread(*this)
, part_check_thread(*this)
, restarting_thread(*this)
, allow_renaming(allow_renaming_)
{
queue_updating_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
......@@ -4187,8 +4189,17 @@ void StorageReplicatedMergeTree::checkTableCanBeDropped() const
global_context.checkTableCanBeDropped(table_id.database_name, table_id.table_name, getTotalActiveSizeInBytes());
}
void StorageReplicatedMergeTree::checkTableCanBeRenamed() const
{
if (!allow_renaming)
throw Exception("Cannot rename Replicated table, because zookeeper_path contains implicit 'database' or 'table' macro. "
"We cannot rename path in ZooKeeper, so path may become inconsistent with table name. If you really want to rename table, "
"you should edit metadata file first and restart server or reattach the table.", ErrorCodes::NOT_IMPLEMENTED);
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
{
checkTableCanBeRenamed();
MergeTreeData::rename(new_path_to_table_data, new_table_id);
/// Update table name in zookeeper
......
......@@ -128,6 +128,8 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &) override;
void checkTableCanBeRenamed() const override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
bool supportsIndexForIn() const override { return true; }
......@@ -304,6 +306,9 @@ private:
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
/// Do not allow RENAME TABLE if zookeeper_path contains {database} or {table} macro
const bool allow_renaming;
template <class Func>
void foreachCommittedParts(const Func & func) const;
......@@ -571,7 +576,8 @@ protected:
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag);
bool has_force_restore_data_flag,
bool allow_renaming_);
};
......
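Taken together, the IStorage hook, the DatabaseAtomic::renameTable calls and the StorageReplicatedMergeTree override form a simple check-before-rename pattern. A condensed, self-contained sketch of that pattern (class and member names here are illustrative stand-ins, not the real ClickHouse types):

    #include <iostream>
    #include <stdexcept>
    #include <string>

    /// Illustrative stand-ins for IStorage / StorageReplicatedMergeTree.
    struct Storage
    {
        virtual ~Storage() = default;
        virtual void checkTableCanBeRenamed() const {}     /// default: renaming is allowed
        void rename(const std::string & new_name)
        {
            checkTableCanBeRenamed();
            name = new_name;
        }
        std::string name = "rmt";
    };

    struct ReplicatedStorage : Storage
    {
        explicit ReplicatedStorage(bool allow_renaming_) : allow_renaming(allow_renaming_) {}

        void checkTableCanBeRenamed() const override
        {
            if (!allow_renaming)
                throw std::runtime_error(
                    "Cannot rename: zookeeper_path contains an implicit 'database' or 'table' macro");
        }

        /// Set to false when {database} or {table} was unfolded into zookeeper_path.
        const bool allow_renaming;
    };

    int main()
    {
        ReplicatedStorage movable(/*allow_renaming_=*/ true);
        movable.rename("rmt1");
        std::cout << movable.name << '\n';

        ReplicatedStorage pinned(/*allow_renaming_=*/ false);
        try
        {
            pinned.rename("rmt2");                 /// corresponds to the serverError 48 case in the SQL test below
        }
        catch (const std::exception & e)
        {
            std::cout << e.what() << '\n';
        }
    }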
......@@ -3,5 +3,7 @@
<test>Hello, world!</test>
<shard>s1</shard>
<replica>r1</replica>
<default_path_test>/clickhouse/tables/{database}/{shard}/</default_path_test>
<default_name_test>table_{table}</default_name_test>
</macros>
</yandex>
......@@ -330,10 +330,12 @@ def test_replicated_without_arguments(test_cluster):
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n")
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
"CREATE TABLE test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw") == \
"CREATE TABLE test_atomic.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/clickhouse/tables/12345678-0000-4000-8000-000000000001/{shard}', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
test_cluster.ddl_check_query(instance, "RENAME TABLE test_atomic.rmt TO test_atomic.rmt_renamed ON CLUSTER cluster")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY n")
......@@ -349,6 +351,8 @@ def test_replicated_without_arguments(test_cluster):
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{uuid}/', '{replica}') ORDER BY n")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n")
assert instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw") == \
"CREATE TABLE test_ordinary.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster")
test_cluster.pm_random_drops.push_rules(rules)
......
CREATE TABLE default.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/test_01148/{shard}/default/rmt\', \'{replica}\')\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.rmt1\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/test_01148/{shard}/default/rmt\', \'{replica}\')\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree(\'{default_path_test}test_01148\', \'{default_name_test}\')\nORDER BY n\nSETTINGS index_granularity = 8192
CREATE TABLE default.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree(\'{default_path_test}test_01148\', \'{default_name_test}\')\nORDER BY n\nSETTINGS index_granularity = 8192
DROP TABLE IF EXISTS rmt;
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('/clickhouse/test_01148/{shard}/{database}/{table}', '{replica}') ORDER BY n;
SHOW CREATE TABLE rmt;
RENAME TABLE rmt TO rmt1;
DETACH TABLE rmt1;
ATTACH TABLE rmt1;
SHOW CREATE TABLE rmt1;
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('{default_path_test}{uuid}', '{default_name_test}') ORDER BY n; -- { serverError 62 }
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('{default_path_test}test_01148', '{default_name_test}') ORDER BY n;
SHOW CREATE TABLE rmt;
RENAME TABLE rmt TO rmt2; -- { serverError 48 }
DETACH TABLE rmt;
ATTACH TABLE rmt;
SHOW CREATE TABLE rmt;
DROP TABLE rmt;
DROP TABLE rmt1;