提交 a8266a3d 编写于 作者: A Alexander Tokmakov

fix DDDL with cross-replication and Atomic

上级 7fcf20e4
......@@ -1477,7 +1477,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{
auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
task_table.table_push, task_table.engine_push_ast);
create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
auto & create = create_query_push_ast->as<ASTCreateQuery &>();
create.if_not_exists = true;
InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
......
......@@ -23,18 +23,15 @@ Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String &
}
String Macros::expand(const String & s,
size_t level,
const String & database_name,
const String & table_name,
const UUID & uuid) const
MacroExpansionInfo & info) const
{
if (s.find('{') == String::npos)
return s;
if (level && s.size() > 65536)
if (info.level && s.size() > 65536)
throw Exception("Too long string while expanding macros", ErrorCodes::SYNTAX_ERROR);
if (level >= 10)
if (info.level >= 10)
throw Exception("Too deep recursion while expanding macros: '" + s + "'", ErrorCodes::SYNTAX_ERROR);
String res;
......@@ -64,17 +61,28 @@ String Macros::expand(const String & s,
/// Prefer explicit macros over implicit.
if (it != macros.end())
res += it->second;
else if (macro_name == "database" && !database_name.empty())
res += database_name;
else if (macro_name == "table" && !table_name.empty())
res += table_name;
else if (macro_name == "database" && !info.database_name.empty())
res += info.database_name;
else if (macro_name == "table" && !info.table_name.empty())
res += info.table_name;
else if (macro_name == "uuid")
{
if (uuid == UUIDHelpers::Nil)
if (info.uuid == UUIDHelpers::Nil)
throw Exception("Macro 'uuid' and empty arguments of ReplicatedMergeTree "
"are supported only for ON CLUSTER queries with Atomic database engine",
ErrorCodes::SYNTAX_ERROR);
res += toString(uuid);
/// For ON CLUSTER queries we don't want to require all macros definitions in initiator's config.
/// However, initiator must check that for cross-replication cluster zookeeper_path does not contain {uuid} macro.
/// It becomes impossible to check if {uuid} is contained inside some unknown macro.
if (info.level)
throw Exception("Macro 'uuid' should not be inside another macro", ErrorCodes::SYNTAX_ERROR);
res += toString(info.uuid);
info.expanded_uuid = true;
}
else if (info.ignore_unknown)
{
res += macro_name;
info.has_unknown = true;
}
else
throw Exception("No macro '" + macro_name +
......@@ -84,7 +92,8 @@ String Macros::expand(const String & s,
pos = end + 1;
}
return expand(res, level + 1, database_name, table_name);
++info.level;
return expand(res, info);
}
String Macros::getValue(const String & key) const
......@@ -94,9 +103,20 @@ String Macros::getValue(const String & key) const
throw Exception("No macro " + key + " in config", ErrorCodes::SYNTAX_ERROR);
}
String Macros::expand(const String & s) const
{
MacroExpansionInfo info;
return expand(s, info);
}
String Macros::expand(const String & s, const StorageID & table_id, bool allow_uuid) const
{
return expand(s, 0, table_id.database_name, table_id.table_name, allow_uuid ? table_id.uuid : UUIDHelpers::Nil);
MacroExpansionInfo info;
info.database_name = table_id.database_name;
info.table_name = table_id.table_name;
info.uuid = allow_uuid ? table_id.uuid : UUIDHelpers::Nil;
return expand(s, info);
}
Names Macros::expand(const Names & source_names, size_t level) const
......@@ -104,8 +124,12 @@ Names Macros::expand(const Names & source_names, size_t level) const
Names result_names;
result_names.reserve(source_names.size());
MacroExpansionInfo info;
for (const String & name : source_names)
result_names.push_back(expand(name, level));
{
info.level = level;
result_names.push_back(expand(name, info));
}
return result_names;
}
......
......@@ -27,15 +27,28 @@ public:
Macros() = default;
Macros(const Poco::Util::AbstractConfiguration & config, const String & key);
struct MacroExpansionInfo
{
/// Settings
String database_name;
String table_name;
UUID uuid = UUIDHelpers::Nil;
bool ignore_unknown = false;
/// Information about macro expansion
size_t level = 0;
bool expanded_uuid = false;
bool has_unknown = false;
};
/** Replace the substring of the form {macro_name} with the value for macro_name, obtained from the config file.
* If {database} and {table} macros aren`t defined explicitly, expand them as database_name and table_name respectively.
* level - the level of recursion.
*/
String expand(const String & s,
size_t level = 0,
const String & database_name = "",
const String & table_name = "",
const UUID & uuid = UUIDHelpers::Nil) const;
MacroExpansionInfo & info) const;
String expand(const String & s) const;
String expand(const String & s, const StorageID & table_id, bool allow_uuid) const;
......
......@@ -623,4 +623,21 @@ const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_repl
return dir_name_for_internal_replication_with_local;
}
bool Cluster::maybeCrossReplication() const
{
/// Cluster can be used for cross-replication if some replicas have different default database names,
/// so one clickhouse-server instance can contain multiple replicas.
if (addresses_with_failover.empty())
return false;
const String & database_name = addresses_with_failover.front().front().default_database;
for (const auto & shard : addresses_with_failover)
for (const auto & replica : shard)
if (replica.default_database != database_name)
return true;
return false;
}
}
......@@ -193,6 +193,10 @@ public:
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
bool maybeCrossReplication() const;
private:
using SlotToShard = std::vector<UInt64>;
SlotToShard slot_to_shard;
......
......@@ -5,6 +5,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/Macros.h>
#include <Core/Defines.h>
#include <Core/Settings.h>
......@@ -853,15 +854,60 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
return {};
}
void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, const Context & context, const String & cluster_name)
{
if (create.attach)
return;
/// For CREATE query generate UUID on initiator, so it will be the same on all hosts.
/// It will be ignored if database does not support UUIDs.
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
/// For cross-replication cluster we cannot use UUID in replica path.
String cluster_name_expanded = context.getMacros()->expand(cluster_name);
ClusterPtr cluster = context.getCluster(cluster_name_expanded);
if (cluster->maybeCrossReplication())
{
/// Check that {uuid} macro is not used in zookeeper_path for ReplicatedMergeTree.
/// Otherwise replicas will generate different paths.
if (!create.storage)
return;
if (!create.storage->engine)
return;
if (!startsWith(create.storage->engine->name, "Replicated"))
return;
bool has_explicit_zk_path_arg = create.storage->engine->arguments &&
create.storage->engine->arguments->children.size() >= 2 &&
create.storage->engine->arguments->children[0]->as<ASTLiteral>() &&
create.storage->engine->arguments->children[0]->as<ASTLiteral>()->value.getType() == Field::Types::String;
if (has_explicit_zk_path_arg)
{
String zk_path = create.storage->engine->arguments->children[0]->as<ASTLiteral>()->value.get<String>();
Macros::MacroExpansionInfo info;
info.uuid = create.uuid;
info.ignore_unknown = true;
context.getMacros()->expand(zk_path, info);
if (!info.expanded_uuid)
return;
}
throw Exception("Seems like cluster is configured for cross-replication, "
"but zookeeper_path for ReplicatedMergeTree is not specified or contains {uuid} macro. "
"It's not supported for cross replication, because tables must have different UUIDs. "
"Please specify unique zookeeper_path explicitly.", ErrorCodes::INCORRECT_QUERY);
}
}
BlockIO InterpreterCreateQuery::execute()
{
auto & create = query_ptr->as<ASTCreateQuery &>();
if (!create.cluster.empty())
{
/// For CREATE query generate UUID on initiator, so it will be the same on all hosts.
/// It will be ignored if database does not support UUIDs.
if (!create.attach && create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
prepareOnClusterQuery(create, context, create.cluster);
return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess());
}
......
......@@ -55,6 +55,8 @@ public:
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, const Context & context, bool sanity_check_compression_codecs);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
static void prepareOnClusterQuery(ASTCreateQuery & create, const Context & context, const String & cluster_name);
private:
struct TableProperties
{
......
......@@ -33,7 +33,7 @@
<enabled_partitions>3 4 5 6 1 2 0 </enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>
<engine>ENGINE=ReplicatedMergeTree PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>
......@@ -93,4 +93,4 @@
</cluster1>
</remote_servers>
</yandex>
\ No newline at end of file
</yandex>
......@@ -34,7 +34,7 @@
<!-- Engine of destination tables -->
<engine>ENGINE=
ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}')
ReplicatedMergeTree
PARTITION BY toMonday(date)
ORDER BY d
</engine>
......@@ -97,4 +97,4 @@
</cluster1>
</remote_servers>
</yandex>
\ No newline at end of file
</yandex>
......@@ -28,7 +28,7 @@
<!-- Engine of destination tables -->
<engine>ENGINE=
ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/test_block_size', '{replica}')
ReplicatedMergeTree
ORDER BY d PARTITION BY partition
</engine>
......@@ -99,4 +99,4 @@
</shard_0_0>
</remote_servers>
</yandex>
\ No newline at end of file
</yandex>
......@@ -81,11 +81,11 @@ class Task1:
for cluster_num in ["0", "1"]:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance,
"CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{} ENGINE=Ordinary".format(
"CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(
cluster_num))
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) " +
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/hits', '{replica}') " +
"ENGINE=ReplicatedMergeTree " +
"PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")
ddl_check_query(instance,
"CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
......@@ -110,10 +110,11 @@ class Task1:
class Task2:
def __init__(self, cluster):
def __init__(self, cluster, unique_zk_path):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_month_to_week_partition"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_month_to_week_description.xml'), 'r').read()
self.unique_zk_path = unique_zk_path
def start(self):
instance = cluster.instances['s0_0_0']
......@@ -121,11 +122,13 @@ class Task2:
for cluster_num in ["0", "1"]:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance,
"CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{} ENGINE=Ordinary".format(
"CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(
cluster_num))
ddl_check_query(instance,
"CREATE TABLE a ON CLUSTER cluster0 (date Date, d UInt64, d1 UInt64 ALIAS d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/a', '{replica}', date, intHash64(d), (date, intHash64(d)), 8192)")
"CREATE TABLE a ON CLUSTER cluster0 (date Date, d UInt64, d1 UInt64 ALIAS d+1) "
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/" + self.unique_zk_path + "', "
"'{replica}', date, intHash64(d), (date, intHash64(d)), 8192)")
ddl_check_query(instance,
"CREATE TABLE a_all ON CLUSTER cluster0 (date Date, d UInt64) ENGINE=Distributed(cluster0, default, a, d)")
......@@ -169,7 +172,7 @@ class Task_test_block_size:
ddl_check_query(instance, """
CREATE TABLE test_block_size ON CLUSTER shard_0_0 (partition Date, d UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/test_block_size', '{replica}')
ENGINE=ReplicatedMergeTree
ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d)""", 2)
instance.query(
......@@ -332,17 +335,17 @@ def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offs
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), [])
execute_task(Task2(started_cluster, "test1"), [])
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition_with_recovering(started_cluster):
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
execute_task(Task2(started_cluster, "test2"), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.timeout(600)
def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster):
execute_task(Task2(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
execute_task(Task2(started_cluster, "test3"), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
def test_block_size(started_cluster):
......
......@@ -59,7 +59,7 @@ class TaskTrivial:
for node in [source, destination]:
node.query("DROP DATABASE IF EXISTS default")
node.query("CREATE DATABASE IF NOT EXISTS default ENGINE=Ordinary")
node.query("CREATE DATABASE IF NOT EXISTS default")
source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1) "
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/source_trivial_cluster/1/trivial', '1') "
......
......@@ -327,6 +327,8 @@ def test_replicated_without_arguments(test_cluster):
rules = test_cluster.pm_random_drops.pop_rules()
instance = test_cluster.instances['ch1']
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic")
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
......@@ -334,9 +336,20 @@ def test_replicated_without_arguments(test_cluster):
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
test_cluster.ddl_check_query(instance, "RENAME TABLE test_atomic.rmt TO test_atomic.rmt_renamed ON CLUSTER cluster")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') ORDER BY n")
test_cluster.ddl_check_query(instance,
"EXCHANGE TABLES test_atomic.rmt AND test_atomic.rmt_renamed ON CLUSTER cluster")
assert instance.query("SELECT countDistinct(uuid) from clusterAllReplicas('cluster', 'system', 'databases') WHERE uuid != 0 AND name='test_atomic'") == "1\n"
assert instance.query("SELECT countDistinct(uuid) from clusterAllReplicas('cluster', 'system', 'tables') WHERE uuid != 0 AND name='rmt'") == "1\n"
test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary")
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{uuid}/', '{replica}') ORDER BY n")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n")
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster")
test_cluster.pm_random_drops.push_rules(rules)
......
......@@ -77,3 +77,30 @@ def test_alter_ddl(started_cluster):
node2.query("SYSTEM SYNC REPLICA replica_2.replicated_local;", timeout=5)
assert_eq_with_retry(node1, "SELECT count(*) FROM replica_2.replicated", '0')
def test_atomic_database(started_cluster):
node1.query('''DROP DATABASE IF EXISTS replica_1 ON CLUSTER cross_3shards_2replicas;
DROP DATABASE IF EXISTS replica_2 ON CLUSTER cross_3shards_2replicas;
CREATE DATABASE replica_1 ON CLUSTER cross_3shards_2replicas ENGINE=Atomic;
CREATE DATABASE replica_2 ON CLUSTER cross_3shards_2replicas ENGINE=Atomic;''')
assert "It's not supported for cross replication" in \
node1.query_and_get_error("CREATE TABLE rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert "It's not supported for cross replication" in \
node1.query_and_get_error("CREATE TABLE replica_1.rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert "It's not supported for cross replication" in \
node1.query_and_get_error("CREATE TABLE rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{uuid}/', '{replica}') ORDER BY n")
assert "It's not supported for cross replication" in \
node1.query_and_get_error("CREATE TABLE replica_2.rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{uuid}/', '{replica}') ORDER BY n")
assert "For a distributed DDL on circular replicated cluster its table name must be qualified by database name" in \
node1.query_and_get_error("CREATE TABLE rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree('/tables/{shard}/rmt/', '{replica}') ORDER BY n")
node1.query("CREATE TABLE replica_1.rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree('/tables/{shard}/rmt/', '{replica}') ORDER BY n")
node1.query("CREATE TABLE replica_2.rmt ON CLUSTER cross_3shards_2replicas (n UInt64, s String) ENGINE=ReplicatedMergeTree('/tables/{shard_bk}/rmt/', '{replica_bk}') ORDER BY n")
assert node1.query("SELECT countDistinct(uuid) from remote('node1,node2,node3', 'system', 'databases') WHERE uuid != 0 AND name='replica_1'") == "1\n"
assert node1.query("SELECT countDistinct(uuid) from remote('node1,node2,node3', 'system', 'tables') WHERE uuid != 0 AND name='rmt'") == "2\n"
node1.query("INSERT INTO replica_1.rmt VALUES (1, 'test')")
node2.query("SYSTEM SYNC REPLICA replica_2.rmt", timeout=5)
assert_eq_with_retry(node2, "SELECT * FROM replica_2.rmt", '1\ttest')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册