diff --git a/dbms/programs/copier/ClusterCopier.h b/dbms/programs/copier/ClusterCopier.h
index 8cc0cf21211f0a298cf846a0b01329bd978da852..cd0481684dc5646899e3b665c1fa5e11e2b393e9 100644
--- a/dbms/programs/copier/ClusterCopier.h
+++ b/dbms/programs/copier/ClusterCopier.h
@@ -273,7 +273,8 @@ public:
         for (TaskTable & task_table : task_cluster->table_tasks)
         {
             LOG_INFO(log, "Process table task " << task_table.table_id << " with "
-                          << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
+                          << task_table.all_shards.size() << " shards, "
+                          << task_table.local_shards.size() << " of them are local ones");

             if (task_table.all_shards.empty())
                 continue;
@@ -507,7 +508,7 @@ public:
         catch (const Coordination::Exception & e)
         {
             LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name
-                          << ". Will recheck the partition. Error: " << e.displayText());
+                            << ". Will recheck the partition. Error: " << e.displayText());
             return false;
         }

@@ -525,6 +526,112 @@ public:
         return true;
     }

+    bool checkAllPieceInPartitionDone(const TaskTable & task_table, const String & partition_name, const TasksShard & shards_with_partition)
+    {
+        bool answer = true;
+        for (size_t piece_number = 0; piece_number < task_table.number_of_splits; piece_number++)
+            answer &= checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
+        return answer;
+    }
+
+    /* The same as the function above.
+     * Assume that we don't know on which shards a certain partition piece resides,
+     * so we check every shard that contains the whole partition.
+     * Shards that don't have the piece MUST mark that piece as done (is_done = true).
+     * */
+    bool checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
+                                   size_t piece_number, const TasksShard & shards_with_partition)
+    {
+        LOG_DEBUG(log, "Check that all shards processed partition " << partition_name
+                       << " piece number " << toString(piece_number) << " successfully");
+
+        auto zookeeper = context.getZooKeeper();
+
+        /// Collect all shards that contain partition piece number piece_number.
+        Strings piece_status_paths;
+        for (auto & shard : shards_with_partition)
+        {
+            ShardPartition & task_shard_partition = shard->partition_tasks.find(partition_name)->second;
+            ShardPartitionPiece & shard_partition_piece = task_shard_partition.pieces[piece_number];
+            piece_status_paths.emplace_back(shard_partition_piece.getShardStatusPath());
+        }
+
+        std::vector<int64_t> zxid1, zxid2;
+
+        try
+        {
+            std::vector<zkutil::ZooKeeper::FutureGet> get_futures;
+            for (const String & path : piece_status_paths)
+                get_futures.emplace_back(zookeeper->asyncGet(path));
+
+            // Check that state is Finished and remember zxid
+            for (auto & future : get_futures)
+            {
+                auto res = future.get();
+
+                TaskStateWithOwner status = TaskStateWithOwner::fromString(res.data);
+                if (status.state != TaskState::Finished)
+                {
+                    LOG_INFO(log, "The task " << res.data << " is being rewritten by " << status.owner
+                                  << ". Partition piece will be rechecked");
+                    return false;
+                }
+
+                zxid1.push_back(res.stat.pzxid);
+            }
+
+            // Check that the partition is not dirty
+            {
+                CleanStateClock clean_state_clock (
+                    zookeeper,
+                    task_table.getCertainPartitionIsDirtyPath(partition_name),
+                    task_table.getCertainPartitionIsCleanedPath(partition_name)
+                );
+                Coordination::Stat stat{};
+                LogicalClock task_start_clock;
+                if (zookeeper->exists(task_table.getCertainPartitionTaskStatusPath(partition_name), &stat))
+                    task_start_clock = LogicalClock(stat.mzxid);
+                zookeeper->get(task_table.getCertainPartitionTaskStatusPath(partition_name), &stat);
+                if (!clean_state_clock.is_clean() || task_start_clock <= clean_state_clock.discovery_zxid)
+                {
+                    LOG_INFO(log, "Partition " << partition_name << " piece number " << toString(piece_number) << " became dirty");
+                    return false;
+                }
+            }
+
+            get_futures.clear();
+            for (const String & path : piece_status_paths)
+                get_futures.emplace_back(zookeeper->asyncGet(path));
+
+            // Remember zxid of states again
+            for (auto & future : get_futures)
+            {
+                auto res = future.get();
+                zxid2.push_back(res.stat.pzxid);
+            }
+        }
+        catch (const Coordination::Exception & e)
+        {
+            LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name << " piece number "
+                          << toString(piece_number) << ". Will recheck the partition. Error: " << e.displayText());
+            return false;
+        }
+
+        // If all tasks are finished and no zxid has changed, then the partition could not have become dirty again
+        for (UInt64 shard_num = 0; shard_num < piece_status_paths.size(); ++shard_num)
+        {
+            if (zxid1[shard_num] != zxid2[shard_num])
+            {
+                LOG_INFO(log, "The task " << piece_status_paths[shard_num] << " is being modified now. Partition piece will be rechecked");
+                return false;
+            }
+        }
+
+        LOG_INFO(log, "Partition " << partition_name << " piece number " << toString(piece_number) << " is copied successfully");
+        return true;
+    }
+
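
The piece check above is an optimistic snapshot-validate-snapshot read: it records the pzxid of every status node, verifies that every state is Finished and the partition is not dirty, then re-reads the pzxids. If none of them moved, the two snapshots bracket a quiescent interval and the verdict can be trusted. A minimal sketch of the pattern, with a hypothetical read_zxid callback standing in for the asyncGet/Stat plumbing:

    #include <cstdint>
    #include <functional>
    #include <string>
    #include <vector>

    using ReadZxid = std::function<int64_t(const std::string &)>;

    // Returns true only if `validate` succeeded AND no watched node was
    // modified between the two zxid snapshots.
    bool observedConsistently(const std::vector<std::string> & paths,
                              const ReadZxid & read_zxid,
                              const std::function<bool()> & validate)
    {
        std::vector<int64_t> before;
        for (const auto & path : paths)
            before.push_back(read_zxid(path));       // first snapshot

        if (!validate())                             // e.g. "all states Finished, partition clean"
            return false;

        for (size_t i = 0; i < paths.size(); ++i)
            if (read_zxid(paths[i]) != before[i])    // second snapshot must match
                return false;                        // a node changed in between: recheck later

        return true;
    }
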
     /// Removes MATERIALIZED and ALIAS columns from create table query
     static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
     {
@@ -877,6 +984,14 @@ public:
         Error,
     };

+
+    enum class PartitionPieceTaskStatus
+    {
+        Active,
+        Finished,
+        Error,
+    };
+
     /// Job for copying partition from particular shard.
     PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
     {
@@ -884,7 +999,7 @@ public:

         try
         {
-            res = processPartitionTaskImpl(timeouts, task_partition, is_unprioritized_task);
+            res = iterateThroughAllPiecesInPartition(timeouts, task_partition, is_unprioritized_task);
         }
         catch (...)
         {
@@ -905,23 +1020,40 @@
         return res;
     }

-    PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
+    PartitionTaskStatus iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
+                                                           bool is_unprioritized_task)
+    {
+        const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;
+
+        /// TODO: Maybe use a ThreadPool here?
+        for (size_t piece_number = 0; piece_number < total_number_of_pieces; piece_number++)
+            processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);
+
+        return PartitionTaskStatus::Finished;
+    }
+
+    PartitionTaskStatus processPartitionPieceTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
+                                                      const size_t current_piece_number, bool is_unprioritized_task)
     {
         TaskShard & task_shard = task_partition.task_shard;
         TaskTable & task_table = task_shard.task_table;
         ClusterPartition & cluster_partition = task_table.getClusterPartition(task_partition.name);
+        ShardPartitionPiece & partition_piece = task_partition.pieces[current_piece_number];
+
+        const size_t number_of_splits = task_table.number_of_splits;
+
+        const String primary_key_comma_separated = task_table.primary_key_comma_separated;
         UNUSED(number_of_splits);
-
+        UNUSED(partition_piece);
+
         /// We need to update table definitions for each partition, it could be changed after ALTER
-        createShardInternalTables(timeouts, task_shard);
+        createShardInternalTables(timeouts, task_shard, true, current_piece_number);

         auto zookeeper = context.getZooKeeper();

-        const String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
-        const String is_dirty_cleaned_path = task_partition.getCommonPartitionIsCleanedPath();
-        const String current_task_is_active_path = task_partition.getActiveWorkerPath();
-        const String current_task_status_path = task_partition.getShardStatusPath();
+        const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
+        const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
+        const String current_task_piece_is_active_path = partition_piece.getActiveWorkerPath();
+        const String current_task_piece_status_path = partition_piece.getShardStatusPath();

         /// Auxiliary functions:
@@ -935,25 +1067,27 @@
             else if (clock.discovery_version)
             {
                 LOG_DEBUG(log, "Updating clean state clock");
-                zookeeper->set(is_dirty_flag_path, host_id, clock.discovery_version.value());
+                zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
             }
             else
             {
                 LOG_DEBUG(log, "Creating clean state clock");
-                zookeeper->create(is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
+                zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
             }
         };

         /// Returns SELECT query filtering current partition and applying user filter
         auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "",
-                                     bool enable_splitting = false, size_t current_piece_number = 0)
+                                     bool enable_splitting = false)
         {
             String query;
             query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
             /// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
             query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
+
             if (enable_splitting)
-                query += " AND ( cityHash64(*) = " + std::to_string(current_piece_number) + " )";
+                query += " AND ( cityHash64(" + primary_key_comma_separated + ") % " + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
+
             if (!task_table.where_condition_str.empty())
                 query += " AND (" + task_table.where_condition_str + ")";
             if (!limit.empty())
@@ -964,11 +1098,11 @@
         };
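
The splitting predicate relies on cityHash64(primary_key) % number_of_splits mapping every row to exactly one piece, so the per-piece SELECTs are pairwise disjoint and together cover the whole partition. A toy check of that property; std::hash stands in for cityHash64 here purely for illustration:

    #include <cassert>
    #include <cstddef>
    #include <functional>
    #include <string>
    #include <vector>

    int main()
    {
        const std::size_t number_of_splits = 10;
        const std::vector<std::string> rows = {"key1", "key2", "key3"};

        std::size_t total = 0;
        for (std::size_t piece = 0; piece < number_of_splits; ++piece)
            for (const auto & row : rows)
                total += (std::hash<std::string>{}(row) % number_of_splits == piece);

        // Every row matches exactly one piece predicate, so the union of the
        // number_of_splits SELECTs is the whole partition, with no overlap.
        assert(total == rows.size());
    }
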
         /// Load balancing
-        auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, is_unprioritized_task);
+        auto worker_node_holder =
+            createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);

-        LOG_DEBUG(log, "Processing " << current_task_status_path);
+        LOG_DEBUG(log, "Processing " << current_task_piece_status_path);

-        CleanStateClock clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
+        CleanStateClock clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);

         LogicalClock task_start_clock;
         {
@@ -982,7 +1116,7 @@
             && (!task_start_clock.hasHappened() || clean_state_clock.discovery_zxid <= task_start_clock))
         {
             LOG_DEBUG(log, "Partition " << task_partition.name << " appears to be clean");
-            zookeeper->createAncestors(current_task_status_path);
+            zookeeper->createAncestors(current_task_piece_status_path);
         }
         else
         {
@@ -1001,17 +1135,17 @@
         }

         /// Create ephemeral node to mark that we are active and process the partition
-        zookeeper->createAncestors(current_task_is_active_path);
+        zookeeper->createAncestors(current_task_piece_is_active_path);
         zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
         try
         {
-            partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_is_active_path, *zookeeper, host_id);
+            partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_piece_is_active_path, *zookeeper, host_id);
         }
         catch (const Coordination::Exception & e)
         {
             if (e.code == Coordination::ZNODEEXISTS)
             {
-                LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
+                LOG_DEBUG(log, "Someone is already processing " << current_task_piece_is_active_path);
                 return PartitionTaskStatus::Active;
             }
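
The "appears to be clean" branch above compares logical clocks: the partition piece is safe to process only when the dirty flag either never existed or was discovered before the task started (and was cleaned since). A reduced model of that comparison; the optional-zxid shape mirrors what the copier's LogicalClock appears to be, which is an assumption for illustration only:

    #include <cstdint>
    #include <optional>

    // Reduced model of the copier's LogicalClock: a possibly-unset zxid.
    struct LogicalClock
    {
        std::optional<uint64_t> zxid;

        bool hasHappened() const { return zxid.has_value(); }

        // An unset clock means "has not happened yet", i.e. later than anything set.
        bool operator<=(const LogicalClock & rhs) const
        {
            if (!rhs.hasHappened())
                return true;            // everything precedes "not yet"
            if (!hasHappened())
                return false;
            return *zxid <= *rhs.zxid;
        }
    };

    // Clean for this task if dirt was never discovered, or was discovered
    // before the task started (so the subsequent cleanup covers it).
    bool isCleanFor(const LogicalClock & discovery, const LogicalClock & task_start)
    {
        return !discovery.hasHappened() || discovery <= task_start;
    }
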
@@ -1022,17 +1156,17 @@
         /// create blocking node to signal cleaning up if it is abandoned
         {
             String status_data;
-            if (zookeeper->tryGet(current_task_status_path, status_data))
+            if (zookeeper->tryGet(current_task_piece_status_path, status_data))
             {
                 TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
                 if (status.state == TaskState::Finished)
                 {
-                    LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner);
+                    LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has been successfully executed by " << status.owner);
                     return PartitionTaskStatus::Finished;
                 }

                 // Task is abandoned, initialize DROP PARTITION
-                LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " <<
+                LOG_DEBUG(log, "Task " << current_task_piece_status_path << " has not been successfully finished by " <<
                     status.owner << ". Partition will be dropped and refilled.");

                 create_is_dirty_node(clean_state_clock);
@@ -1064,7 +1198,7 @@
             if (count != 0)
             {
                 Coordination::Stat stat_shards{};
-                zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
+                zookeeper->get(task_partition.getPartitionShardsPath(), & stat_shards);

                 /// NOTE: partition is still fresh if dirt discovery happens before cleaning
                 if (stat_shards.numChildren == 0)
@@ -1085,7 +1219,7 @@
         /// Try start processing, create node about it
         {
             String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
-            CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
+            CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
             if (clean_state_clock != new_clean_state_clock)
             {
                 LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
@@ -1097,13 +1231,18 @@
                 create_is_dirty_node(new_clean_state_clock);
                 return PartitionTaskStatus::Error;
             }
-            zookeeper->create(current_task_status_path, start_state, zkutil::CreateMode::Persistent);
+            zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
         }

         /// Try create table (if not exists) on each shard
         {
+            /// Define push table for current partition piece
+            auto database_and_table_for_current_piece = std::pair<String, String>(
+                task_table.table_push.first,
+                task_table.table_push.second + ".piece_" + toString(current_piece_number));
+
             auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
-                task_table.table_push, task_table.engine_push_ast);
+                database_and_table_for_current_piece, task_table.engine_push_ast);
             create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
             String query = queryToString(create_query_push_ast);
@@ -1173,7 +1312,7 @@
                     throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);

                 if (!future_is_dirty_checker.valid())
-                    future_is_dirty_checker = zookeeper->asyncExists(is_dirty_flag_path);
+                    future_is_dirty_checker = zookeeper->asyncExists(piece_is_dirty_flag_path);

                 /// check_period_milliseconds should be less than the average insert time of a single block
                 /// Otherwise, the insertion will slow down a little bit
@@ -1222,7 +1361,7 @@
         /// Finalize the processing, change state of current partition task (and also check is_dirty flag)
         {
             String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
-            CleanStateClock new_clean_state_clock (zookeeper, is_dirty_flag_path, is_dirty_cleaned_path);
+            CleanStateClock new_clean_state_clock (zookeeper, piece_is_dirty_flag_path, piece_is_dirty_cleaned_path);
             if (clean_state_clock != new_clean_state_clock)
             {
                 LOG_INFO(log, "Partition " << task_partition.name << " clean state changed, cowardly bailing");
@@ -1234,7 +1373,7 @@
                 create_is_dirty_node(new_clean_state_clock);
                 return PartitionTaskStatus::Error;
             }
-            zookeeper->set(current_task_status_path, state_finished, 0);
+            zookeeper->set(current_task_piece_status_path, state_finished, 0);
         }

         LOG_INFO(log, "Partition " << task_partition.name << " copied");
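
Every piece is written into its own helper table named <table_push>.piece_<N>, created with the same structure and engine as the real target. Keeping the pieces physically separate is what lets a fully copied piece later be moved into the main table as a unit, and is why the partition keys have to match. A sketch of the naming scheme, with hypothetical database and table names:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    using DatabaseAndTableName = std::pair<std::string, std::string>;

    // Enumerate the helper tables for one destination table, mirroring the
    // table_push.second + ".piece_" + toString(piece_number) scheme above.
    std::vector<DatabaseAndTableName> pieceTables(const DatabaseAndTableName & table_push,
                                                  std::size_t number_of_splits)
    {
        std::vector<DatabaseAndTableName> result;
        for (std::size_t piece = 0; piece < number_of_splits; ++piece)
            result.emplace_back(table_push.first, table_push.second + ".piece_" + std::to_string(piece));
        return result;
    }

    int main()
    {
        // Hypothetical destination table, for illustration only.
        for (const auto & [db, table] : pieceTables({"target_db", "hits"}, 3))
            std::cout << db << "." << table << "\n";   // target_db.hits.piece_0, ...
    }
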
@@ -1283,7 +1422,8 @@
         return parseQuery(parser_create_query, create_query_pull_str, 0);
     }

-    void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true)
+    /// If it is implicitly asked to create a split Distributed table for a certain piece on the current shard, we will do it.
+    void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true, const size_t piece_number = 0)
     {
         TaskTable & task_table = task_shard.task_table;

@@ -1295,7 +1435,8 @@
         String read_shard_prefix = ".read_shard_" + toString(task_shard.indexInCluster()) + ".";
         String split_shard_prefix = ".split.";
         task_shard.table_read_shard = DatabaseAndTableName(working_database_name, read_shard_prefix + task_table.table_id);
-        task_shard.table_split_shard = DatabaseAndTableName(working_database_name, split_shard_prefix + task_table.table_id);
+        task_shard.table_split_shard = DatabaseAndTableName(
+            working_database_name, split_shard_prefix + task_table.table_id + ".piece_" + toString(piece_number));

         /// Create special cluster with single shard
         String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
@@ -1303,16 +1444,16 @@
         context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);

         auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
-        const auto & storage_split_ast = task_table.engine_split_ast;
+        const auto & storage_piece_split_ast = task_table.auxiliary_engine_split_asts[piece_number];

         auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
         auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
-        auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_split_ast);
+        auto create_table_split_piece_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_piece_split_ast);

         dropAndCreateLocalTable(create_table_pull_ast);

         if (create_split)
-            dropAndCreateLocalTable(create_table_split_ast);
+            dropAndCreateLocalTable(create_table_split_piece_ast);
     }

@@ -1396,8 +1537,10 @@
                        + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + partition_quoted_name + " AS partition_key))";

         const size_t number_of_splits = task_table.number_of_splits;
+        const String & primary_key_comma_separated = task_table.primary_key_comma_separated;

-        query += " AND (cityHash64(*) % " + std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
+        query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
+                 + std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";

         if (!task_table.where_condition_str.empty())
             query += " AND (" + task_table.where_condition_str + ")";
diff --git a/dbms/programs/copier/Internals.h b/dbms/programs/copier/Internals.h
index dfb52dcfc34290894090def7f02c9f8f0efc013f..d4f565b63df2532d319d5b149c7a478076689e8e 100644
--- a/dbms/programs/copier/Internals.h
+++ b/dbms/programs/copier/Internals.h
@@ -313,6 +313,79 @@ static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
     }
 }

+/*
+ * Choosing a Primary Key that Differs from the Sorting Key
+ * It is possible to specify a primary key (an expression with values that are written in the index file for each mark)
+ * that is different from the sorting key (an expression for sorting the rows in data parts).
+ * In this case the primary key expression tuple must be a prefix of the sorting key expression tuple.
+ * This feature is helpful when using the SummingMergeTree and AggregatingMergeTree table engines.
+ * In a common case when using these engines, the table has two types of columns: dimensions and measures.
+ * Typical queries aggregate values of measure columns with arbitrary GROUP BY and filtering by dimensions.
+ * Because SummingMergeTree and AggregatingMergeTree aggregate rows with the same value of the sorting key,
+ * it is natural to add all dimensions to it. As a result, the key expression consists of a long list of columns
+ * and this list must be frequently updated with newly added dimensions.
+ * In this case it makes sense to leave only a few columns in the primary key that will provide efficient
+ * range scans and add the remaining dimension columns to the sorting key tuple.
+ * ALTER of the sorting key is a lightweight operation because when a new column is simultaneously added
+ * to the table and to the sorting key, existing data parts don't need to be changed.
+ * Since the old sorting key is a prefix of the new sorting key and there is no data in the newly added column,
+ * the data is sorted by both the old and new sorting keys at the moment of table modification.
+ * */
+[[maybe_unused]] static ASTPtr extractPrimaryKeyOrOrderBy(const ASTPtr & storage_ast)
+{
+    String storage_str = queryToString(storage_ast);
+
+    const auto & storage = storage_ast->as<ASTStorage &>();
+    const auto & engine = storage.engine->as<ASTFunction &>();
+
+    if (!endsWith(engine.name, "MergeTree"))
+    {
+        throw Exception("Unsupported engine was specified in " + storage_str + ", only *MergeTree engines are supported",
+                        ErrorCodes::BAD_ARGUMENTS);
+    }
+
+    /// FIXME
+    if (!isExtendedDefinitionStorage(storage_ast))
+    {
+        throw Exception("Storage " + storage_str + " does not have an extended definition. Will be fixed later.",
+                        ErrorCodes::BAD_ARGUMENTS);
+    }
+
+    if (storage.primary_key)
+        return storage.primary_key->clone();
+
+    return storage.order_by->clone();
+}
+
+[[maybe_unused]] static String createCommaSeparatedStringFrom(const Strings & strings)
+{
+    String answer;
+    for (auto & string : strings)
+        answer += string + ", ";
+
+    /// Remove the last comma and space
+    if (!answer.empty())
+    {
+        answer.pop_back();
+        answer.pop_back();
+    }
+    return answer;
+}
+
+[[maybe_unused]] static Strings extractPrimaryKeyString(const ASTPtr & storage_ast)
+{
+    const auto primary_key_or_order_by = extractPrimaryKeyOrOrderBy(storage_ast)->as<ASTFunction &>();
+
+    ASTPtr primary_key_or_order_by_arguments_ast = primary_key_or_order_by.arguments->clone();
+    ASTs & primary_key_or_order_by_arguments = primary_key_or_order_by_arguments_ast->children;
+
+    Strings answer;
+    answer.reserve(primary_key_or_order_by_arguments.size());
+
+    for (auto & column : primary_key_or_order_by_arguments)
+        answer.push_back(column->getColumnName());
+
+    return answer;
+}
+
 static ShardPriority getReplicasPriority(const Cluster::Addresses & replicas, const std::string & local_hostname, UInt8 random)
diff --git a/dbms/programs/copier/ShardPartitionPiece.h b/dbms/programs/copier/ShardPartitionPiece.h
index 28f17a4054192e39aaffa28385bfe82b80b69ea7..303407d1d5b6614af316241b062d54ea86d2157a 100644
--- a/dbms/programs/copier/ShardPartitionPiece.h
+++ b/dbms/programs/copier/ShardPartitionPiece.h
@@ -36,7 +36,7 @@ struct ShardPartitionPiece {

 String ShardPartitionPiece::getPartitionPiecePath() const
 {
-    return shard_partition.getPartitionPath() + "/piece" + std::to_string(current_piece_number);
+    return shard_partition.getPartitionPath() + "/piece_" + std::to_string(current_piece_number);
 }

 String ShardPartitionPiece::getPartitionPieceCleanStartPath() const
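
extractPrimaryKeyOrOrderBy prefers PRIMARY KEY and falls back to ORDER BY, and createCommaSeparatedStringFrom flattens the extracted column names into the string that is later interpolated into cityHash64(...). A standalone sketch of the join step with its expected output; the column names are hypothetical, as if taken from PRIMARY KEY (CounterID, Date):

    #include <cassert>
    #include <string>
    #include <vector>

    using Strings = std::vector<std::string>;

    std::string createCommaSeparatedStringFrom(const Strings & strings)
    {
        if (strings.empty())
            return {};                 // guard: pop_back() on an empty string is UB

        std::string answer;
        for (const auto & s : strings)
            answer += s + ", ";
        answer.pop_back();             // trailing space
        answer.pop_back();             // trailing comma
        return answer;
    }

    int main()
    {
        assert(createCommaSeparatedStringFrom({"CounterID", "Date"}) == "CounterID, Date");
        assert(createCommaSeparatedStringFrom({}).empty());
    }
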
diff --git a/dbms/programs/copier/TaskCluster.h b/dbms/programs/copier/TaskCluster.h
index 3ed7d610d179ba23d5ed381daee7cb6f5273e707..0df6f7e6a6b3cc751e89654816ef5ff34b7bb648 100644
--- a/dbms/programs/copier/TaskCluster.h
+++ b/dbms/programs/copier/TaskCluster.h
@@ -53,7 +53,7 @@ void TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration &config, con
     Poco::Util::AbstractConfiguration::Keys tables_keys;
     config.keys(prefix + "tables", tables_keys);

-    for (const auto &table_key : tables_keys) {
+    for (const auto & table_key : tables_keys) {
         table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
     }
 }
diff --git a/dbms/programs/copier/TaskTable.h b/dbms/programs/copier/TaskTable.h
index c8c292ea3a2662cb56e4f80d3877c47ea70047ee..10aaf9334a6b708217e06266b23e0ad6b261062d 100644
--- a/dbms/programs/copier/TaskTable.h
+++ b/dbms/programs/copier/TaskTable.h
@@ -2,26 +2,50 @@

 #include "Internals.h"
 #include "TaskCluster.h"
+#include "ext/range.h"

 namespace DB {

 struct TaskTable {
-    TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguration &config, const String &prefix,
-              const String &table_key);
+    TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
+              const String & table_key);

-    TaskCluster &task_cluster;
+    TaskCluster & task_cluster;

-    String getPartitionPath(const String &partition_name) const;
+    /// These functions are used in checkPartitionIsDone() or checkPartitionPieceIsDone().
+    /// They are implemented here so as not to write task_table.tasks_shard[partition_name].second.pieces[current_piece_number] etc. everywhere.

-    [[maybe_unused]] String
-    getPartitionPiecePath(const String &partition_name, size_t current_piece_number) const;
+    String getPartitionPath(const String & partition_name) const;

-    String getCertainPartitionIsDirtyPath(const String &partition_name) const;
+    [[maybe_unused]] String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;

-    String getCertainPartitionIsCleanedPath(const String &partition_name) const;
+    String getCertainPartitionIsDirtyPath(const String & partition_name) const;

-    String getCertainPartitionTaskStatusPath(const String &partition_name) const;
+    [[maybe_unused]] String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
+    {
+        UNUSED(partition_name);
+        UNUSED(piece_number);
+        return "Not implemented";
+    }
+
+    String getCertainPartitionIsCleanedPath(const String & partition_name) const;
+
+    [[maybe_unused]] String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
+    {
+        UNUSED(partition_name);
+        UNUSED(piece_number);
+        return "Not implemented";
+    }
+
+    String getCertainPartitionTaskStatusPath(const String & partition_name) const;
+
+    [[maybe_unused]] String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
+    {
+        UNUSED(partition_name);
+        UNUSED(piece_number);
+        return "Not implemented";
+    }

     /// Partitions will be splitted into number-of-splits pieces.
     /// Each piece will be copied independently. (10 by default)
@@ -32,6 +56,9 @@ struct TaskTable {
     /// Used as task ID
     String table_id;

+    /// Column names in primary key
+    String primary_key_comma_separated;
+
     /// Source cluster and table
     String cluster_pull_name;
     DatabaseAndTableName table_pull;
@@ -41,14 +68,37 @@
     DatabaseAndTableName table_push;

     /// Storage of destination table
+    /// (tables that are stored on each shard of target cluster)
     String engine_push_str;
     ASTPtr engine_push_ast;
     ASTPtr engine_push_partition_key_ast;

-    /// A Distributed table definition used to split data
+    /*
+     * A Distributed table definition used to split data.
+     * This Distributed table will be created on each shard of the default
+     * cluster to perform data copying and resharding.
+     * */
     String sharding_key_str;
     ASTPtr sharding_key_ast;
-    ASTPtr engine_split_ast;
+    ASTPtr main_engine_split_ast;
+
+    /*
+     * Auxiliary table engines used to perform partition piece copying.
+     * Each AST represents the table engine for a certain piece number.
+     * After a partition piece is copied successfully, it is moved to the main
+     * target table. All these tables are stored on each shard, like the main table.
+     * We have to use separate tables for partition pieces because of the atomicity of copying.
+     * Also, if we want to move some partition to another table, the partition keys have to be the same.
+     * */
+
+    /*
+     * To copy a partition piece from one cluster to another we have to use a Distributed table.
+     * In case of using a separate table (engine_push) for each partition piece,
+     * we have to use many Distributed tables.
+     * */
+    ASTs auxiliary_engine_split_asts;
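
So for one destination table the copier keeps one main Distributed engine plus number_of_splits auxiliary ones, each pointing at <table>.piece_<N> with the same sharding key. Spelled out as the engine definitions they correspond to (a sketch; the cluster, database, table, and sharding-key names are hypothetical):

    #include <cstddef>
    #include <iostream>
    #include <string>

    int main()
    {
        const std::string cluster = "cluster_push";
        const std::string database = "target_db";
        const std::string table = "hits";
        const std::string sharding_key = "rand()";
        const std::size_t number_of_splits = 3;

        // main_engine_split_ast: writes are routed to the real target table
        std::cout << "ENGINE = Distributed(" << cluster << ", " << database
                  << ", " << table << ", " << sharding_key << ")\n";

        // auxiliary_engine_split_asts[piece]: one helper table per partition piece
        for (std::size_t piece = 0; piece < number_of_splits; ++piece)
            std::cout << "ENGINE = Distributed(" << cluster << ", " << database
                      << ", " << table << ".piece_" << piece << ", " << sharding_key << ")\n";
    }
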

     /// Additional WHERE expression to filter input data
     String where_condition_str;
@@ -63,9 +113,11 @@
     Strings enabled_partitions;
     NameSet enabled_partitions_set;

-    /// Prioritized list of shards
-    /// all_shards contains information about all shards in the table.
-    /// So we have to check whether particular shard have current partiton or not while processing.
+    /**
+     * Prioritized list of shards.
+     * all_shards contains information about all shards in the table.
+     * So we have to check whether a particular shard has the current partition or not while processing.
+     */
     TasksShard all_shards;
     TasksShard local_shards;

@@ -76,7 +128,7 @@ struct TaskTable {
     /// Partition names to process in user-specified order
     Strings ordered_partition_names;

-    ClusterPartition &getClusterPartition(const String &partition_name) {
+    ClusterPartition & getClusterPartition(const String &partition_name) {
         auto it = cluster_partitions.find(partition_name);
         if (it == cluster_partitions.end())
             throw Exception("There are no cluster partition " + partition_name + " in " + table_id,
@@ -99,10 +151,10 @@ String TaskTable::getPartitionPath(const String &partition_name) const {
            + "/" + escapeForFileName(partition_name); // 201701
 }

-String TaskTable::getPartitionPiecePath(const String &partition_name, size_t current_piece_number) const {
-    assert(current_piece_number < number_of_splits);
+String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const {
+    assert(piece_number < number_of_splits);
     return getPartitionPath(partition_name) + "/" +
-           std::to_string(current_piece_number); // 1...number_of_splits
+           std::to_string(piece_number); // 0...number_of_splits-1
 }

 String TaskTable::getCertainPartitionIsDirtyPath(const String &partition_name) const {
@@ -131,9 +183,8 @@ String TaskShard::getHostNameExample() const {
     return replicas.at(0).readableString();
 }

-TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguration &config,
-                     const String &prefix_,
-                     const String &table_key)
+TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
+                     const String & prefix_, const String & table_key)
     : task_cluster(parent)
 {
     String table_prefix = prefix_ + "." + table_key + ".";
@@ -160,14 +211,26 @@ TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguratio
         ParserStorage parser_storage;
         engine_push_ast = parseQuery(parser_storage, engine_push_str, 0);
         engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
+        primary_key_comma_separated = createCommaSeparatedStringFrom(extractPrimaryKeyString(engine_push_ast));
     }

     sharding_key_str = config.getString(table_prefix + "sharding_key");
+
+    auxiliary_engine_split_asts.reserve(number_of_splits);
     {
         ParserExpressionWithOptionalAlias parser_expression(false);
         sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0);
-        engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
+        main_engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
                                                        sharding_key_ast);
+
+        for (const auto piece_number : ext::range(0, number_of_splits))
+        {
+            auxiliary_engine_split_asts.emplace_back
+            (
+                createASTStorageDistributed(cluster_push_name, table_push.first,
+                                            table_push.second + ".piece_" + toString(piece_number), sharding_key_ast)
+            );
+        }
     }

     where_condition_str = config.getString(table_prefix + "where_condition", "");
@@ -209,7 +272,7 @@ TaskTable::TaskTable(TaskCluster &parent, const Poco::Util::AbstractConfiguratio

 template <typename RandomEngine>
 void TaskTable::initShards(RandomEngine &&random_engine)
 {
-    const String &fqdn_name = getFQDNOrHostName();
+    const String & fqdn_name = getFQDNOrHostName();
     std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());

     // Compute the priority