diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index a129dc7efcc5bb7210d76132300de0cbf1b142bd..3b8d97f63e280babeb1f884774dc60766ebf2390 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -605,7 +605,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
         settings_push.replication_alter_partitions_sync = 2;
 
         query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
-                                  " ATTACH PARTITION " + partition_name +
+                                  ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
                                   " FROM " + getQuotedTable(helping_table);
 
         LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
@@ -636,7 +636,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
                 if (!task_table.isReplicatedTable())
                 {
                     query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
-                                                    " PARTITION " + partition_name + " DEDUPLICATE;";
+                                                    ((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
 
                     LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
 
@@ -807,7 +807,7 @@ bool ClusterCopier::tryDropPartitionPiece(
         DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
 
         String query = "ALTER TABLE " + getQuotedTable(helping_table);
-        query += " DROP PARTITION " + task_partition.name + "";
+        query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
 
         /// TODO: use this statement after servers will be updated up to 1.1.54310
         // query += " DROP PARTITION ID '" + task_partition.name + "'";
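Note on the hunks above (and the one that follows): a MergeTree table created without `PARTITION BY` keeps all of its parts in a single implicit partition whose ID is the literal string `all`. The copier carries partition names around already quoted, so that case surfaces in these comparisons as `"'all'"`; the `PARTITION <value>` form would try to parse it as a partition key value, while `PARTITION ID` addresses the partition by its ID. A minimal Python paraphrase of the selection logic (the helper name and the asserted values are illustrative, not part of the patch):

```python
def partition_clause(op, partition_name):
    # The copier stores partition names pre-quoted, so the single
    # partition of a non-partitioned table arrives as the string
    # "'all'" and must be addressed by ID, not by key value.
    if partition_name == "'all'":
        return op + " PARTITION ID " + partition_name
    return op + " PARTITION " + partition_name

assert partition_clause("DROP", "'all'") == "DROP PARTITION ID 'all'"
assert partition_clause("DROP", "'2016-01-01'") == "DROP PARTITION '2016-01-01'"
```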
" DROP PARTITION ID " : " DROP PARTITION ") + partition_name; const ClusterPtr & cluster_push = task_table.cluster_push; Settings settings_push = task_cluster->settings_push; @@ -1670,14 +1670,24 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout std::set ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard) { + std::set res; + createShardInternalTables(timeouts, task_shard, false); TaskTable & task_table = task_shard.task_table; + const String & partition_name = queryToString(task_table.engine_push_partition_key_ast); + + if (partition_name == "'all'") + { + res.emplace("'all'"); + return res; + } + String query; { WriteBufferFromOwnString wb; - wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM" + wb << "SELECT DISTINCT " << partition_name << " AS partition FROM" << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC"; query = wb.str(); } @@ -1692,7 +1702,6 @@ std::set ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti local_context.setSettings(task_cluster->settings_pull); Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()); - std::set res; if (block) { ColumnWithTypeAndName & column = block.getByPosition(0); diff --git a/tests/integration/test_cluster_copier/task_non_partitioned_table.xml b/tests/integration/test_cluster_copier/task_non_partitioned_table.xml new file mode 100644 index 0000000000000000000000000000000000000000..499c54ae46e99edc1d4690f65ee0a5eef35f089b --- /dev/null +++ b/tests/integration/test_cluster_copier/task_non_partitioned_table.xml @@ -0,0 +1,39 @@ + + + + + 1 + + s0_0_0 + 9000 + + + + + + + 1 + + s1_1_0 + 9000 + + + + + + 1 + + + + source_cluster + default + copier_test1 + + default_cluster + default + copier_test1_1 + ENGINE = MergeTree ORDER BY date SETTINGS index_granularity = 8192 + rand() + + + diff --git a/tests/integration/test_cluster_copier/test.py b/tests/integration/test_cluster_copier/test.py index 6a922dbfca7207b59f09c6d1aee813a24a611fc8..d87969630cd0fb750bfe2d5d83040682f1e66e06 100644 --- a/tests/integration/test_cluster_copier/test.py +++ b/tests/integration/test_cluster_copier/test.py @@ -230,6 +230,27 @@ class Task_no_arg: instance = cluster.instances['s1_1_0'] instance.query("DROP TABLE copier_test1_1") +class Task_non_partitioned_table: + + def __init__(self, cluster): + self.cluster = cluster + self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table" + self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read() + self.rows = 1000000 + + def start(self): + instance = cluster.instances['s0_0_0'] + instance.query( + "create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192") + instance.query("insert into copier_test1 values ('2016-01-01', 10);") + + def check(self): + assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n") + instance = cluster.instances['s0_0_0'] + instance.query("DROP TABLE copier_test1") + instance = cluster.instances['s1_1_0'] + instance.query("DROP TABLE copier_test1_1") + def execute_task(task, cmd_options): task.start() @@ -359,6 +380,8 @@ def test_no_index(started_cluster): def test_no_arg(started_cluster): execute_task(Task_no_arg(started_cluster), []) +def test_non_partitioned_table(started_cluster): + 
diff --git a/tests/integration/test_cluster_copier/test.py b/tests/integration/test_cluster_copier/test.py
index 6a922dbfca7207b59f09c6d1aee813a24a611fc8..d87969630cd0fb750bfe2d5d83040682f1e66e06 100644
--- a/tests/integration/test_cluster_copier/test.py
+++ b/tests/integration/test_cluster_copier/test.py
@@ -230,6 +230,27 @@ class Task_no_arg:
         instance = cluster.instances['s1_1_0']
         instance.query("DROP TABLE copier_test1_1")
 
+class Task_non_partitioned_table:
+
+    def __init__(self, cluster):
+        self.cluster = cluster
+        self.zk_task_path = "/clickhouse-copier/task_non_partitioned_table"
+        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read()
+        self.rows = 1000000
+
+    def start(self):
+        instance = cluster.instances['s0_0_0']
+        instance.query(
+            "create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
+        instance.query("insert into copier_test1 values ('2016-01-01', 10);")
+
+    def check(self):
+        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
+        instance = cluster.instances['s0_0_0']
+        instance.query("DROP TABLE copier_test1")
+        instance = cluster.instances['s1_1_0']
+        instance.query("DROP TABLE copier_test1_1")
+
 
 def execute_task(task, cmd_options):
     task.start()
@@ -359,6 +380,8 @@ def test_no_index(started_cluster):
 def test_no_arg(started_cluster):
     execute_task(Task_no_arg(started_cluster), [])
 
+def test_non_partitioned_table(started_cluster):
+    execute_task(Task_non_partitioned_table(started_cluster), [])
 
 if __name__ == '__main__':
     with contextmanager(started_cluster)() as cluster:
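For reference, the control flow this test exercises in `getShardPartitions` can be paraphrased in Python as follows (function and table names are illustrative; the real implementation is the C++ hunk at `@@ -1670,14 +1670,24 @@` above): when the push engine has no partition key, the stored key expression is the quoted literal `'all'`, so the copier returns that single partition directly instead of enumerating partitions on the source shard.

```python
def get_shard_partitions(partition_key_expr, run_query):
    # Early return added by this patch: a non-partitioned table has
    # exactly one partition, so skip the remote SELECT DISTINCT.
    if partition_key_expr == "'all'":
        return {"'all'"}
    rows = run_query("SELECT DISTINCT " + partition_key_expr +
                     " AS partition FROM source_table ORDER BY partition DESC")
    return {row[0] for row in rows}

print(get_shard_partitions("'all'", run_query=None))  # {"'all'"}, no query issued
```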