未验证 提交 d3fd94c1 编写于 作者: N Nikita Mikhaylov 提交者: GitHub

Merge pull request #17394 from ClickHouse/backport/20.12/17248

Backport #17248 to 20.12: Fix #15235. When clickhouse-copier handle non-partitioned table, throws segfault error.
......@@ -605,7 +605,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
settings_push.replication_alter_partitions_sync = 2;
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
" ATTACH PARTITION " + partition_name +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
" FROM " + getQuotedTable(helping_table);
LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
......@@ -636,7 +636,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
if (!task_table.isReplicatedTable())
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
" PARTITION " + partition_name + " DEDUPLICATE;";
((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
......@@ -807,7 +807,7 @@ bool ClusterCopier::tryDropPartitionPiece(
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table);
query += " DROP PARTITION " + task_partition.name + "";
query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
/// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'";
......@@ -1567,7 +1567,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;
String query = "ALTER TABLE " + getQuotedTable(helping_table) + ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + partition_name;
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
......@@ -1670,14 +1670,24 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
std::set<String> res;
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
const String & partition_name = queryToString(task_table.engine_push_partition_key_ast);
if (partition_name == "'all'")
return res;
String query;
WriteBufferFromOwnString wb;
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
wb << "SELECT DISTINCT " << partition_name << " AS partition FROM"
<< " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
query = wb.str();
......@@ -1692,7 +1702,6 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
std::set<String> res;
if (block)
ColumnWithTypeAndName & column = block.getByPosition(0);
<engine>ENGINE = MergeTree ORDER BY date SETTINGS index_granularity = 8192</engine>
......@@ -230,6 +230,27 @@ class Task_no_arg:
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
class Task_non_partitioned_table:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read()
self.rows = 1000000
def start(self):
instance = cluster.instances['s0_0_0']
"create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
instance.query("insert into copier_test1 values ('2016-01-01', 10);")
def check(self):
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
instance = cluster.instances['s0_0_0']
instance.query("DROP TABLE copier_test1")
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
def execute_task(task, cmd_options):
......@@ -359,6 +380,8 @@ def test_no_index(started_cluster):
def test_no_arg(started_cluster):
execute_task(Task_no_arg(started_cluster), [])
def test_non_partitioned_table(started_cluster):
execute_task(Task_non_partitioned_table(started_cluster), [])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册