From 90940d84231c746c00beba675348b4504589a28e Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Fri, 11 Aug 2017 18:02:07 +0300
Subject: [PATCH] Cluster and DistributedBlockOutputStream refactoring
 [#CLICKHOUSE-3033]

---
 dbms/src/Interpreters/Cluster.cpp             |  42 ++-
 dbms/src/Interpreters/Cluster.h               |  10 +-
 dbms/src/Interpreters/DDLWorker.cpp           |   8 +-
 .../DistributedBlockOutputStream.cpp          | 255 +++++++++++-------
 .../DistributedBlockOutputStream.h            |  35 ++-
 .../Storages/MergeTree/ReshardingWorker.cpp   |  12 +-
 .../Storages/System/StorageSystemClusters.cpp |  21 +-
 7 files changed, 205 insertions(+), 178 deletions(-)

diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp
index 87d9244495..2ed378b12d 100644
--- a/dbms/src/Interpreters/Cluster.cpp
+++ b/dbms/src/Interpreters/Cluster.cpp
@@ -77,6 +77,7 @@ Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const Stri
     user = config.getString(config_prefix + ".user", "default");
     password = config.getString(config_prefix + ".password", "");
     default_database = config.getString(config_prefix + ".default_database", "");
+    is_local = isLocal(*this);
 }
 
 
@@ -98,6 +99,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
         host_name = host_port_;
         port = default_port;
     }
+    is_local = isLocal(*this);
 }
 
 
@@ -193,6 +195,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
         {
             /// Shard without replicas.
 
+            Addresses addresses;
+
             const auto & prefix = config_prefix + key;
             const auto weight = config.getInt(prefix + ".weight", default_weight);
@@ -204,11 +208,10 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             info.shard_num = current_shard_num;
             info.weight = weight;
 
-            if (isLocal(address))
+            if (address.is_local)
                 info.local_addresses.push_back(address);
             else
             {
-                info.dir_names.push_back(address.toStringFull());
                 ConnectionPoolPtrs pools;
                 pools.push_back(std::make_shared<ConnectionPool>(
                     settings.distributed_connections_pool_size,
@@ -227,6 +230,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
 
             shards_info.push_back(info);
+            addresses_with_failover.push_back(addresses);
         }
         else if (startsWith(key, "shard"))
         {
@@ -244,10 +248,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
 
             bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
 
-            /** in case of internal_replication we will be appending names to
-              * the first element of vector; otherwise we will just .emplace_back
-              */
-            std::vector<std::string> dir_names{};
+            /// in case of internal_replication we will be appending names to dir_name_for_internal_replication
+            std::string dir_name_for_internal_replication;
 
             auto first = true;
             for (const auto & replica_key : replica_keys)
@@ -261,18 +263,16 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
                     replica_addresses.back().replica_num = current_replica_num;
                     ++current_replica_num;
 
-                    if (!isLocal(replica_addresses.back()))
+                    if (!replica_addresses.back().is_local)
                     {
                         if (internal_replication)
                         {
                             auto dir_name = replica_addresses.back().toStringFull();
                             if (first)
-                                dir_names.emplace_back(std::move(dir_name));
+                                dir_name_for_internal_replication = dir_name;
                             else
-                                dir_names.front() += "," + dir_name;
+                                dir_name_for_internal_replication += "," + dir_name;
                         }
-                        else
-                            dir_names.emplace_back(replica_addresses.back().toStringFull());
 
                         if (first) first = false;
                     }
@@ -288,7 +288,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
 
             for (const auto & replica : replica_addresses)
             {
-                if (isLocal(replica))
+                if (replica.is_local)
                     shard_local_addresses.push_back(replica);
                 else
                 {
@@ -311,17 +311,18 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             if (weight)
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
 
-            shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool, internal_replication});
+            shards_info.push_back({std::move(dir_name_for_internal_replication), current_shard_num, weight,
+                shard_local_addresses, shard_pool, internal_replication});
         }
         else
             throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
 
-        if (!addresses_with_failover.empty() && !addresses.empty())
-            throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
-
         ++current_shard_num;
     }
 
+    if (addresses_with_failover.empty())
+        throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
+
     initMisc();
 }
 
@@ -409,12 +410,7 @@ void Cluster::calculateHashOfAddresses()
 {
     std::vector<std::string> elements;
 
-    if (!addresses.empty())
-    {
-        for (const auto & address : addresses)
-            elements.push_back(address.host_name + ":" + toString(address.port));
-    }
-    else if (!addresses_with_failover.empty())
+    if (!addresses_with_failover.empty())
     {
         for (const auto & addresses : addresses_with_failover)
         {
@@ -453,8 +449,6 @@ std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
 
 Cluster::Cluster(const Cluster & from, size_t index)
     : shards_info{from.shards_info[index]}
 {
-    if (!from.addresses.empty())
-        addresses.emplace_back(from.addresses[index]);
     if (!from.addresses_with_failover.empty())
         addresses_with_failover.emplace_back(from.addresses_with_failover[index]);
 
diff --git a/dbms/src/Interpreters/Cluster.h b/dbms/src/Interpreters/Cluster.h
index 9a7b1470d6..cb131a0039 100644
--- a/dbms/src/Interpreters/Cluster.h
+++ b/dbms/src/Interpreters/Cluster.h
@@ -55,6 +55,7 @@ public:
     String password;
     String default_database;    /// this database is selected when no database is specified for Distributed table
     UInt32 replica_num;
+    bool is_local;
 
     Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
     Address(const String & host_port_, const String & user_, const String & password_);
@@ -80,8 +81,8 @@ public:
         bool hasInternalReplication() const { return has_internal_replication; }
 
     public:
-        /// Contains names of directories for asynchronous write to StorageDistributed
-        std::vector<std::string> dir_names;
+        /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
+        std::string dir_name_for_internal_replication;
         /// Number of the shard, the indexation begins with 1
        UInt32 shard_num;
        UInt32 weight;
@@ -94,8 +95,7 @@ public:
 
     String getHashOfAddresses() const { return hash_of_addresses; }
     const ShardsInfo & getShardsInfo() const { return shards_info; }
-    const Addresses & getShardsAddresses() const { return addresses; }
-    const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
+    const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
 
     const ShardInfo & getAnyShardInfo() const
     {
@@ -144,8 +144,6 @@ private:
     /// Non-empty is either addresses or addresses_with_failover.
     /// The size and order of the elements in the corresponding array corresponds to shards_info.
-    /// An array of shards. Each shard is the address of one server.
-    Addresses addresses;
     /// An array of shards. For each shard, an array of replica addresses (servers that are considered identical).
     AddressesWithFailover addresses_with_failover;
 
diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp
index 1dc88d8192..e0be7563b9 100644
--- a/dbms/src/Interpreters/DDLWorker.cpp
+++ b/dbms/src/Interpreters/DDLWorker.cpp
@@ -273,7 +273,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
     String cluster_name = query->cluster;
     auto cluster = context.getCluster(cluster_name);
 
-    auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
+    auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsAddresses(), host_name, port);
     if (!shard_host_num)
     {
         throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
@@ -283,7 +283,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
     size_t shard_num = shard_host_num->first;
     size_t host_num = shard_host_num->second;
 
-    const auto & host_address = cluster->getShardsWithFailoverAddresses().at(shard_num).at(host_num);
+    const auto & host_address = cluster->getShardsAddresses().at(shard_num).at(host_num);
     ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
     String rewritten_query = queryToString(rewritten_ast);
 
@@ -369,7 +369,7 @@ void DDLWorker::processTaskAlter(
         throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
 
     Strings replica_names;
-    for (const auto & address : cluster->getShardsWithFailoverAddresses().at(shard_num))
+    for (const auto & address : cluster->getShardsAddresses().at(shard_num))
         replica_names.emplace_back(address.toString());
     std::sort(replica_names.begin(), replica_names.end());
 
@@ -700,7 +700,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
     entry.query = queryToString(query_ptr);
     entry.initiator = ddl_worker.getHostName();
 
-    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
+    Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
     for (const auto & shard : shards)
     {
         for (const auto & addr : shard)
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
index 994ed9e4c7..239f8ba94d 100644
--- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
+++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -45,22 +45,26 @@ namespace ProfileEvents
 namespace DB
 {
 
+
 namespace ErrorCodes
 {
     extern const int TIMEOUT_EXCEEDED;
 }
 
+
 DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
     const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
     : storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
 {
 }
 
+
 void DistributedBlockOutputStream::writePrefix()
 {
     deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout);
 }
 
+
 void DistributedBlockOutputStream::write(const Block & block)
 {
     if (insert_sync)
@@ -69,21 +73,22 @@ void DistributedBlockOutputStream::write(const Block & block)
         writeAsync(block);
 }
 
+
 void DistributedBlockOutputStream::writeAsync(const Block & block)
 {
     if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
-        return writeSplit(block);
+        return writeSplitAsync(block);
 
-    writeImpl(block);
+    writeAsyncImpl(block);
     ++blocks_inserted;
 }
 
+
 ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
-    std::vector<bool> & done_jobs, std::atomic<size_t> & finished_jobs_count, std::condition_variable & cond_var,
-    const Block & block, size_t job_id, const Cluster::ShardInfo & shard_info, size_t replica_id)
+    WritingJobContext & context, const Block & block, const Cluster::Address & address, size_t shard_id, size_t job_id)
 {
     auto memory_tracker = current_memory_tracker;
-    return [this, memory_tracker, & done_jobs, & finished_jobs_count, & cond_var, & block, job_id, & shard_info, replica_id]()
+    return [this, memory_tracker, & context, & block, & address, shard_id, job_id]()
     {
         if (!current_memory_tracker)
         {
@@ -92,34 +97,34 @@ ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
         }
         try
         {
-            this->writeToShardSync(block, shard_info, replica_id);
-            ++finished_jobs_count;
-            done_jobs[job_id] = true;
-            cond_var.notify_one();
+            const auto & shard_info = cluster->getShardsInfo()[shard_id];
+            if (address.is_local)
+            {
+                writeToLocal(block, shard_info.getLocalNodeCount());
+                context.done_local_jobs[job_id] = true;
+            }
+            else
+            {
+                writeToShardSync(block, shard_info.hasInternalReplication()
+                                        ? shard_info.dir_name_for_internal_replication
+                                        : address.toStringFull());
+                context.done_remote_jobs[job_id] = true;
+            }
+
+            ++context.finished_jobs_count;
+            context.cond_var.notify_one();
         }
         catch (...)
         {
-            ++finished_jobs_count;
-            cond_var.notify_one();
+            ++context.finished_jobs_count;
+            context.cond_var.notify_one();
             throw;
         }
     };
 }
 
 
-void DistributedBlockOutputStream::writeToLocal(const Blocks & blocks, size_t & finished_writings_count)
-{
-    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
-    for (size_t shard_id : ext::range(0, shards_info.size()))
-    {
-        const auto & shard_info = shards_info[shard_id];
-        if (shard_info.getLocalNodeCount() > 0)
-            writeToLocal(blocks[shard_id], shard_info.getLocalNodeCount(), finished_writings_count);
-    }
-}
-
-std::string DistributedBlockOutputStream::getCurrentStateDescription(
-    const std::vector<bool> & done_jobs, size_t finished_local_nodes_count)
+std::string DistributedBlockOutputStream::getCurrentStateDescription(const WritingJobContext & context)
 {
     const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
     String description;
@@ -127,111 +132,154 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(
 
     buffer << "Insertion status:\n";
 
-    auto writeDescription = [&buffer](const std::string & address, size_t shard_id, size_t blocks_wrote)
+    auto writeDescription = [&buffer](const Cluster::Address & address, size_t shard_id, size_t blocks_wrote)
     {
         buffer << "Wrote " << blocks_wrote << " blocks on shard " << shard_id << " replica ";
-        buffer << unescapeForFileName(address) << '\n';
+        buffer << address.toString() << '\n';
     };
 
-    size_t job_id = 0;
+    const auto addresses_with_failovers = cluster->getShardsAddresses();
+
+    size_t remote_job_id = 0;
+    size_t local_job_id = 0;
     for (size_t shard_id : ext::range(0, shards_info.size()))
     {
         const auto & shard_info = shards_info[shard_id];
-        const auto & local_addresses = shard_info.local_addresses;
-
-        for (const auto & address : local_addresses)
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
         {
-            writeDescription(address.toStringFull(), shard_id, blocks_inserted + (finished_local_nodes_count ? 1 : 0));
-            if (finished_local_nodes_count)
-                --finished_local_nodes_count;
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id++] ? 1 : 0));
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
         }
 
-        for (const auto & dir_name : shard_info.dir_names)
-            writeDescription(dir_name, shard_id, blocks_inserted + (done_jobs[job_id++] ? 1 : 0));
+        if (shard_info.isLocal())
+        {
+            const auto & address = shard_info.local_addresses.front();
+            writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id++] ? 1 : 0));
+        }
     }
 
     return description;
 }
 
 
-void DistributedBlockOutputStream::calculateRemoteJobsCount()
-{
-    remote_jobs_count = 0;
-    const auto & shards_info = cluster->getShardsInfo();
-    for (const auto & shard_info : shards_info)
-        remote_jobs_count += shard_info.dir_names.size();
-}
 
-void DistributedBlockOutputStream::writeSync(const Block & block)
+void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context, const Blocks & blocks)
 {
-    if (!pool)
+    const auto & addresses_with_failovers = cluster->getShardsAddresses();
+    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
+
+    size_t remote_job_id = 0;
+    size_t local_job_id = 0;
+    for (size_t shard_id : ext::range(0, blocks.size()))
     {
-        /// Deferred initialization. Only for sync insertion.
-        calculateRemoteJobsCount();
-        pool.emplace(remote_jobs_count);
+        const auto & shard_info = shards_info[shard_id];
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
+        {
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id++));
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
+        }
+
+        if (shards_info[shard_id].isLocal())
+        {
+            const auto & address = shards_info[shard_id].local_addresses.front();
+            pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id++));
+        }
     }
+}
 
-    std::vector<bool> done_jobs(remote_jobs_count, false);
-    std::atomic<size_t> finished_jobs_count(0);
-    std::mutex mutex;
-    std::condition_variable cond_var;
-    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
-    Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
 
+void DistributedBlockOutputStream::calculateJobsCount()
+{
+    remote_jobs_count = 0;
+    local_jobs_count = 0;
-    size_t job_id = 0;
-    for (size_t shard_id : ext::range(0, blocks.size()))
-        for (size_t replica_id: ext::range(0, shards_info[shard_id].dir_names.size()))
-            pool->schedule(createWritingJob(done_jobs, finished_jobs_count, cond_var,
-                blocks[shard_id], job_id++, shards_info[shard_id], replica_id));
-
-    const size_t jobs_count = job_id;
-    size_t finished_local_nodes_count;
-    const auto time_point = deadline;
-    auto timeout = insert_timeout;
-    auto & pool = this->pool;
-    auto wait = [& mutex, & cond_var, time_point, & finished_jobs_count, jobs_count, & pool, timeout]()
+    const auto & addresses_with_failovers = cluster->getShardsAddresses();
+
+    const auto & shards_info = cluster->getShardsInfo();
+    for (size_t shard_id : ext::range(0, shards_info.size()))
     {
-        std::unique_lock<std::mutex> lock(mutex);
-        auto cond = [& finished_jobs_count, jobs_count] { return finished_jobs_count == jobs_count; };
-        if (timeout)
+        const auto & shard_info = shards_info[shard_id];
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
         {
-            if (!cond_var.wait_until(lock, time_point, cond))
-            {
-                pool->wait();
-                ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
-                throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
-            }
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    ++remote_jobs_count;
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
         }
-        else
-            cond_var.wait(lock, cond);
-        pool->wait();
-    };
+        local_jobs_count += shard_info.isLocal() ? 1 : 0;
+    }
+}
 
-    std::exception_ptr exception;
-    try
+
+void DistributedBlockOutputStream::waitForUnfinishedJobs(WritingJobContext & context)
+{
+    std::unique_lock<std::mutex> lock(context.mutex);
+    size_t jobs_count = remote_jobs_count + local_jobs_count;
+    auto cond = [& context, jobs_count] { return context.finished_jobs_count == jobs_count; };
+
+    if (insert_timeout)
     {
-        writeToLocal(blocks, finished_local_nodes_count);
+        if (!context.cond_var.wait_until(lock, deadline, cond))
+        {
+            pool->wait();
+            ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
+            throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
+        }
     }
-    catch (...)
+    else
+        context.cond_var.wait(lock, cond);
+    pool->wait();
+}
+
+
+void DistributedBlockOutputStream::writeSync(const Block & block)
+{
+    if (!pool)
     {
-        exception = std::current_exception();
+        /// Deferred initialization. Only for sync insertion.
+        calculateJobsCount();
+        pool.emplace(remote_jobs_count + local_jobs_count);
     }
 
+    WritingJobContext context;
+    context.done_remote_jobs.assign(remote_jobs_count, false);
+    context.done_local_jobs.assign(local_jobs_count, false);
+    context.finished_jobs_count = 0;
+
+    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
+    Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
+    createWritingJobs(context, blocks);
+
     try
     {
-        wait();
-        if (exception)
-            std::rethrow_exception(exception);
+        waitForUnfinishedJobs(context);
     }
     catch(Exception & exception)
     {
-        exception.addMessage(getCurrentStateDescription(done_jobs, finished_local_nodes_count));
+        exception.addMessage(getCurrentStateDescription(context));
         throw;
     }
 
     ++blocks_inserted;
 }
 
+
 IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
 {
     storage.getShardingKeyExpr()->execute(block);
@@ -288,33 +336,39 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
 }
 
 
-void DistributedBlockOutputStream::writeSplit(const Block & block)
+void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
 {
     Blocks splitted_blocks = splitBlock(block);
     const size_t num_shards = splitted_blocks.size();
 
     for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
         if (splitted_blocks[shard_idx].rows())
-            writeImpl(splitted_blocks[shard_idx], shard_idx);
+            writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);
 
     ++blocks_inserted;
 }
 
 
-void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
+void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
 {
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
-    size_t finished_writings_count = 0;
     if (shard_info.getLocalNodeCount() > 0)
-        writeToLocal(block, shard_info.getLocalNodeCount(), finished_writings_count);
+        writeToLocal(block, shard_info.getLocalNodeCount());
 
-    /// dir_names is empty if shard has only local addresses
-    if (!shard_info.dir_names.empty())
-        writeToShard(block, shard_info.dir_names);
+    if (shard_info.hasInternalReplication())
+        writeToShard(block, {shard_info.dir_name_for_internal_replication});
+    else
+    {
+        std::vector<std::string> dir_names;
+        for (const auto & address : cluster->getShardsAddresses()[shard_id])
+            if (!address.is_local)
+                dir_names.push_back(address.toStringFull());
+        writeToShard(block, dir_names);
+    }
 }
 
 
-void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count)
+void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
 {
     InterpreterInsertQuery interp{query_ast, storage.context};
 
@@ -322,20 +376,15 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
     block_io.out->writePrefix();
 
     for (size_t i = 0; i < repeats; ++i)
-    {
         block_io.out->write(block);
-        ++finished_writings_count;
-    }
 
     block_io.out->writeSuffix();
 }
 
 
-void DistributedBlockOutputStream::writeToShardSync(
-    const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id)
+void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::string & connection_pool_name)
 {
-    const auto & dir_name = shard_info.dir_names[replica_id];
-    auto pool = storage.requireConnectionPool(dir_name);
+    auto pool = storage.requireConnectionPool(connection_pool_name);
     auto connection = pool->get();
 
     const auto & query_string = queryToString(query_ast);
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h
index 3ed4ed04e4..a74cbbc2fc 100644
--- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h
@@ -44,33 +44,45 @@ private:
     /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
     void writeSync(const Block & block);
 
-    void calculateRemoteJobsCount();
+    void calculateJobsCount();
 
-    ThreadPool::Job createWritingJob(std::vector<bool> & done_jobs, std::atomic<size_t> & finished_jobs_count,
-                                     std::condition_variable & cond_var, const Block & block, size_t job_id,
-                                     const Cluster::ShardInfo & shard_info, size_t replica_id);
+    struct WritingJobContext
+    {
+        /// Remote job per replica.
+        std::vector<bool> done_remote_jobs;
+        /// Local job per shard.
+        std::vector<bool> done_local_jobs;
+        std::atomic<size_t> finished_jobs_count;
+        std::mutex mutex;
+        std::condition_variable cond_var;
+    };
 
-    void writeToLocal(const Blocks & blocks, size_t & finished_writings_count);
+    ThreadPool::Job createWritingJob(WritingJobContext & context, const Block & block,
+                                     const Cluster::Address & address, size_t shard_id, size_t job_id);
 
-    /// Returns the number of blocks was read for each cluster node. Uses during exception handling.
-    std::string getCurrentStateDescription(const std::vector<bool> & done_jobs, size_t finished_local_nodes_count);
+    void createWritingJobs(WritingJobContext & context, const Blocks & blocks);
+
+    void waitForUnfinishedJobs(WritingJobContext & context);
+
+    /// Returns the number of blocks written for each cluster node. Used during exception handling.
+    std::string getCurrentStateDescription(const WritingJobContext & context);
 
     IColumn::Selector createSelector(Block block);
 
     /// Split block between shards.
     Blocks splitBlock(const Block & block);
 
-    void writeSplit(const Block & block);
+    void writeSplitAsync(const Block & block);
 
-    void writeImpl(const Block & block, const size_t shard_id = 0);
+    void writeAsyncImpl(const Block & block, const size_t shard_id = 0);
 
-    /// Increments finished_writings_count after each repeat.
-    void writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count);
+    /// Performs local insertion, writing the block `repeats` times.
+    void writeToLocal(const Block & block, const size_t repeats);
 
     void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
 
     /// Performs synchronous insertion to remote node.
-    void writeToShardSync(const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id);
+    void writeToShardSync(const Block & block, const std::string & connection_pool_name);
 
 private:
     StorageDistributed & storage;
@@ -81,6 +93,7 @@ private:
     size_t blocks_inserted = 0;
     std::chrono::steady_clock::time_point deadline;
     size_t remote_jobs_count;
+    size_t local_jobs_count;
     std::experimental::optional<ThreadPool> pool;
 };
 
diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index 6ef968975c..7dc8d622ae 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -1700,17 +1700,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
     if (!cluster.getShardsAddresses().empty())
     {
         size_t shard_no = 0;
-        for (const auto & address : cluster.getShardsAddresses())
-        {
-            publish_address(address.host_name, shard_no);
-            publish_address(address.resolved_address.host().toString(), shard_no);
-            ++shard_no;
-        }
-    }
-    else if (!cluster.getShardsWithFailoverAddresses().empty())
-    {
-        size_t shard_no = 0;
-        for (const auto & addresses : cluster.getShardsWithFailoverAddresses())
+        for (const auto & addresses : cluster.getShardsAddresses())
         {
             for (const auto & address : addresses)
             {
diff --git a/dbms/src/Storages/System/StorageSystemClusters.cpp b/dbms/src/Storages/System/StorageSystemClusters.cpp
index fb3b116030..628766e0a2 100644
--- a/dbms/src/Storages/System/StorageSystemClusters.cpp
+++ b/dbms/src/Storages/System/StorageSystemClusters.cpp
@@ -72,27 +72,10 @@ BlockInputStreams StorageSystemClusters::read(
     {
         const std::string cluster_name = entry.first;
         const ClusterPtr cluster = entry.second;
-        const auto & addresses = cluster->getShardsAddresses();
-        const auto & addresses_with_failover = cluster->getShardsWithFailoverAddresses();
+        const auto & addresses_with_failover = cluster->getShardsAddresses();
         const auto & shards_info = cluster->getShardsInfo();
 
-        if (!addresses.empty())
-        {
-            auto it1 = addresses.cbegin();
-            auto it2 = shards_info.cbegin();
-
-            while (it1 != addresses.cend())
-            {
-                const auto & address = *it1;
-                const auto & shard_info = *it2;
-
-                updateColumns(cluster_name, shard_info, address);
-
-                ++it1;
-                ++it2;
-            }
-        }
-        else if (!addresses_with_failover.empty())
+        if (!addresses_with_failover.empty())
         {
             auto it1 = addresses_with_failover.cbegin();
             auto it2 = shards_info.cbegin();
-- 
GitLab
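
For readers tracing the behavioural core of this patch, the self-contained C++ sketch below models the per-shard routing rule the refactoring centralizes: local replicas are inserted into directly, and the directory names used to queue asynchronous writes either collapse into the single comma-joined name now stored in dir_name_for_internal_replication (when internal_replication is set) or remain one per remote replica. This is an illustration, not code from the patch; Replica, Shard and asyncDirNames are hypothetical stand-ins for Cluster::Address, Cluster::ShardInfo and the logic in writeAsyncImpl.

// Illustrative sketch only; mirrors the routing decision, not the actual classes.
#include <iostream>
#include <string>
#include <vector>

struct Replica
{
    std::string host_port;  // roughly what Address::toStringFull() yields
    bool is_local;          // precomputed once, as Address::is_local now is
};

struct Shard
{
    std::vector<Replica> replicas;
    bool internal_replication;
};

// Directory names a block for this shard would be queued under for async writes.
// With internal_replication, all non-local replicas share one comma-joined
// directory (what dir_name_for_internal_replication stores); otherwise each
// non-local replica gets its own directory. Local replicas are skipped because
// they are written to directly via writeToLocal.
std::vector<std::string> asyncDirNames(const Shard & shard)
{
    std::vector<std::string> dir_names;
    for (const auto & replica : shard.replicas)
    {
        if (replica.is_local)
            continue;
        if (shard.internal_replication && !dir_names.empty())
            dir_names.front() += "," + replica.host_port;  // append to the single shared name
        else
            dir_names.push_back(replica.host_port);        // one directory per replica
    }
    return dir_names;
}

int main()
{
    // Shard with two remote replicas and one local replica, internal_replication enabled.
    Shard shard{{{"r1:9000", false}, {"r2:9000", false}, {"localhost:9000", true}}, true};
    for (const auto & name : asyncDirNames(shard))
        std::cout << name << '\n';  // prints "r1:9000,r2:9000": one queue feeds both remotes
}

With internal_replication set to false, the same shard would instead yield two separate directory names ("r1:9000" and "r2:9000"), matching the else-branch the patch adds to writeAsyncImpl.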