diff --git a/dbms/include/DB/Common/BlockFilterCreator.h b/dbms/include/DB/Common/BlockFilterCreator.h
deleted file mode 100644
index aa17ec04953e4a9579081ba0177d6298fc868185..0000000000000000000000000000000000000000
--- a/dbms/include/DB/Common/BlockFilterCreator.h
+++ /dev/null
@@ -1,64 +0,0 @@
-#pragma once
-
-#include
-#include
-
-#include
-
-#if defined(__x86_64__)
-    #define LIBDIVIDE_USE_SSE2 1
-#endif
-
-#include <libdivide.h>
-
-namespace DB
-{
-
-template <typename T>
-struct BlockFilterCreator
-{
-    static std::vector<IColumn::Filter> perform(const size_t num_rows, const IColumn * column,
-        size_t num_shards, const std::vector<size_t> & slots)
-    {
-        const auto total_weight = slots.size();
-        std::vector<IColumn::Filter> filters(num_shards);
-
-        /** In C++, dividing a negative number by a positive one yields a negative remainder.
-          * That does not suit this task, so signed types are treated as unsigned here.
-          * The result is no longer anything like division with remainder, but it fits this task.
-          */
-        using UnsignedT = typename std::make_unsigned<T>::type;
-
-        /// const columns contain only one value, therefore we do not need to read it at every iteration
-        if (column->isConst())
-        {
-            const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
-            const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
-
-            for (size_t i = 0; i < num_shards; ++i)
-                filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
-        }
-        else
-        {
-            /// libdivide only supports UInt32 and UInt64.
-            using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
-
-            libdivide::divider<TUInt32Or64> divider(total_weight);
-
-            const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
-
-            /// NOTE It might be worth swapping the two loops.
-            for (size_t i = 0; i < num_shards; ++i)
-            {
-                filters[i].resize(num_rows);
-                for (size_t j = 0; j < num_rows; ++j)
-                    filters[i][j] = slots[
-                        static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
-            }
-        }
-
-        return filters;
-    }
-};
-
-}
diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h
index f298d5faeba51ae901efc5b7c8d1de9f27973faf..bd036ca9cc89f38b859853ed1884b5baebe8a1bc 100644
--- a/dbms/include/DB/Interpreters/Context.h
+++ b/dbms/include/DB/Interpreters/Context.h
@@ -251,7 +251,6 @@ public:
 
     BackgroundProcessingPool & getBackgroundPool();
 
-    void setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_worker);
    ReshardingWorker & getReshardingWorker();
 
    /** Clear the caches of uncompressed blocks and marks.
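A note on the helper deleted above: it assumes that `slots` (the cluster's `slot_to_shard` table) already expands shard weights into one entry per unit of weight, so taking the key modulo `slots.size()` lands on a shard with probability proportional to its weight. A minimal standalone sketch of that mapping, with invented weights (the expansion itself lives in Cluster and is not part of this diff):

#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
    /// Hypothetical weights: shard 0 has weight 1, shard 1 has weight 3.
    const std::vector<size_t> weights{1, 3};

    /// Expand the weights into slots, one entry per unit of weight,
    /// mirroring what Cluster::slot_to_shard holds.
    std::vector<size_t> slot_to_shard;
    for (size_t shard = 0; shard < weights.size(); ++shard)
        slot_to_shard.insert(slot_to_shard.end(), weights[shard], shard);

    /// A signed key is reinterpreted as unsigned before taking the remainder,
    /// as in BlockFilterCreator::perform.
    const int64_t key = -42;
    const size_t shard = slot_to_shard[static_cast<uint64_t>(key) % slot_to_shard.size()];

    std::cout << "key " << key << " -> shard " << shard << "\n";  /// ~3/4 of keys go to shard 1
    return 0;
}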
diff --git a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
index 9e6417e507411f96970f47770fbd4009667cab35..15814622164fd4d31c9dd075dbf4c031376fb04b 100644
--- a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
+++ b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
@@ -55,7 +55,7 @@ private:
        /// For RESHARD PARTITION.
        Field last_partition;
        WeightedZooKeeperPaths weighted_zookeeper_paths;
-        ASTPtr sharding_key_expr;
+        String sharding_key;
 
        static PartitionCommand dropPartition(const Field & partition, bool detach, bool unreplicated)
        {
@@ -78,9 +78,9 @@ private:
        }
 
        static PartitionCommand reshardPartitions(const Field & first_partition_, const Field & last_partition_,
-            const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const ASTPtr & sharding_key_expr)
+            const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const String & sharding_key_)
        {
-            return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_expr};
+            return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_};
        }
    };
diff --git a/dbms/include/DB/Parsers/ASTAlterQuery.h b/dbms/include/DB/Parsers/ASTAlterQuery.h
index eea05c73e1224336fbf0e08d0e23e6b7e1467972..4d6d9af30bf6409931911b0e46bdd178aad4967a 100644
--- a/dbms/include/DB/Parsers/ASTAlterQuery.h
+++ b/dbms/include/DB/Parsers/ASTAlterQuery.h
@@ -70,7 +70,7 @@ public:
         */
        ASTPtr last_partition;
        ASTPtr weighted_zookeeper_paths;
-        ASTPtr sharding_key_expr;
+        String sharding_key;
 
        /// deep copy
        void clone(Parameters & p) const;
diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
index 081b51eed4a8529a0bebb11cfd78bb23295be7f4..2d6f20678f1a875ce988f4ec16dcf2351cde6c9c 100644
--- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
@@ -1,14 +1,32 @@
 #pragma once
 
+#include
+
 #include
-#include
-#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+
+#if defined(__x86_64__)
+    #define LIBDIVIDE_USE_SSE2 1
+#endif
+
+#include <libdivide.h>
+
 
 namespace DB
 {
 
-class StorageDistributed;
-
 /** Writing is asynchronous: data is first written to the local file system, and later sent to the remote servers.
  * If the Distributed table uses more than one shard, then in order for writes to be supported,
  * an additional ENGINE parameter, the sharding key, must be specified when creating the table.
@@ -20,22 +38,198 @@ class StorageDistributed;
 class DistributedBlockOutputStream : public IBlockOutputStream
 {
 public:
-    DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast);
+    DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
+        : storage(storage), query_ast(query_ast)
+    {
+    }
+
+    void write(const Block & block) override
+    {
+        if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
+            return writeSplit(block);
 
-    void write(const Block & block) override;
+        writeImpl(block);
+    }
 
 private:
-    std::vector<IColumn::Filter> createFilters(Block block);
+    template <typename T>
+    static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
+    {
+        const auto total_weight = cluster.slot_to_shard.size();
+        const auto num_shards = cluster.getShardsInfo().size();
+        std::vector<IColumn::Filter> filters(num_shards);
 
-    void writeSplit(const Block & block);
+        /** In C++, dividing a negative number by a positive one yields a negative remainder.
+          * That does not suit this task, so signed types are treated as unsigned here.
+          * The result is no longer anything like division with remainder, but it fits this task.
+          */
+        using UnsignedT = typename std::make_unsigned<T>::type;
 
-    void writeImpl(const Block & block, const size_t shard_id = 0);
+        /// const columns contain only one value, therefore we do not need to read it at every iteration
+        if (column->isConst())
+        {
+            const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
+            const auto shard_num = cluster.slot_to_shard[static_cast<UnsignedT>(data) % total_weight];
 
-    void writeToLocal(const Block & block, const size_t repeats);
+            for (size_t i = 0; i < num_shards; ++i)
+                filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
+        }
+        else
+        {
+            /// libdivide only supports UInt32 and UInt64.
+            using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
 
-    void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
+            libdivide::divider<TUInt32Or64> divider(total_weight);
+
+            const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
+
+            /// NOTE It might be worth swapping the two loops.
+            for (size_t i = 0; i < num_shards; ++i)
+            {
+                filters[i].resize(num_rows);
+                for (size_t j = 0; j < num_rows; ++j)
+                    filters[i][j] = cluster.slot_to_shard[
+                        static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
+            }
+        }
+
+        return filters;
+    }
+
+    std::vector<IColumn::Filter> createFilters(Block block)
+    {
+        using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
+        /// hashmap of pointers to functions corresponding to each integral type
+        static std::unordered_map<std::string, create_filters_sig *> creators{
+            { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
+            { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
+            { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
+            { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
+            { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
+            { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
+            { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
+            { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
+        };
+
+        storage.getShardingKeyExpr()->execute(block);
+
+        const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
+
+        /// check that key column has valid type
+        const auto it = creators.find(key_column.type->getName());
+
+        return it != std::end(creators)
+            ? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster)
+            : throw Exception{
+                "Sharding key expression does not evaluate to an integer type",
+                ErrorCodes::TYPE_MISMATCH
+            };
+    }
+
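Since libdivide accelerates division but provides no modulo, the non-const branch above derives the remainder as x - (x / divider) * total_weight. A small self-check of that identity, under the same LIBDIVIDE setup as the header (assumes libdivide.h is on the include path):

#include <cassert>
#include <cstdint>
#include <libdivide.h>

int main()
{
    const uint64_t total_weight = 7;
    libdivide::divider<uint64_t> divider(total_weight);

    for (uint64_t x = 0; x < 10000; ++x)
    {
        /// x / divider runs libdivide's precomputed multiply-and-shift sequence.
        const uint64_t rem = x - (x / divider) * total_weight;
        assert(rem == x % total_weight);  /// matches the ordinary remainder
    }
    return 0;
}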
+    void writeSplit(const Block & block)
+    {
+        const auto num_cols = block.columns();
+        /// cache column pointers for later reuse
+        std::vector<ColumnPtr> columns(num_cols);
+        for (size_t i = 0; i < columns.size(); ++i)
+            columns[i] = block.getByPosition(i).column;
+
+        auto filters = createFilters(block);
+
+        const auto num_shards = storage.cluster.getShardsInfo().size();
+
+        ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1;    /// The factor 1.1 was picked arbitrarily.
+
+        for (size_t i = 0; i < num_shards; ++i)
+        {
+            auto target_block = block.cloneEmpty();
+
+            for (size_t col = 0; col < num_cols; ++col)
+                target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint);
+
+            if (target_block.rowsInFirstColumn())
+                writeImpl(target_block, i);
+        }
+    }
+
+    void writeImpl(const Block & block, const size_t shard_id = 0)
+    {
+        const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
+        if (shard_info.getLocalNodeCount() > 0)
+            writeToLocal(block, shard_info.getLocalNodeCount());
+
+        /// dir_names is empty if shard has only local addresses
+        if (!shard_info.dir_names.empty())
+            writeToShard(block, shard_info.dir_names);
+    }
+
+    void writeToLocal(const Block & block, const size_t repeats)
+    {
+        InterpreterInsertQuery interp{query_ast, storage.context};
+
+        auto block_io = interp.execute();
+        block_io.out->writePrefix();
+
+        for (size_t i = 0; i < repeats; ++i)
+            block_io.out->write(block);
+
+        block_io.out->writeSuffix();
+    }
+
+    void writeToShard(const Block & block, const std::vector<std::string> & dir_names)
+    {
+        /** tmp directory is used to ensure atomicity of transactions
+          * and keep monitor thread out from reading incomplete data
+          */
+        std::string first_file_tmp_path{};
+
+        auto first = true;
+        const auto & query_string = queryToString(query_ast);
+
+        /// write first file, hardlink the others
+        for (const auto & dir_name : dir_names)
+        {
+            const auto & path = storage.getPath() + dir_name + '/';
+
+            /// ensure shard subdirectory creation and notify storage
+            if (Poco::File(path).createDirectory())
+                storage.requireDirectoryMonitor(dir_name);
+
+            const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
+            const auto & block_file_path = path + file_name;
+
+            /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
+              * the inode is not freed until we're done */
+            if (first)
+            {
+                first = false;
+
+                const auto & tmp_path = path + "tmp/";
+                Poco::File(tmp_path).createDirectory();
+                const auto & block_file_tmp_path = tmp_path + file_name;
+
+                first_file_tmp_path = block_file_tmp_path;
+
+                WriteBufferFromFile out{block_file_tmp_path};
+                CompressedWriteBuffer compress{out};
+                NativeBlockOutputStream stream{compress, Revision::get()};
+
+                writeStringBinary(query_string, out);
+
+                stream.writePrefix();
+                stream.write(block);
+                stream.writeSuffix();
+            }
+
+            if (link(first_file_tmp_path.data(), block_file_path.data()))
+                throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
+        }
+
+        /** remove the temporary file, enabling the OS to reclaim inode after all threads
+          * have removed their corresponding files */
+        Poco::File(first_file_tmp_path).remove();
+    }
 
-private:
    StorageDistributed & storage;
    ASTPtr query_ast;
 };
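writeToShard above relies on two POSIX facts: link() refuses to overwrite an existing name, and the data's inode survives until the last directory entry pointing at it is gone. The same write-once-then-hardlink idea in isolation (file names here are invented):

#include <cstdio>
#include <fstream>
#include <unistd.h>

int main()
{
    /// Write the payload once, under a temporary name.
    std::ofstream("block.tmp") << "payload";

    /// Publish it under several names; each link() either succeeds atomically
    /// or fails (with EEXIST if the target already exists).
    for (const char * name : {"shard_a.bin", "shard_b.bin"})
        if (link("block.tmp", name))
            std::perror(name);

    /// Dropping the temporary name frees the inode only after every
    /// hardlink has been removed by its consumer.
    unlink("block.tmp");
    return 0;
}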
diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h
index 1f0c4eb40a993beff69cb9fa7774045774286cf0..4505ba552a55cefdad183e44aa7da460389e95bd 100644
--- a/dbms/include/DB/Storages/IStorage.h
+++ b/dbms/include/DB/Storages/IStorage.h
@@ -242,7 +242,7 @@ public:
    /** Execute a RESHARD PARTITION query.
      */
    virtual void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
+        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
        const Settings & settings)
    {
        throw Exception("Method reshardPartitions is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
index 7e422b5c4a8a5d9205fd5287fd626a0fcf066b6d..f41a0492d74ebce93c7f3c257a0b3711d5d24693 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
@@ -56,8 +56,6 @@ private:
    const ReshardingJob & job;
    Logger * log;
    std::vector<size_t> slots;
-    ExpressionActionsPtr sharding_key_expr;
-    std::string sharding_key_column_name;
 };
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h b/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
index c6a2bed618962c607c8b7215c1226109b6124959..45689a9f608239b498c099a5c05d2c4035ea132b 100644
--- a/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
+++ b/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
@@ -7,8 +7,6 @@
 namespace DB
 {
 
-class Context;
-
 namespace RemoteDiskSpaceMonitor
 {
 
@@ -17,14 +15,14 @@ namespace RemoteDiskSpaceMonitor
 class Service final : public InterserverIOEndpoint
 {
 public:
-    Service(const Context & context_);
+    Service(const std::string & path_);
    Service(const Service &) = delete;
    Service & operator=(const Service &) = delete;
    std::string getId(const std::string & node_id) const override;
    void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
 
 private:
-    const Context & context;
+    const std::string path;
 };
 
 /** Client for obtaining information about free space on a remote disk.
@@ -35,7 +33,7 @@ public:
    Client() = default;
    Client(const Client &) = delete;
    Client & operator=(const Client &) = delete;
-    size_t getFreeSpace(const InterserverIOEndpointLocation & location) const;
+    size_t getFreeDiskSpace(const InterserverIOEndpointLocation & location) const;
    void cancel() { is_cancelled = true; }
 
 private:
diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingJob.h b/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
index e22157620fc20fc03fe11222817be16de014e7d5..927a27abef3cb8257cecb416386722d09fdcc759 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
@@ -16,7 +16,7 @@ public:
    ReshardingJob(const std::string & database_name_, const std::string & table_name_,
        const std::string & partition_, const WeightedZooKeeperPaths & paths_,
-        const ASTPtr & sharding_key_expr_);
+        const std::string & sharding_key_);
 
    ReshardingJob(const ReshardingJob &) = delete;
    ReshardingJob & operator=(const ReshardingJob &) = delete;
@@ -29,7 +29,7 @@ public:
    std::string table_name;
    std::string partition;
    WeightedZooKeeperPaths paths;
-    ASTPtr sharding_key_expr;
+    std::string sharding_key;
 };
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
index 984d116fed0141a27cf4077d02e7e468dbbfd70b..35874ae5a2fbe958429b26403a4a45a4b9e6d2fe 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
@@ -1,9 +1,7 @@
 #pragma once
 
-#include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -24,8 +22,7 @@ class ReshardingJob;
 class ReshardingWorker final
 {
 public:
-    ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
-        const std::string & config_name, Context & context_);
+    ReshardingWorker(Context & context_);
    ReshardingWorker(const ReshardingWorker &) = delete;
    ReshardingWorker & operator=(const ReshardingWorker &) = delete;
 
@@ -40,12 +37,18 @@ public:
        const std::string & table_name,
        const std::string & partition,
        const WeightedZooKeeperPaths & weighted_zookeeper_paths,
-        const ASTPtr & sharding_key_expr);
+        const std::string & sharding_key);
+
+    /// Submit a resharding job.
+    void submitJob(const ReshardingJob & job);
 
    /// Has the thread been started?
    bool isStarted() const;
 
 private:
+    /// Submit a resharding job (internal version).
+    void submitJobImpl(const std::string & serialized_job);
+
    /// Watch for new jobs to appear. Perform them sequentially.
    void pollAndExecute();
 
@@ -78,17 +81,18 @@ private:
    /// Forcibly terminate the thread.
    void abortIfRequested() const;
 
+    /// Was the thread aborted?
+    bool hasAborted(const Exception & ex) const;
+
 private:
    Context & context;
    Logger * log;
-    std::unique_ptr<MergeTreeDataMerger> merger;
    std::thread polling_thread;
-    mutable std::mutex cancel_mutex;
    std::string host_task_queue_path;
    std::atomic<bool> is_started{false};
    std::atomic<bool> must_stop{false};
 };
 
-using ReshardingWorkerPtr = std::shared_ptr<ReshardingWorker>;
+using ReshardingWorkerPtr = Poco::SharedPtr<ReshardingWorker>;
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h b/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
index bff2d5a20c08a0fa435f592ed8525eccbd2faae9..1f345b9a72151c89c59cbff09b0db080a4e34307 100644
--- a/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
+++ b/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
@@ -2,7 +2,6 @@
 
 #include
 #include
-#include
 
 namespace DB
 {
@@ -25,7 +24,6 @@ public:
 
 private:
    StorageReplicatedMergeTree & storage;
-    Logger * log;
 };
 
 /** Client for sending the parts of a partition of a *MergeTree table.
@@ -33,7 +31,7 @@
 class Client final
 {
 public:
-    Client();
+    Client() = default;
    Client(const Client &) = delete;
    Client & operator=(const Client &) = delete;
    bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
@@ -42,7 +40,6 @@ public:
 
 private:
    std::atomic<bool> is_cancelled{false};
-    Logger * log;
 };
 
 }
diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h
index d7d9676dad1f77e1de56cb62c6fcb38b1faad89d..8a031688f33b9a40dc6d673e0196c14254fedc18 100644
--- a/dbms/include/DB/Storages/StorageDistributed.h
+++ b/dbms/include/DB/Storages/StorageDistributed.h
@@ -77,7 +77,7 @@ public:
    void shutdown() override;
 
    void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
+        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
        const Settings & settings) override;
 
    /// Get from each replica a description of the corresponding local table.
diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
index b69da7d0f4532534be39f011bb2c6ae5d47db2f3..5b37efadf19eec631e613572ec716afd4f514306 100644
--- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
+++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h
@@ -136,7 +136,7 @@ public:
    void fetchPartition(const Field & partition, const String & from, const Settings & settings) override;
    void freezePartition(const Field & partition, const Settings & settings) override;
    void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
+        const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
        const Settings & settings) override;
 
    /** Removes a replica from ZooKeeper. If there are no other replicas, removes the whole table from ZooKeeper.
@@ -257,7 +257,7 @@ private:
    MergeTreeDataMerger merger;
 
    DataPartsExchange::Fetcher fetcher;
-    RemoteDiskSpaceMonitor::Client disk_space_monitor_client;
+    RemoteDiskSpaceMonitor::Client free_disk_space_checker;
    ShardedPartitionSender::Client sharded_partition_sender_client;
    RemoteQueryExecutor::Client remote_query_executor_client;
 
diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp
index 6bf7a8fb712cf2c5d150de59958822b273eb8c3f..d261b50028293e1ed566711e12c9f3c4bca462a1 100644
--- a/dbms/src/Core/ErrorCodes.cpp
+++ b/dbms/src/Core/ErrorCodes.cpp
@@ -317,12 +317,13 @@ namespace ErrorCodes
    extern const int INSUFFICIENT_SPACE_FOR_RESHARDING = 311;
    extern const int PARTITION_COPY_FAILED = 312;
    extern const int PARTITION_ATTACH_FAILED = 313;
-    extern const int RESHARDING_NO_WORKER = 314;
-    extern const int INVALID_PARTITIONS_INTERVAL = 315;
-    extern const int RESHARDING_INVALID_PARAMETERS = 316;
-    extern const int INVALID_SHARD_WEIGHT = 317;
-    extern const int INVALID_CONFIG_PARAMETER = 318;
-    extern const int UNKNOWN_STATUS_OF_INSERT = 319;
+    extern const int RESHARDING_CLEANUP_FAILED = 314;
+    extern const int RESHARDING_NO_WORKER = 315;
+    extern const int INVALID_PARTITIONS_INTERVAL = 316;
+    extern const int RESHARDING_INVALID_PARAMETERS = 317;
+    extern const int INVALID_SHARD_WEIGHT = 318;
+    extern const int SHARD_DOESNT_REFERENCE_TABLE = 319;
+    extern const int UNKNOWN_STATUS_OF_INSERT = 320;
 
    extern const int KEEPER_EXCEPTION = 999;
    extern const int POCO_EXCEPTION = 1000;
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index c66d21b5ef85b280e36e03f5df4e8c937fd30145..cbcb2cc5fd3c2d2551f7faa357f09afd50d1993e 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -822,19 +822,16 @@ BackgroundProcessingPool & Context::getBackgroundPool()
    return *shared->background_pool;
 }
 
-void Context::setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_worker)
-{
-    Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
-    if (shared->resharding_worker)
-        throw Exception("Resharding background thread has already been set.", ErrorCodes::LOGICAL_ERROR);
-    shared->resharding_worker = resharding_worker;
-}
-
 ReshardingWorker & Context::getReshardingWorker()
 {
    Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
+
+    if (!shared->zookeeper)
+        throw Exception("Resharding background processing requires ZooKeeper", ErrorCodes::LOGICAL_ERROR);
+
    if (!shared->resharding_worker)
-        throw Exception("Resharding background thread not set.", ErrorCodes::LOGICAL_ERROR);
+        shared->resharding_worker = new ReshardingWorker(*this);
+
    return *shared->resharding_worker;
 }
 
diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
index 8602495420a4e04a904cebefba701c02ecfb562c..c91f74d97f0f5c68568985e861e82fc021e95b4b 100644
--- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
@@ -67,7 +67,7 @@ BlockIO InterpreterAlterQuery::execute()
            break;
 
        case PartitionCommand::RESHARD_PARTITION:
-            table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key_expr, context.getSettingsRef());
+            table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key, context.getSettingsRef());
            break;
 
        default:
@@ -190,8 +190,9 @@ void InterpreterAlterQuery::parseAlter(
            weighted_zookeeper_paths.emplace_back(weighted_zookeeper_path.path, weighted_zookeeper_path.weight);
        }
 
-        out_partition_commands.push_back(PartitionCommand::reshardPartitions(
-            first_partition, last_partition, weighted_zookeeper_paths, params.sharding_key_expr));
+        const auto & sharding_key = params.sharding_key;
+
+        out_partition_commands.push_back(PartitionCommand::reshardPartitions(first_partition, last_partition, weighted_zookeeper_paths, sharding_key));
    }
    else
        throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
diff --git a/dbms/src/Parsers/ASTAlterQuery.cpp b/dbms/src/Parsers/ASTAlterQuery.cpp
index c490fcec3676a8b626af08190f1093d9cb5f0c7b..54df2d547aff3373e1a07d31543ce9c477ec4156 100644
--- a/dbms/src/Parsers/ASTAlterQuery.cpp
+++ b/dbms/src/Parsers/ASTAlterQuery.cpp
@@ -19,7 +19,6 @@ void ASTAlterQuery::Parameters::clone(Parameters & p) const
    if (partition)                p.partition = partition->clone();
    if (last_partition)           p.last_partition = last_partition->clone();
    if (weighted_zookeeper_paths) p.weighted_zookeeper_paths = weighted_zookeeper_paths->clone();
-    if (sharding_key_expr)        p.sharding_key_expr = sharding_key_expr->clone();
 }
 
 void ASTAlterQuery::addParameters(const Parameters & params)
@@ -35,8 +34,6 @@ void ASTAlterQuery::addParameters(const Parameters & params)
        children.push_back(params.last_partition);
    if (params.weighted_zookeeper_paths)
        children.push_back(params.weighted_zookeeper_paths);
-    if (params.sharding_key_expr)
-        children.push_back(params.sharding_key_expr);
 }
 
 ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_)
@@ -156,9 +153,8 @@ void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & st
        settings.ostr << settings.nl_or_ws;
        settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str
-            << "USING " << (settings.hilite ? hilite_none : "");
-
-        p.sharding_key_expr->formatImpl(settings, state, frame);
+            << "USING " << (settings.hilite ? hilite_none : "")
+            << p.sharding_key;
hilite_none : "") + << p.sharding_key; } else throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index d0910e418c5752e6c7c0997e2b8e2d6d47854d1f..7db11eb8a3d20b793360c92ee3edb506351425b9 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -255,7 +255,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa else if (s_reshard.ignore(pos, end, max_parsed_pos, expected)) { ParserList weighted_zookeeper_paths_p(ParserPtr(new ParserWeightedZooKeeperPath), ParserPtr(new ParserString(",")), false); - ParserExpressionWithOptionalAlias parser_sharding_key_expr(false); + ParserIdentifier sharding_key_parser; ws.ignore(pos, end); @@ -294,9 +294,12 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa ws.ignore(pos, end); - if (!parser_sharding_key_expr.parse(pos, end, params.sharding_key_expr, max_parsed_pos, expected)) + ASTPtr ast_sharding_key; + if (!sharding_key_parser.parse(pos, end, ast_sharding_key, max_parsed_pos, expected)) return false; + params.sharding_key = typeid_cast(*ast_sharding_key).name; + ws.ignore(pos, end); params.type = ASTAlterQuery::RESHARD_PARTITION; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index fabf16a0d84f1780d45830d5934d8d4e7921425c..d9d107cd60013421ba424e85e8a7c859976dde89 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -320,11 +320,14 @@ int Server::main(const std::vector & args) global_context->setCurrentDatabase(config().getString("default_database", "default")); - if (has_zookeeper && config().has("resharding")) + if (has_zookeeper) { - auto resharding_worker = std::make_shared(config(), "resharding", *global_context); - global_context->setReshardingWorker(resharding_worker); - resharding_worker->start(); + zkutil::ZooKeeperPtr zookeeper = global_context->getZooKeeper(); + if (!zookeeper->getTaskQueuePath().empty()) + { + auto & resharding_worker = global_context->getReshardingWorker(); + resharding_worker.start(); + } } SCOPE_EXIT( diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp deleted file mode 100644 index cfc984b280a9f5da7474caeee49f9b91d03131a6..0000000000000000000000000000000000000000 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ /dev/null @@ -1,180 +0,0 @@ -#include -#include - -#include - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -namespace DB -{ - -namespace -{ - -template -std::vector createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster) -{ - return BlockFilterCreator::perform(num_rows, column, cluster.getShardsInfo().size(), cluster.slot_to_shard); -} - -} - -DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast) - : storage(storage), query_ast(query_ast) -{ -} - -void DistributedBlockOutputStream::write(const Block & block) -{ - if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1)) - return writeSplit(block); - - writeImpl(block); -} - -std::vector DistributedBlockOutputStream::createFilters(Block block) -{ - using create_filters_sig = std::vector(size_t, const IColumn *, const Cluster &); - /// hashmap of pointers to functions corresponding to each integral type - static 
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
deleted file mode 100644
index cfc984b280a9f5da7474caeee49f9b91d03131a6..0000000000000000000000000000000000000000
--- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ /dev/null
@@ -1,180 +0,0 @@
-#include
-#include
-
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-
-#include
-
-namespace DB
-{
-
-namespace
-{
-
-template <typename T>
-std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
-{
-    return BlockFilterCreator<T>::perform(num_rows, column, cluster.getShardsInfo().size(), cluster.slot_to_shard);
-}
-
-}
-
-DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
-    : storage(storage), query_ast(query_ast)
-{
-}
-
-void DistributedBlockOutputStream::write(const Block & block)
-{
-    if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
-        return writeSplit(block);
-
-    writeImpl(block);
-}
-
-std::vector<IColumn::Filter> DistributedBlockOutputStream::createFilters(Block block)
-{
-    using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
-    /// hashmap of pointers to functions corresponding to each integral type
-    static std::unordered_map<std::string, create_filters_sig *> creators{
-        { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
-        { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
-        { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
-        { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
-        { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
-        { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
-        { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
-        { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
-    };
-
-    storage.getShardingKeyExpr()->execute(block);
-
-    const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
-
-    /// check that key column has valid type
-    const auto it = creators.find(key_column.type->getName());
-
-    return it != std::end(creators)
-        ? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster)
-        : throw Exception{
-            "Sharding key expression does not evaluate to an integer type",
-            ErrorCodes::TYPE_MISMATCH
-        };
-}
-
-void DistributedBlockOutputStream::writeSplit(const Block & block)
-{
-    const auto num_cols = block.columns();
-    /// cache column pointers for later reuse
-    std::vector<ColumnPtr> columns(num_cols);
-    for (size_t i = 0; i < columns.size(); ++i)
-        columns[i] = block.getByPosition(i).column;
-
-    auto filters = createFilters(block);
-
-    const auto num_shards = storage.cluster.getShardsInfo().size();
-
-    ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1;    /// The factor 1.1 was picked arbitrarily.
-
-    for (size_t i = 0; i < num_shards; ++i)
-    {
-        auto target_block = block.cloneEmpty();
-
-        for (size_t col = 0; col < num_cols; ++col)
-            target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint);
-
-        if (target_block.rowsInFirstColumn())
-            writeImpl(target_block, i);
-    }
-}
-
-void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
-{
-    const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
-    if (shard_info.getLocalNodeCount() > 0)
-        writeToLocal(block, shard_info.getLocalNodeCount());
-
-    /// dir_names is empty if shard has only local addresses
-    if (!shard_info.dir_names.empty())
-        writeToShard(block, shard_info.dir_names);
-}
-
-void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
-{
-    InterpreterInsertQuery interp{query_ast, storage.context};
-
-    auto block_io = interp.execute();
-    block_io.out->writePrefix();
-
-    for (size_t i = 0; i < repeats; ++i)
-        block_io.out->write(block);
-
-    block_io.out->writeSuffix();
-}
-
-void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
-{
-    /** tmp directory is used to ensure atomicity of transactions
-      * and keep monitor thread out from reading incomplete data
-      */
-    std::string first_file_tmp_path{};
-
-    auto first = true;
-    const auto & query_string = queryToString(query_ast);
-
-    /// write first file, hardlink the others
-    for (const auto & dir_name : dir_names)
-    {
-        const auto & path = storage.getPath() + dir_name + '/';
-
-        /// ensure shard subdirectory creation and notify storage
-        if (Poco::File(path).createDirectory())
-            storage.requireDirectoryMonitor(dir_name);
-
-        const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
-        const auto & block_file_path = path + file_name;
-
-        /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
-          * the inode is not freed until we're done */
-        if (first)
-        {
-            first = false;
-
-            const auto & tmp_path = path + "tmp/";
-            Poco::File(tmp_path).createDirectory();
-            const auto & block_file_tmp_path = tmp_path + file_name;
-
-            first_file_tmp_path = block_file_tmp_path;
-
-            WriteBufferFromFile out{block_file_tmp_path};
-            CompressedWriteBuffer compress{out};
-            NativeBlockOutputStream stream{compress, Revision::get()};
-
-            writeStringBinary(query_string, out);
-
-            stream.writePrefix();
-            stream.write(block);
-            stream.writeSuffix();
-        }
-
-        if (link(first_file_tmp_path.data(), block_file_path.data()))
-            throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
-    }
-
-    /** remove the temporary file, enabling the OS to reclaim inode after all threads
-      * have removed their corresponding files */
-    Poco::File(first_file_tmp_path).remove();
-}
-
-}
diff --git a/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp b/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
index 8483c8336f8b2a2d0cbc259a51d2074eaa15aa4c..2b42baa7057c16970814aa2a07bf0a703647811c 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp
@@ -4,8 +4,6 @@
 #include
 #include
 #include
-#include
-#include
 
 #include
 
@@ -18,6 +16,47 @@ namespace ErrorCodes
    extern const int TYPE_MISMATCH;
 }
 
+namespace
+{
+
+template <typename T>
+std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, size_t num_shards, const std::vector<size_t> & slots)
+{
+    const auto total_weight = slots.size();
+    std::vector<IColumn::Filter> filters(num_shards);
+
+    /** In C++, dividing a negative number by a positive one yields a negative remainder.
+      * That does not suit this task, so signed types are treated as unsigned here.
+      * The result is no longer anything like division with remainder, but it fits this task.
+      */
+    using UnsignedT = typename std::make_unsigned<T>::type;
+
+    /// const columns contain only one value, therefore we do not need to read it at every iteration
+    if (column->isConst())
+    {
+        const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
+        const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
+
+        for (size_t i = 0; i < num_shards; ++i)
+            filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
+    }
+    else
+    {
+        const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
+
+        for (size_t i = 0; i < num_shards; ++i)
+        {
+            filters[i].resize(num_rows);
+            for (size_t j = 0; j < num_rows; ++j)
+                filters[i][j] = slots[static_cast<UnsignedT>(data[j]) % total_weight] == i;
+        }
+    }
+
+    return filters;
+}
+
+}
+
 ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
    size_t shard_no_, UInt16 min_date_, UInt16 max_date_)
    : block(block_), shard_no(shard_no_), min_date(min_date_), max_date(max_date_)
@@ -25,9 +64,7 @@ ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_,
 }
 
 MergeTreeSharder::MergeTreeSharder(MergeTreeData & data_, const ReshardingJob & job_)
-    : data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)")),
-    sharding_key_expr(ExpressionAnalyzer(job.sharding_key_expr, data.context, nullptr, data.getColumnsList()).getActions(false)),
-    sharding_key_column_name(job.sharding_key_expr->getColumnName())
+    : data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)"))
 {
    for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
    {
@@ -164,19 +201,19 @@ std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
    using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, size_t num_shards, const std::vector<size_t> & slots);
    /// hashmap of pointers to functions corresponding to each integral type
    static std::unordered_map<std::string, create_filters_sig *> creators{
-        { TypeName<UInt8>::get(), &BlockFilterCreator<UInt8>::perform },
-        { TypeName<UInt16>::get(), &BlockFilterCreator<UInt16>::perform },
-        { TypeName<UInt32>::get(), &BlockFilterCreator<UInt32>::perform },
-        { TypeName<UInt64>::get(), &BlockFilterCreator<UInt64>::perform },
-        { TypeName<Int8>::get(), &BlockFilterCreator<Int8>::perform },
-        { TypeName<Int16>::get(), &BlockFilterCreator<Int16>::perform },
-        { TypeName<Int32>::get(), &BlockFilterCreator<Int32>::perform },
-        { TypeName<Int64>::get(), &BlockFilterCreator<Int64>::perform },
+        { TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
+        { TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
+        { TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
+        { TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
+        { TypeName<Int8>::get(), &createFiltersImpl<Int8> },
+        { TypeName<Int16>::get(), &createFiltersImpl<Int16> },
+        { TypeName<Int32>::get(), &createFiltersImpl<Int32> },
+        { TypeName<Int64>::get(), &createFiltersImpl<Int64> },
    };
 
-    sharding_key_expr->execute(block);
+    data.getPrimaryExpression()->execute(block);
 
-    const auto & key_column = block.getByName(sharding_key_column_name);
+    const auto & key_column = block.getByName(job.sharding_key);
 
    /// check that key column has valid type
    const auto it = creators.find(key_column.type->getName());
diff --git a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
index 382261d9cbdacc9ef24c1c2e226ec60cfd625bba..8a3392a6c812351b9e535f7362b3db1ba9983f60 100644
--- a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
+++ b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp
@@ -1,6 +1,5 @@
 #include
 #include
-#include
 #include
 #include
 #include
@@ -26,8 +25,8 @@ std::string getEndpointId(const std::string & node_id)
 
 }
 
-Service::Service(const Context & context_)
-    : context(context_)
+Service::Service(const String & path_)
+    : path(path_)
 {
 }
 
@@ -41,12 +40,12 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
    if (is_cancelled)
        throw Exception("RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED);
 
-    size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath());
+    size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path);
    writeBinary(free_space, out);
    out.next();
 }
 
-size_t Client::getFreeSpace(const InterserverIOEndpointLocation & location) const
+size_t Client::getFreeDiskSpace(const InterserverIOEndpointLocation & location) const
 {
    ReadBufferFromHTTP::Params params =
    {
diff --git a/dbms/src/Storages/MergeTree/ReshardingJob.cpp b/dbms/src/Storages/MergeTree/ReshardingJob.cpp
index fa2a23fb5ae3160e3003279a451f9346bd082eff..9effb2e233e9b21be1566c682627eb61349579cd 100644
--- a/dbms/src/Storages/MergeTree/ReshardingJob.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingJob.cpp
@@ -3,17 +3,10 @@
 #include
 #include
 #include
-#include
-#include
 
 namespace DB
 {
 
-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}
-
 ReshardingJob::ReshardingJob(const std::string & serialized_job)
 {
    ReadBufferFromString buf(serialized_job);
 
    readBinary(database_name, buf);
    readBinary(table_name, buf);
    readBinary(partition, buf);
-
-    std::string expr;
-    readBinary(expr, buf);
-
-    IParser::Pos pos = expr.data();
-    IParser::Pos max_parsed_pos = pos;
-    const char * end = pos + expr.size();
-
-    ParserExpressionWithOptionalAlias parser(false);
-    Expected expected = "";
-    if (!parser.parse(pos, end, sharding_key_expr, max_parsed_pos, expected))
-        throw Exception("ReshardingJob: Internal error", ErrorCodes::LOGICAL_ERROR);
-
+    readBinary(sharding_key, buf);
    while (!buf.eof())
    {
        std::string path;
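After this change the job descriptor is a flat writeBinary sequence: database, table, partition, sharding key, then one (path, weight) pair per destination shard, read back until EOF. A round-trip sketch using the class as declared above (names, paths, and the exact WeightedZooKeeperPaths element type are illustrative assumptions):

#include <iostream>

#include <DB/Storages/MergeTree/ReshardingJob.h>

int main()
{
    /// Hypothetical destination shards with weights 1 and 2.
    DB::WeightedZooKeeperPaths paths{{"/clickhouse/tables/01/hits", 1}, {"/clickhouse/tables/02/hits", 2}};
    DB::ReshardingJob job("default", "hits", "201601", paths, "UserID");

    /// toString() serializes the fields in declaration order;
    /// the string constructor reads the same fields back until EOF.
    DB::ReshardingJob copy(job.toString());

    std::cout << copy.database_name << "." << copy.table_name
        << " partition " << copy.partition
        << " key " << copy.sharding_key
        << " shards " << copy.paths.size() << "\n";
}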
@@ -48,12 +29,12 @@
 ReshardingJob::ReshardingJob(const std::string & database_name_, const std::string & table_name_,
    const std::string & partition_, const WeightedZooKeeperPaths & paths_,
-    const ASTPtr & sharding_key_expr_)
+    const std::string & sharding_key_)
    : database_name(database_name_),
    table_name(table_name_),
    partition(partition_),
    paths(paths_),
-    sharding_key_expr(sharding_key_expr_)
+    sharding_key(sharding_key_)
 {
 }
 
@@ -65,8 +46,7 @@ std::string ReshardingJob::toString() const
    writeBinary(database_name, buf);
    writeBinary(table_name, buf);
    writeBinary(partition, buf);
-    writeBinary(queryToString(sharding_key_expr), buf);
-
+    writeBinary(sharding_key, buf);
    for (const auto & path : paths)
    {
        writeBinary(path.first, buf);
diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index 4d1b1c8b3e5f9e71627bc814489137972d3f9d1a..02ef39f6035ab56c3af5c3348ca0d460d3607852 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -11,7 +11,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -31,8 +30,7 @@ namespace ErrorCodes
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int PARTITION_COPY_FAILED;
    extern const int PARTITION_ATTACH_FAILED;
-    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
-    extern const int INVALID_CONFIG_PARAMETER;
+    extern const int RESHARDING_CLEANUP_FAILED;
 }
 
 namespace
@@ -54,52 +52,17 @@ std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts)
    return ActiveDataPartSet::getPartName(left_date, right_date, parts.front()->left, parts.back()->right, level + 1);
 }
 
-class Arguments final
-{
-public:
-    Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
-    {
-        Poco::Util::AbstractConfiguration::Keys keys;
-        config.keys(config_name, keys);
-        for (const auto & key : keys)
-        {
-            if (key == "task_queue_path")
-            {
-                task_queue_path = config.getString(config_name + "." + key);
-                if (task_queue_path.empty())
-                    throw Exception("Invalid parameter in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER);
-            }
-            else
-                throw Exception("Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
-        }
-    }
-
-    Arguments(const Arguments &) = delete;
-    Arguments & operator=(const Arguments &) = delete;
-
-    std::string getTaskQueuePath() const
-    {
-        return task_queue_path;
-    }
-
-private:
-    std::string task_queue_path;
-};
-
 }
 
-ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
-    const std::string & config_name, Context & context_)
+ReshardingWorker::ReshardingWorker(Context & context_)
    : context(context_), log(&Logger::get("ReshardingWorker"))
 {
-    Arguments arguments(config, config_name);
-
    auto zookeeper = context.getZooKeeper();
 
    host_task_queue_path = "/clickhouse";
    zookeeper->createIfNotExists(host_task_queue_path, "");
 
-    host_task_queue_path += "/" + arguments.getTaskQueuePath();
+    host_task_queue_path += "/" + zookeeper->getTaskQueuePath();
    zookeeper->createIfNotExists(host_task_queue_path, "");
 
    host_task_queue_path += "/resharding";
@@ -112,11 +75,6 @@ ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & con
 ReshardingWorker::~ReshardingWorker()
 {
    must_stop = true;
-    {
-        std::lock_guard<std::mutex> guard(cancel_mutex);
-        if (merger)
-            merger->cancel();
-    }
    if (polling_thread.joinable())
        polling_thread.join();
 }
@@ -130,12 +88,16 @@ void ReshardingWorker::submitJob(const std::string & database_name,
    const std::string & table_name,
    const std::string & partition,
    const WeightedZooKeeperPaths & weighted_zookeeper_paths,
-    const ASTPtr & sharding_key_expr)
+    const std::string & sharding_key)
 {
-    auto serialized_job = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key_expr).toString();
-    auto zookeeper = context.getZooKeeper();
-    (void) zookeeper->create(host_task_queue_path + "/task-", serialized_job,
-        zkutil::CreateMode::PersistentSequential);
+    auto str = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key).toString();
+    submitJobImpl(str);
+}
+
+void ReshardingWorker::submitJob(const ReshardingJob & job)
+{
+    auto str = job.toString();
+    submitJobImpl(str);
 }
 
 bool ReshardingWorker::isStarted() const
@@ -143,18 +105,23 @@ bool ReshardingWorker::isStarted() const
    return is_started;
 }
 
-void ReshardingWorker::pollAndExecute()
+void ReshardingWorker::submitJobImpl(const std::string & serialized_job)
 {
-    bool error = false;
+    auto zookeeper = context.getZooKeeper();
+    (void) zookeeper->create(host_task_queue_path + "/task-", serialized_job,
+        zkutil::CreateMode::PersistentSequential);
+}
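submitJobImpl enqueues work by creating a persistent sequential child under the host task queue; ZooKeeper appends a monotonically growing suffix (task-0000000000, task-0000000001, ...), so lexicographically sorted children give submission order. A sketch of the consuming side, using the zkutil calls that appear elsewhere in this diff (queue path illustrative):

#include <algorithm>
#include <string>

#include <zkutil/ZooKeeper.h>

void drainQueue(zkutil::ZooKeeper & zookeeper, const std::string & queue_path)
{
    /// Sequential suffixes make lexicographic order equal submission order.
    auto job_nodes = zookeeper.getChildren(queue_path);
    std::sort(job_nodes.begin(), job_nodes.end());

    for (const auto & child : job_nodes)
    {
        const auto serialized_job = zookeeper.get(queue_path + "/" + child);
        /// ... deserialize with ReshardingJob(serialized_job) and perform it ...
        zookeeper.remove(queue_path + "/" + child);  /// so the task is not picked up again
    }
}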
+void ReshardingWorker::pollAndExecute()
+{
    try
    {
        bool old_val = false;
        if (!is_started.compare_exchange_strong(old_val, true, std::memory_order_seq_cst,
            std::memory_order_relaxed))
-            throw Exception("Resharding background thread already started", ErrorCodes::LOGICAL_ERROR);
+            throw Exception("Resharding worker thread already started", ErrorCodes::LOGICAL_ERROR);
 
-        LOG_DEBUG(log, "Started resharding background thread.");
+        LOG_DEBUG(log, "Started resharding thread.");
 
        try
        {
@@ -162,10 +129,10 @@ void ReshardingWorker::pollAndExecute()
        }
        catch (const Exception & ex)
        {
-            if (ex.code() == ErrorCodes::ABORTED)
+            if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex))
                throw;
            else
-                LOG_ERROR(log, ex.message());
+                LOG_INFO(log, ex.message());
        }
        catch (...)
        {
@@ -199,10 +166,10 @@ void ReshardingWorker::pollAndExecute()
        }
        catch (const Exception & ex)
        {
-            if (ex.code() == ErrorCodes::ABORTED)
+            if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex))
                throw;
            else
-                LOG_ERROR(log, ex.message());
+                LOG_INFO(log, ex.message());
        }
        catch (...)
        {
@@ -212,21 +179,11 @@ void ReshardingWorker::pollAndExecute()
    }
    catch (const Exception & ex)
    {
-        if (ex.code() != ErrorCodes::ABORTED)
-            error = true;
-    }
-    catch (...)
-    {
-        error = true;
+        if (!hasAborted(ex))
+            throw;
    }
 
-    if (error)
-    {
-        /// If we get here, a bug is lurking somewhere.
-        LOG_ERROR(log, "Resharding background thread terminated with critical error.");
-    }
-    else
-        LOG_DEBUG(log, "Resharding background thread terminated.");
+    LOG_DEBUG(log, "Resharding thread terminated.");
 }
 
 void ReshardingWorker::performPendingJobs()
@@ -247,24 +204,8 @@ void ReshardingWorker::perform(const Strings & job_nodes)
        std::string child_full_path = host_task_queue_path + "/" + child;
        auto job_descriptor = zookeeper->get(child_full_path);
        ReshardingJob job(job_descriptor);
-
-        try
-        {
-            perform(job);
-        }
-        catch (const Exception & ex)
-        {
-            if (ex.code() != ErrorCodes::ABORTED)
-                zookeeper->remove(child_full_path);
-            throw;
-        }
-        catch (...)
-        {
-            zookeeper->remove(child_full_path);
-            throw;
-        }
-
        zookeeper->remove(child_full_path);
+        perform(job);
    }
 }
 
@@ -288,8 +229,12 @@ void ReshardingWorker::perform(const ReshardingJob & job)
    {
        cleanup(storage, job);
 
-        if (ex.code() == ErrorCodes::ABORTED)
-            LOG_DEBUG(log, "Resharding job cancelled.");
+        if (hasAborted(ex))
+        {
+            /// The thread is terminating. Save the details of the interrupted job.
+            submitJob(job);
+            LOG_DEBUG(log, "Resharding job cancelled then re-submitted for later processing.");
+        }
 
        throw;
    }
@@ -331,24 +276,15 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
    /// For each shard, the parts that must be merged.
    std::unordered_map to_merge;
 
-    /// For block numbering.
-    SimpleIncrement increment(storage.data.getMaxDataPartIndex());
-
    MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
 
    auto zookeeper = storage.getZooKeeper();
    const auto & settings = context.getSettingsRef();
+    (void) settings;
 
    DayNum_t month = MergeTreeData::getMonthFromName(job.partition);
 
-    {
-        std::lock_guard<std::mutex> guard(cancel_mutex);
-        merger = std::make_unique<MergeTreeDataMerger>(storage.data);
-    }
-
-    auto parts_from_partition = merger->selectAllPartsFromPartition(month);
-
-    MergeTreeSharder sharder(storage.data, job);
+    auto parts_from_partition = storage.merger.selectAllPartsFromPartition(month);
 
    for (const auto & part : parts_from_partition)
    {
@@ -369,6 +305,8 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
            DBMS_DEFAULT_BUFFER_SIZE,
            true);
 
+        MergeTreeSharder sharder(storage.data, job);
+
        Block block;
        while (block = source.read())
        {
@@ -380,8 +318,57 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
                abortIfRequested();
 
                /// Create a new part corresponding to the new block.
-                Int64 temp_index = increment.get();
-                MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, temp_index);
+                std::string month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(block_with_dates.min_date)) / 100);
+                AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name);
+                Int64 part_number = block_number_lock.getNumber();
+                MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, part_number);
+
+                /// Add information about the new block to ZooKeeper.
+                SipHash hash;
+                block_part->checksums.summaryDataChecksum(hash);
+                union
+                {
+                    char bytes[16];
+                    UInt64 lo;
+                    UInt64 hi;
+                } hash_value;
+                hash.get128(hash_value.bytes);
+
+                std::string checksum(hash_value.bytes, 16);
+
+                std::string block_id = toString(hash_value.lo) + "_" + toString(hash_value.hi);
+
+                zkutil::Ops ops;
+                auto acl = zookeeper->getDefaultACL();
+
+                std::string to_path = job.paths[block_with_dates.shard_no].first;
+
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        to_path + "/detached_sharded_blocks/" + block_id,
+                        "",
+                        acl,
+                        zkutil::CreateMode::Persistent));
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        to_path + "/detached_sharded_blocks/" + block_id + "/checksum",
+                        checksum,
+                        acl,
+                        zkutil::CreateMode::Persistent));
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        to_path + "/detached_sharded_blocks/" + block_id + "/number",
+                        toString(part_number),
+                        acl,
+                        zkutil::CreateMode::Persistent));
+
+                block_number_lock.getUnlockOps(ops);
+
+                auto code = zookeeper->tryMulti(ops);
+                if (code != ZOK)
+                    throw Exception("Unexpected error while adding block " + toString(part_number)
+                        + " with ID " + block_id + ": " + zkutil::ZooKeeper::error2string(code),
+                        ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
 
                abortIfRequested();
 
@@ -403,7 +390,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
                const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
                    job.table_name, merged_name);
 
-                MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
+                MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry,
                    storage.data.context.getSettings().min_bytes_to_use_direct_io);
 
                sharded_parts.insert(new_part);
@@ -437,7 +424,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor
            const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name,
                job.table_name, merged_name);
 
-            MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry,
+            MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry,
                storage.data.context.getSettings().min_bytes_to_use_direct_io);
 
            sharded_parts.insert(new_part);
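The block ID registered under detached_sharded_blocks is just the 128-bit SipHash of the part's checksum data, printed as two 64-bit halves joined by an underscore. A condensed sketch of the same formatting; the input string is illustrative, and the halves are kept in a nested struct so that they occupy distinct bytes rather than aliasing each other:

#include <cstdint>
#include <iostream>
#include <string>

#include <DB/Common/SipHash.h>

int main()
{
    SipHash hash;
    const std::string data = "summary of part checksums";  /// illustrative input
    hash.update(data.data(), data.size());

    /// 16 hash bytes viewed as two distinct 64-bit halves.
    union
    {
        char bytes[16];
        struct { uint64_t lo; uint64_t hi; } words;
    } hash_value;
    hash.get128(hash_value.bytes);

    const std::string block_id = std::to_string(hash_value.words.lo) + "_" + std::to_string(hash_value.words.hi);
    std::cout << block_id << "\n";
}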
@@ -479,6 +466,8 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
 
    auto zookeeper = storage.getZooKeeper();
 
+    MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts;
+
    struct TaskInfo
    {
        TaskInfo(const std::string & replica_path_,
@@ -504,20 +493,16 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto
    /// The number of participating local replicas. Must be <= 1.
    size_t local_count = 0;
 
-    for (const auto & entry : storage.data.per_shard_data_parts)
+    for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no)
    {
-        size_t shard_no = entry.first;
-        const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
-        if (sharded_parts.empty())
-            continue;
+        const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
+        const std::string & zookeeper_path = weighted_path.first;
 
        std::vector<std::string> part_names;
+        const MergeTreeData::MutableDataParts & sharded_parts = per_shard_data_parts.at(shard_no);
        for (const MergeTreeData::DataPartPtr & sharded_part : sharded_parts)
            part_names.push_back(sharded_part->name);
 
-        const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
-        const std::string & zookeeper_path = weighted_path.first;
-
        auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
        for (const auto & child : children)
        {
@@ -625,14 +610,9 @@ void ReshardingWorker::applyChanges(StorageReplicatedMergeTree & storage, const
    using TaskInfoList = std::vector<TaskInfo>;
    TaskInfoList task_info_list;
 
-    for (const auto & entry : storage.data.per_shard_data_parts)
+    for (size_t i = 0; i < job.paths.size(); ++i)
    {
-        size_t shard_no = entry.first;
-        const MergeTreeData::MutableDataParts & sharded_parts = entry.second;
-        if (sharded_parts.empty())
-            continue;
-
-        const WeightedZooKeeperPath & weighted_path = job.paths[shard_no];
+        const WeightedZooKeeperPath & weighted_path = job.paths[i];
        const std::string & zookeeper_path = weighted_path.first;
 
        auto children = zookeeper->getChildren(zookeeper_path + "/replicas");
@@ -688,13 +668,47 @@ void ReshardingWorker::cleanup(StorageReplicatedMergeTree & storage, const Resha
 {
    LOG_DEBUG(log, "Performing cleanup.");
 
-    storage.data.per_shard_data_parts.clear();
+    try
+    {
+        storage.data.per_shard_data_parts.clear();
+
+        Poco::DirectoryIterator end;
+        for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it)
+        {
+            auto absolute_path = it.path().absolute().toString();
+            Poco::File(absolute_path).remove(true);
+        }
+
+        auto zookeeper = storage.getZooKeeper();
+        zkutil::Ops ops;
+        for (size_t i = 0; i < job.paths.size(); ++i)
+        {
+            const WeightedZooKeeperPath & weighted_path = job.paths[i];
+            const std::string & zookeeper_path = weighted_path.first;
 
-    Poco::DirectoryIterator end;
-    for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it)
+            auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks");
+            if (!children.empty())
+            {
+                for (const auto & child : children)
+                {
+                    ops.push_back(
+                        new zkutil::Op::Remove(
+                            zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1));
+                    ops.push_back(
+                        new zkutil::Op::Remove(
+                            zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1));
+                    ops.push_back(
+                        new zkutil::Op::Remove(
+                            zookeeper_path + "/detached_sharded_blocks/" + child, -1));
+                }
+            }
+        }
+        zookeeper->multi(ops);
+    }
+    catch (...)
    {
-        auto absolute_path = it.path().absolute().toString();
-        Poco::File(absolute_path).remove(true);
+        throw Exception("Failed to perform cleanup during resharding operation",
+            ErrorCodes::RESHARDING_CLEANUP_FAILED);
    }
 }
 
@@ -704,4 +718,9 @@ void ReshardingWorker::abortIfRequested() const
        throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
 }
 
+bool ReshardingWorker::hasAborted(const Exception & ex) const
+{
+    return must_stop && (ex.code() == ErrorCodes::ABORTED);
+}
+
 }
diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
index 9ce5a36366c2fb4030e0a267ee2224b5adaa299a..5dbcf664633bed234b4425bba065c97d889ef37f 100644
--- a/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
+++ b/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp
@@ -51,7 +51,7 @@ std::string getEndpointId(const std::string & node_id)
 }
 
 Service::Service(StorageReplicatedMergeTree & storage_)
-    : storage(storage_), log(&Logger::get("ShardedPartitionSender::Service"))
+    : storage(storage_)
 {
 }
 
@@ -79,18 +79,8 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
        MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no);
        part->is_temp = false;
-
-        const std::string old_part_path = storage.full_path + part->name;
-        const std::string new_part_path = storage.full_path + "detached/" + part_name;
-
-        Poco::File new_part_dir(new_part_path);
-        if (new_part_dir.exists())
-        {
-            LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing.");
-            new_part_dir.remove(true);
-        }
-
-        Poco::File(old_part_path).renameTo(new_part_path);
+        const std::string new_name = "detached/" + part_name;
+        Poco::File(storage.full_path + part->name).renameTo(storage.full_path + new_name);
    }
 
    bool flag = true;
@@ -98,11 +88,6 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out
    out.next();
 }
 
-Client::Client()
-    : log(&Logger::get("ShardedPartitionSender::Client"))
-{
-}
-
 bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location,
    const std::vector<std::string> & parts, size_t shard_no)
 {
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 622a8b6eadbb6fe91b49f7dbfcba9b86644e69ad..96cba49df43939cd83108dc2b5c7698b052ed0fb 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -228,7 +228,7 @@ void StorageDistributed::shutdown()
 
 void StorageDistributed::reshardPartitions(const String & database_name, const Field & first_partition,
    const Field & last_partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths,
-    const ASTPtr & sharding_key_expr, const Settings & settings)
+    const String & sharding_key, const Settings & settings)
 {
    /// Create the query ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu.
 
@@ -258,7 +258,7 @@ void StorageDistributed::reshardPartitions(const String & database_name, const F
    }
    parameters.weighted_zookeeper_paths = expr_list;
-    parameters.sharding_key_expr = sharding_key_expr;
+    parameters.sharding_key = sharding_key;
 
    /** The shard_multiplexing functionality is not finished, so we disable it.
      * (Because establishing connections to different shards within a single thread is not done in parallel.)
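One detail worth noting about the cleanup above: ZooKeeper cannot delete a znode that still has children, and a multi-op has no recursive remove, so each block's number and checksum children are queued before the block node itself. The same leaf-first ordering in isolation (path illustrative):

#include <string>

#include <zkutil/ZooKeeper.h>

void removeBlock(zkutil::ZooKeeper & zookeeper, const std::string & block_path)
{
    zkutil::Ops ops;
    ops.push_back(new zkutil::Op::Remove(block_path + "/number", -1));    /// -1 matches any version
    ops.push_back(new zkutil::Op::Remove(block_path + "/checksum", -1));
    ops.push_back(new zkutil::Op::Remove(block_path, -1));                /// parent last, now empty
    zookeeper.multi(ops);                                                 /// applied atomically, or not at all
}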
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 6eb942841b783173762aaca51474379b69a62215..83e95eb7d0db592bd95be80827d59aaab0017d50 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -75,6 +75,7 @@ namespace ErrorCodes
     extern const int INVALID_PARTITIONS_INTERVAL;
     extern const int RESHARDING_INVALID_PARAMETERS;
     extern const int INVALID_SHARD_WEIGHT;
+    extern const int SHARD_DOESNT_REFERENCE_TABLE;
 }

@@ -334,7 +335,7 @@ StoragePtr StorageReplicatedMergeTree::create(
     /// Services for resharding.
     {
-        InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->context);
+        InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->full_path);
         res->disk_space_monitor_endpoint_holder = get_endpoint_holder(endpoint);
     }

@@ -402,6 +403,8 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
         acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
         acl, zkutil::CreateMode::Persistent));
+    ops.push_back(new zkutil::Op::Create(zookeeper_path + "/detached_sharded_blocks", "",
+        acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/block_numbers", "",
         acl, zkutil::CreateMode::Persistent));
     ops.push_back(new zkutil::Op::Create(zookeeper_path + "/nonincrement_block_numbers", "",
@@ -2314,7 +2317,7 @@ void StorageReplicatedMergeTree::shutdown()
     fetcher.cancel();

     disk_space_monitor_endpoint_holder = nullptr;
-    disk_space_monitor_client.cancel();
+    free_disk_space_checker.cancel();

     sharded_partition_sender_endpoint_holder = nullptr;
     sharded_partition_sender_client.cancel();
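The detached_sharded_blocks node created above mirrors the layout of blocks: one child per block, each holding checksum and number leaves. The sketch below stages one block under that layout, inferred from the Create/Remove ops elsewhere in this patch; stageShardedBlock is an illustrative helper, not part of the patch:

    #include <zkutil/ZooKeeper.h>

    /// Assumed layout:  <zookeeper_path>/detached_sharded_blocks/<block_id>/{checksum,number}
    void stageShardedBlock(zkutil::ZooKeeper & zookeeper, const std::string & zookeeper_path,
        const std::string & block_id, const std::string & checksum, const std::string & number)
    {
        const std::string block_path = zookeeper_path + "/detached_sharded_blocks/" + block_id;
        auto acl = zookeeper.getDefaultACL();

        zkutil::Ops ops;
        /// The parent node is queued before its leaves, and the batch commits atomically.
        ops.push_back(new zkutil::Op::Create(block_path, "", acl, zkutil::CreateMode::Persistent));
        ops.push_back(new zkutil::Op::Create(block_path + "/checksum", checksum, acl, zkutil::CreateMode::Persistent));
        ops.push_back(new zkutil::Op::Create(block_path + "/number", number, acl, zkutil::CreateMode::Persistent));
        zookeeper.multi(ops);
    }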
@@ -2888,7 +2891,56 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
             zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
     }

-    LOG_DEBUG(log, "Adding attaches to log");
+    std::string log_msg = "Adding attaches to log";
+
+    if (is_leader_node)
+    {
+        /// If ATTACH PART is performed as part of a resharding operation, update the information about blocks on the shard.
+        auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks");
+        if (!children.empty())
+        {
+            log_msg += ". Updating information about blocks in the context of the resharding operation.";
+
+            auto acl = zookeeper->getDefaultACL();
+
+            for (const auto & child : children)
+            {
+                std::string checksum = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum");
+                std::string number = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/number");
+
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        zookeeper_path + "/blocks/" + child,
+                        "",
+                        acl,
+                        zkutil::CreateMode::Persistent));
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        zookeeper_path + "/blocks/" + child + "/checksum",
+                        checksum,
+                        acl,
+                        zkutil::CreateMode::Persistent));
+                ops.push_back(
+                    new zkutil::Op::Create(
+                        zookeeper_path + "/blocks/" + child + "/number",
+                        number,
+                        acl,
+                        zkutil::CreateMode::Persistent));
+
+                ops.push_back(
+                    new zkutil::Op::Remove(
+                        zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1));
+                ops.push_back(
+                    new zkutil::Op::Remove(
+                        zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1));
+                ops.push_back(
+                    new zkutil::Op::Remove(
+                        zookeeper_path + "/detached_sharded_blocks/" + child, -1));
+            }
+        }
+    }
+
+    LOG_DEBUG(log, log_msg);

     zookeeper->multi(ops);

@@ -3429,12 +3481,12 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
 }

 void StorageReplicatedMergeTree::reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-    const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
+    const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
     const Settings & settings)
 {
     auto & resharding_worker = context.getReshardingWorker();
     if (!resharding_worker.isStarted())
-        throw Exception("Resharding background thread is not running.", ErrorCodes::RESHARDING_NO_WORKER);
+        throw Exception("Resharding worker is not running.", ErrorCodes::RESHARDING_NO_WORKER);

     for (const auto & weighted_path : weighted_zookeeper_paths)
     {
@@ -3443,6 +3495,14 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
             throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
     }

+    for (const auto & weighted_path : weighted_zookeeper_paths)
+    {
+        const std::string & path = weighted_path.first;
+        if ((path.length() <= getTableName().length()) ||
+            (path.substr(path.length() - getTableName().length()) != getTableName()))
+            throw Exception("Shard does not reference table", ErrorCodes::SHARD_DOESNT_REFERENCE_TABLE);
+    }
+
     DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
     DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
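The shard-path validation above is a plain suffix test: each weighted ZooKeeper path must end with the table's name, e.g. /clickhouse/tables/01/hits is accepted for a table named hits. An equivalent, more explicit formulation (endsWith is a hypothetical helper, not part of this patch):

    #include <string>

    /// True if `path` is strictly longer than `suffix` and ends with it.
    static bool endsWith(const std::string & path, const std::string & suffix)
    {
        return path.length() > suffix.length()
            && path.compare(path.length() - suffix.length(), suffix.length(), suffix) == 0;
    }

    /// Same check as in reshardPartitions:
    /// if (!endsWith(path, getTableName()))
    ///     throw Exception("Shard does not reference table", ErrorCodes::SHARD_DOESNT_REFERENCE_TABLE);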
@@ -3488,7 +3548,7 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,

     /// Register the resharding background jobs.
     for (const auto & partition : partition_list)
-        resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key_expr);
+        resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key);
 }

 void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
@@ -3581,8 +3641,8 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths

             InterserverIOEndpointLocation location(replica_path, address.host, address.replication_port);

-            tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeSpace,
-                &disk_space_monitor_client, location));
+            tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeDiskSpace,
+                &free_disk_space_checker, location));
             pool.schedule([i, &tasks]{ tasks[i](); });
         }
     }
diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h
index e16946c052059211c6e586bf9991a1c31530f45f..36aa867969d310336a3de2c7e6908f894cf5b54a 100644
--- a/libs/libzkutil/include/zkutil/ZooKeeper.h
+++ b/libs/libzkutil/include/zkutil/ZooKeeper.h
@@ -173,6 +173,8 @@ public:
       */
     void waitForDisappear(const std::string & path);

+    std::string getTaskQueuePath() const;
+
     /** Asynchronous interface (only a small subset of operations is implemented).
       *
       * Usage:
@@ -298,7 +300,7 @@ private:
     friend struct WatchWithEvent;
     friend class EphemeralNodeHolder;

-    void init(const std::string & hosts, int32_t session_timeout_ms);
+    void init(const std::string & hosts, int32_t session_timeout_ms, const std::string & task_queue_path_ = "");
     void removeChildrenRecursive(const std::string & path);
     void tryRemoveChildrenRecursive(const std::string & path);
     void * watchForEvent(EventPtr event);
@@ -341,6 +343,7 @@ private:
     int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr);

     std::string hosts;
+    std::string task_queue_path;
     int32_t session_timeout_ms;

     std::mutex mutex;
diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp
index 1da49abbdf1782f18a9d7c74c4740df6318f0acb..2d4bcb8da6bd1fa8671c3406a099c0e5738cf169 100644
--- a/libs/libzkutil/src/ZooKeeper.cpp
+++ b/libs/libzkutil/src/ZooKeeper.cpp
@@ -61,12 +61,13 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
     }
 }

-void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
+void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_, const std::string & task_queue_path_)
 {
     log = &Logger::get("ZooKeeper");
     zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
     hosts = hosts_;
     session_timeout_ms = session_timeout_ms_;
+    task_queue_path = task_queue_path_;

     impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
     ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
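The gatherReplicaSpaceInfo hunk above uses the usual idiom of packaging a member-function call with std::bind and scheduling it on a pool. A self-contained sketch of the same pattern using only the standard library; all names here are illustrative stand-ins, not taken from this patch:

    #include <cstddef>
    #include <functional>
    #include <future>
    #include <string>
    #include <thread>
    #include <vector>

    struct FreeSpaceChecker
    {
        /// Stand-in for RemoteDiskSpaceMonitor::Client::getFreeDiskSpace.
        size_t getFreeDiskSpace(const std::string & location) const { return 42; }
    };

    int main()
    {
        FreeSpaceChecker checker;
        std::vector<std::string> locations{"replica1", "replica2"};

        /// One packaged task per replica; futures collect the results.
        std::vector<std::packaged_task<size_t()>> tasks;
        std::vector<std::future<size_t>> futures;
        for (const auto & location : locations)
        {
            tasks.emplace_back(std::bind(&FreeSpaceChecker::getFreeDiskSpace, &checker, location));
            futures.push_back(tasks.back().get_future());
        }

        /// The patch schedules on a thread pool; one thread per task behaves the same way.
        std::vector<std::thread> threads;
        for (auto & task : tasks)
            threads.emplace_back(std::move(task));

        size_t total = 0;
        for (auto & future : futures)
            total += future.get();   /// blocks until the corresponding task has run
        for (auto & thread : threads)
            thread.join();
        return 0;
    }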
@@ -104,6 +105,10 @@ struct ZooKeeperArgs
         {
             session_timeout_ms = config.getInt(config_name + "." + key);
         }
+        else if (key == "task_queue_path")
+        {
+            task_queue_path = config.getString(config_name + "." + key);
+        }
         else throw KeeperException(std::string("Unknown key ") + key + " in config file");
     }
@@ -120,12 +125,13 @@ struct ZooKeeperArgs

     std::string hosts;
     size_t session_timeout_ms;
+    std::string task_queue_path;
 };

 ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
 {
     ZooKeeperArgs args(config, config_name);
-    init(args.hosts, args.session_timeout_ms);
+    init(args.hosts, args.session_timeout_ms, args.task_queue_path);
 }

 void * ZooKeeper::watchForEvent(EventPtr event)
@@ -578,6 +584,11 @@ void ZooKeeper::waitForDisappear(const std::string & path)
     }
 }

+std::string ZooKeeper::getTaskQueuePath() const
+{
+    return task_queue_path;
+}
+
 ZooKeeper::~ZooKeeper()
 {
     LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session");
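With the new task_queue_path key, the zookeeper section of the server config can carry the task-queue location alongside the host list and session timeout, and callers read it back from the live session. A usage sketch; the section name "zookeeper" and the example path are assumptions, not mandated by this patch:

    /// Example config fragment (keys as parsed by ZooKeeperArgs above):
    ///
    ///   <zookeeper>
    ///       <session_timeout_ms>30000</session_timeout_ms>
    ///       <task_queue_path>/clickhouse/task_queue</task_queue_path>
    ///       ...
    ///   </zookeeper>

    zkutil::ZooKeeper zookeeper(config, "zookeeper");

    std::string task_queue_path = zookeeper.getTaskQueuePath();
    if (task_queue_path.empty())
    {
        /// The key is optional: init() defaults it to "".
        /// Fall back to an application-level default here.
    }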