Commit 90940d84 authored by Nikolai Kochetov

Cluster and DistributedBlockOutputStream refactoring [#CLICKHOUSE-3033]

Parent 43dc201f
@@ -77,6 +77,7 @@ Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const Stri
     user = config.getString(config_prefix + ".user", "default");
     password = config.getString(config_prefix + ".password", "");
     default_database = config.getString(config_prefix + ".default_database", "");
+    is_local = isLocal(*this);
 }
@@ -98,6 +99,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
         host_name = host_port_;
         port = default_port;
     }
+    is_local = isLocal(*this);
 }
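Both `Address` constructors now end by caching `is_local = isLocal(*this)`, so every later consumer tests a stored flag instead of re-running the lookup. A minimal sketch of the caching pattern, assuming a simplified stand-in for the real `isLocal()` in Cluster.cpp:

```cpp
#include <string>
#include <utility>

/// Sketch only: Address reduced to the fields the pattern needs; isLocal()
/// here is a hypothetical stand-in for the real check in Cluster.cpp, which
/// consults the server's own interfaces and port.
struct Address
{
    std::string host_name;
    unsigned short port = 0;
    bool is_local = false;   /// computed once at construction, reused by every caller

    Address(std::string host, unsigned short port_)
        : host_name(std::move(host)), port(port_)
    {
        is_local = isLocal(*this);
    }

private:
    static bool isLocal(const Address & a)
    {
        /// Assumed semantics: the address points at this very server.
        return (a.host_name == "localhost" || a.host_name == "127.0.0.1") && a.port == 9000;
    }
};
```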
@@ -193,6 +195,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
         {
             /// Shard without replicas.

+            Addresses addresses;
+
             const auto & prefix = config_prefix + key;
             const auto weight = config.getInt(prefix + ".weight", default_weight);
@@ -204,11 +208,10 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             info.shard_num = current_shard_num;
             info.weight = weight;

-            if (isLocal(address))
+            if (address.is_local)
                 info.local_addresses.push_back(address);
             else
             {
-                info.dir_names.push_back(address.toStringFull());
                 ConnectionPoolPtrs pools;
                 pools.push_back(std::make_shared<ConnectionPool>(
                     settings.distributed_connections_pool_size,
@@ -227,6 +230,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

             shards_info.push_back(info);
+            addresses_with_failover.push_back(addresses);
         }
         else if (startsWith(key, "shard"))
         {
@@ -244,10 +248,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);

-            /** in case of internal_replication we will be appending names to
-              * the first element of vector; otherwise we will just .emplace_back
-              */
-            std::vector<std::string> dir_names{};
+            /// in case of internal_replication we will be appending names to dir_name_for_internal_replication
+            std::string dir_name_for_internal_replication;

             auto first = true;
             for (const auto & replica_key : replica_keys)
@@ -261,18 +263,16 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
                 replica_addresses.back().replica_num = current_replica_num;
                 ++current_replica_num;

-                if (!isLocal(replica_addresses.back()))
+                if (!replica_addresses.back().is_local)
                 {
                     if (internal_replication)
                     {
                         auto dir_name = replica_addresses.back().toStringFull();
                         if (first)
-                            dir_names.emplace_back(std::move(dir_name));
+                            dir_name_for_internal_replication = dir_name;
                         else
-                            dir_names.front() += "," + dir_name;
+                            dir_name_for_internal_replication += "," + dir_name;
                     }
-                    else
-                        dir_names.emplace_back(replica_addresses.back().toStringFull());

                     if (first) first = false;
                 }
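The per-shard `dir_names` vector collapses into a single string: under `internal_replication`, all non-local replicas of a shard share one directory whose name is the comma-joined list of their endpoints, so the block is written to disk once and replication fans it out. A runnable sketch of just the name-building rule, with `toStringFull()` reduced to plain endpoint strings:

```cpp
#include <string>
#include <vector>

/// Builds the shared directory name the way the loop above does: the first
/// endpoint starts the name, every later one is appended with ','.
/// (The real loop also skips local replicas before it gets here.)
std::string makeDirNameForInternalReplication(const std::vector<std::string> & replica_endpoints)
{
    std::string dir_name;
    bool first = true;
    for (const auto & endpoint : replica_endpoints)
    {
        if (first)
            dir_name = endpoint;
        else
            dir_name += "," + endpoint;
        first = false;
    }
    return dir_name;
}
```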
@@ -288,7 +288,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             for (const auto & replica : replica_addresses)
             {
-                if (isLocal(replica))
+                if (replica.is_local)
                     shard_local_addresses.push_back(replica);
                 else
                 {
@@ -311,17 +311,18 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             if (weight)
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());

-            shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool, internal_replication});
+            shards_info.push_back({std::move(dir_name_for_internal_replication), current_shard_num, weight,
+                shard_local_addresses, shard_pool, internal_replication});
         }
         else
             throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);

-        if (!addresses_with_failover.empty() && !addresses.empty())
-            throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
-
         ++current_shard_num;
     }

+    if (addresses_with_failover.empty())
+        throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
+
     initMisc();
 }
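The validation also moves: the old loop rejected configs mixing `<node>` and `<shard>`, while the new code fills `addresses_with_failover` for both kinds (a bare `<node>` becomes a one-replica shard) and makes a single post-loop emptiness check. A compact sketch of that dispatch-then-validate shape, with `std::runtime_error` standing in for the project's `Exception`:

```cpp
#include <stdexcept>
#include <string>
#include <vector>

/// Dispatch-then-validate: every top-level key must be a node or a shard
/// (both now produce a shard entry), and the absence of any shard is only
/// detected once, after the loop.
void validateClusterKeys(const std::vector<std::string> & keys)
{
    size_t shards = 0;
    for (const auto & key : keys)
    {
        if (key.rfind("node", 0) == 0 || key.rfind("shard", 0) == 0)   /// startsWith
            ++shards;
        else
            throw std::runtime_error("Unknown element in config: " + key);
    }
    if (shards == 0)
        throw std::runtime_error("There must be either 'node' or 'shard' elements in config");
}
```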
@@ -409,12 +410,7 @@ void Cluster::calculateHashOfAddresses()
 {
     std::vector<std::string> elements;

-    if (!addresses.empty())
-    {
-        for (const auto & address : addresses)
-            elements.push_back(address.host_name + ":" + toString(address.port));
-    }
-    else if (!addresses_with_failover.empty())
+    if (!addresses_with_failover.empty())
     {
         for (const auto & addresses : addresses_with_failover)
         {
@@ -453,8 +449,6 @@ std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
 Cluster::Cluster(const Cluster & from, size_t index)
     : shards_info{from.shards_info[index]}
 {
-    if (!from.addresses.empty())
-        addresses.emplace_back(from.addresses[index]);
     if (!from.addresses_with_failover.empty())
         addresses_with_failover.emplace_back(from.addresses_with_failover[index]);
......
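With the flat `addresses` vector gone, both `calculateHashOfAddresses()` and the single-shard copy constructor above have only one container to walk. A self-contained sketch of the flatten-then-hash idea; the hash function Cluster actually uses is not visible in this diff, so `std::hash` is a stand-in:

```cpp
#include <functional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

/// Flatten every replica of every shard into "host:port" and hash the
/// concatenation, so two Cluster objects describing the same topology
/// produce the same hash string.
std::string hashOfAddresses(const std::vector<std::vector<std::pair<std::string, unsigned short>>> & shards)
{
    std::ostringstream all;
    for (const auto & replicas : shards)
        for (const auto & [host, port] : replicas)
            all << host << ':' << port << ';';
    return std::to_string(std::hash<std::string>{}(all.str()));
}
```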
@@ -55,6 +55,7 @@ public:
         String password;
         String default_database;    /// this database is selected when no database is specified for Distributed table
         UInt32 replica_num;
+        bool is_local;

         Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
         Address(const String & host_port_, const String & user_, const String & password_);
@@ -80,8 +81,8 @@ public:
         bool hasInternalReplication() const { return has_internal_replication; }

     public:
-        /// Contains names of directories for asynchronous write to StorageDistributed
-        std::vector<std::string> dir_names;
+        /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
+        std::string dir_name_for_internal_replication;
         /// Number of the shard, the indexation begins with 1
         UInt32 shard_num;
         UInt32 weight;
@@ -94,8 +95,7 @@ public:
     String getHashOfAddresses() const { return hash_of_addresses; }
     const ShardsInfo & getShardsInfo() const { return shards_info; }
-    const Addresses & getShardsAddresses() const { return addresses; }
-    const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
+    const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }

     const ShardInfo & getAnyShardInfo() const
     {
@@ -144,8 +144,6 @@ private:
     /// Non-empty is either addresses or addresses_with_failover.
     /// The size and order of the elements in the corresponding array corresponds to shards_info.

-    /// An array of shards. Each shard is the address of one server.
-    Addresses addresses;
     /// An array of shards. For each shard, an array of replica addresses (servers that are considered identical).
     AddressesWithFailover addresses_with_failover;
......
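For orientation, a condensed view of the Cluster data model after these header changes, with pool members omitted and ClickHouse typedefs replaced by standard ones; a reading aid, not the real header:

```cpp
#include <string>
#include <vector>

/// One container of per-shard replica lists replaces the old parallel
/// `addresses` / `addresses_with_failover` pair, and ShardInfo keeps a single
/// directory name instead of a vector of them.
struct Address { std::string host_name; unsigned short port; bool is_local; };
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;   /// one entry per shard

struct ShardInfo
{
    std::string dir_name_for_internal_replication;   /// only meaningful with internal replication
    unsigned shard_num;                              /// 1-based
    unsigned weight;
    Addresses local_addresses;                       /// replicas that are this server
    bool has_internal_replication;

    bool isLocal() const { return !local_addresses.empty(); }
};
```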
@@ -273,7 +273,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
     String cluster_name = query->cluster;
     auto cluster = context.getCluster(cluster_name);

-    auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
+    auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsAddresses(), host_name, port);
     if (!shard_host_num)
     {
         throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
@@ -283,7 +283,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
     size_t shard_num = shard_host_num->first;
     size_t host_num = shard_host_num->second;

-    const auto & host_address = cluster->getShardsWithFailoverAddresses().at(shard_num).at(host_num);
+    const auto & host_address = cluster->getShardsAddresses().at(shard_num).at(host_num);
     ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
     String rewritten_query = queryToString(rewritten_ast);
@@ -369,7 +369,7 @@ void DDLWorker::processTaskAlter(
         throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);

     Strings replica_names;
-    for (const auto & address : cluster->getShardsWithFailoverAddresses().at(shard_num))
+    for (const auto & address : cluster->getShardsAddresses().at(shard_num))
         replica_names.emplace_back(address.toString());
     std::sort(replica_names.begin(), replica_names.end());
@@ -700,7 +700,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
     entry.query = queryToString(query_ptr);
     entry.initiator = ddl_worker.getHostName();

-    Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
+    Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
     for (const auto & shard : shards)
     {
         for (const auto & addr : shard)
......
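Every DDLWorker call site is a mechanical rename: `getShardsWithFailoverAddresses()` becomes `getShardsAddresses()`, now returning the per-shard replica lists. A stand-alone sketch of the `(shard, replica)` lookup shape behind `tryGetShardAndHostNum` (the real helper lives elsewhere in DDLWorker.cpp; `std::optional` stands in for the project's optional type):

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct Address { std::string host_name; unsigned short port; };
using AddressesWithFailover = std::vector<std::vector<Address>>;

/// Scan every replica of every shard for (host_name, port) and return the
/// pair of indices, or nothing if the server is not part of the cluster.
std::optional<std::pair<size_t, size_t>> tryGetShardAndHostNum(
    const AddressesWithFailover & shards, const std::string & host_name, unsigned short port)
{
    for (size_t shard = 0; shard < shards.size(); ++shard)
        for (size_t replica = 0; replica < shards[shard].size(); ++replica)
            if (shards[shard][replica].host_name == host_name && shards[shard][replica].port == port)
                return std::make_pair(shard, replica);
    return {};
}
```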
@@ -45,22 +45,26 @@ namespace ProfileEvents
 namespace DB
 {

 namespace ErrorCodes
 {
     extern const int TIMEOUT_EXCEEDED;
 }

 DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
     const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
     : storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
 {
 }

 void DistributedBlockOutputStream::writePrefix()
 {
     deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout);
 }

 void DistributedBlockOutputStream::write(const Block & block)
 {
     if (insert_sync)
@@ -69,21 +73,22 @@ void DistributedBlockOutputStream::write(const Block & block)
         writeAsync(block);
 }

 void DistributedBlockOutputStream::writeAsync(const Block & block)
 {
     if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
-        return writeSplit(block);
+        return writeSplitAsync(block);

-    writeImpl(block);
+    writeAsyncImpl(block);
     ++blocks_inserted;
 }

 ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
-    std::vector<bool> & done_jobs, std::atomic<unsigned> & finished_jobs_count, std::condition_variable & cond_var,
-    const Block & block, size_t job_id, const Cluster::ShardInfo & shard_info, size_t replica_id)
+    WritingJobContext & context, const Block & block, const Cluster::Address & address, size_t shard_id, size_t job_id)
 {
     auto memory_tracker = current_memory_tracker;
-    return [this, memory_tracker, & done_jobs, & finished_jobs_count, & cond_var, & block, job_id, & shard_info, replica_id]()
+    return [this, memory_tracker, & context, & block, & address, shard_id, job_id]()
     {
         if (!current_memory_tracker)
         {
@@ -92,34 +97,34 @@ ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
         }

         try
         {
-            this->writeToShardSync(block, shard_info, replica_id);
-            ++finished_jobs_count;
-            done_jobs[job_id] = true;
-            cond_var.notify_one();
+            const auto & shard_info = cluster->getShardsInfo()[shard_id];
+            if (address.is_local)
+            {
+                writeToLocal(block, shard_info.getLocalNodeCount());
+                context.done_local_jobs[job_id] = true;
+            }
+            else
+            {
+                writeToShardSync(block, shard_info.hasInternalReplication()
+                    ? shard_info.dir_name_for_internal_replication
+                    : address.toStringFull());
+                context.done_remote_jobs[job_id] = true;
+            }
+            ++context.finished_jobs_count;
+            context.cond_var.notify_one();
         }
         catch (...)
         {
-            ++finished_jobs_count;
-            cond_var.notify_one();
+            ++context.finished_jobs_count;
+            context.cond_var.notify_one();
             throw;
         }
     };
 }

-void DistributedBlockOutputStream::writeToLocal(const Blocks & blocks, size_t & finished_writings_count)
-{
-    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
-    for (size_t shard_id : ext::range(0, shards_info.size()))
-    {
-        const auto & shard_info = shards_info[shard_id];
-        if (shard_info.getLocalNodeCount() > 0)
-            writeToLocal(blocks[shard_id], shard_info.getLocalNodeCount(), finished_writings_count);
-    }
-}
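Success and failure paths of a writing job keep one invariant: `finished_jobs_count` is incremented and the condition variable notified exactly once per job, otherwise the waiter would block forever; only the per-job done flag distinguishes the outcomes. A generic, compilable sketch of that contract, with simplified names rather than the class's real interface:

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <vector>

struct JobContext
{
    std::vector<bool> done_jobs;
    std::atomic<unsigned> finished_jobs_count{0};
    std::mutex mutex;
    std::condition_variable cond_var;
};

/// Wraps `work` so the bookkeeping runs on both the success and the failure
/// path; the done flag is only set when the work actually finished.
std::function<void()> makeJob(JobContext & ctx, size_t job_id, std::function<void()> work)
{
    return [&ctx, job_id, work]()
    {
        try
        {
            work();
            ctx.done_jobs[job_id] = true;
            ++ctx.finished_jobs_count;
            ctx.cond_var.notify_one();
        }
        catch (...)
        {
            ++ctx.finished_jobs_count;   /// still counted, so the waiter can finish
            ctx.cond_var.notify_one();
            throw;
        }
    };
}
```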
-std::string DistributedBlockOutputStream::getCurrentStateDescription(
-    const std::vector<bool> & done_jobs, size_t finished_local_nodes_count)
+std::string DistributedBlockOutputStream::getCurrentStateDescription(const WritingJobContext & context)
 {
     const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
     String description;
@@ -127,111 +132,154 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription(
     buffer << "Insertion status:\n";

-    auto writeDescription = [&buffer](const std::string & address, size_t shard_id, size_t blocks_wrote)
+    auto writeDescription = [&buffer](const Cluster::Address & address, size_t shard_id, size_t blocks_wrote)
     {
         buffer << "Wrote " << blocks_wrote << " blocks on shard " << shard_id << " replica ";
-        buffer << unescapeForFileName(address) << '\n';
+        buffer << address.toString() << '\n';
     };

-    size_t job_id = 0;
+    const auto addresses_with_failovers = cluster->getShardsAddresses();
+    size_t remote_job_id = 0;
+    size_t local_job_id = 0;
     for (size_t shard_id : ext::range(0, shards_info.size()))
     {
         const auto & shard_info = shards_info[shard_id];
-        const auto & local_addresses = shard_info.local_addresses;
-
-        for (const auto & address : local_addresses)
-        {
-            writeDescription(address.toStringFull(), shard_id, blocks_inserted + (finished_local_nodes_count ? 1 : 0));
-            if (finished_local_nodes_count)
-                --finished_local_nodes_count;
-        }
-
-        for (const auto & dir_name : shard_info.dir_names)
-            writeDescription(dir_name, shard_id, blocks_inserted + (done_jobs[job_id++] ? 1 : 0));
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
+        {
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id++] ? 1 : 0));
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
+        }
+
+        if (shard_info.isLocal())
+        {
+            const auto & address = shard_info.local_addresses.front();
+            writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id++] ? 1 : 0));
+        }
     }

     return description;
 }
-void DistributedBlockOutputStream::calculateRemoteJobsCount()
-{
-    remote_jobs_count = 0;
-    const auto & shards_info = cluster->getShardsInfo();
-    for (const auto & shard_info : shards_info)
-        remote_jobs_count += shard_info.dir_names.size();
-}
-void DistributedBlockOutputStream::writeSync(const Block & block)
-{
-    if (!pool)
-    {
-        /// Deferred initialization. Only for sync insertion.
-        calculateRemoteJobsCount();
-        pool.emplace(remote_jobs_count);
-    }
-
-    std::vector<bool> done_jobs(remote_jobs_count, false);
-    std::atomic<unsigned> finished_jobs_count(0);
-    std::mutex mutex;
-    std::condition_variable cond_var;
-
-    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
-    Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
-
-    size_t job_id = 0;
-    for (size_t shard_id : ext::range(0, blocks.size()))
-        for (size_t replica_id : ext::range(0, shards_info[shard_id].dir_names.size()))
-            pool->schedule(createWritingJob(done_jobs, finished_jobs_count, cond_var,
-                blocks[shard_id], job_id++, shards_info[shard_id], replica_id));
-
-    const size_t jobs_count = job_id;
-    size_t finished_local_nodes_count;
-    const auto time_point = deadline;
-    auto timeout = insert_timeout;
-    auto & pool = this->pool;
-    auto wait = [& mutex, & cond_var, time_point, & finished_jobs_count, jobs_count, & pool, timeout]()
-    {
-        std::unique_lock<std::mutex> lock(mutex);
-        auto cond = [& finished_jobs_count, jobs_count] { return finished_jobs_count == jobs_count; };
-        if (timeout)
-        {
-            if (!cond_var.wait_until(lock, time_point, cond))
-            {
-                pool->wait();
-                ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
-                throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
-            }
-        }
-        else
-            cond_var.wait(lock, cond);
-        pool->wait();
-    };
-
-    std::exception_ptr exception;
-    try
-    {
-        writeToLocal(blocks, finished_local_nodes_count);
-    }
-    catch (...)
-    {
-        exception = std::current_exception();
-    }
-
-    try
-    {
-        wait();
-        if (exception)
-            std::rethrow_exception(exception);
-    }
+void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context, const Blocks & blocks)
+{
+    const auto & addresses_with_failovers = cluster->getShardsAddresses();
+    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
+
+    size_t remote_job_id = 0;
+    size_t local_job_id = 0;
+    for (size_t shard_id : ext::range(0, blocks.size()))
+    {
+        const auto & shard_info = shards_info[shard_id];
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
+        {
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id++));
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
+        }
+
+        if (shards_info[shard_id].isLocal())
+        {
+            const auto & address = shards_info[shard_id].local_addresses.front();
+            pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id++));
+        }
+    }
+}
+
+void DistributedBlockOutputStream::calculateJobsCount()
+{
+    remote_jobs_count = 0;
+    local_jobs_count = 0;
+
+    const auto & addresses_with_failovers = cluster->getShardsAddresses();
+    const auto & shards_info = cluster->getShardsInfo();
+    for (size_t shard_id : ext::range(0, shards_info.size()))
+    {
+        const auto & shard_info = shards_info[shard_id];
+        /// If hasInternalReplication, then prefer local replica
+        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
+        {
+            for (const auto & address : addresses_with_failovers[shard_id])
+                if (!address.is_local)
+                {
+                    ++remote_jobs_count;
+                    if (shard_info.hasInternalReplication())
+                        break;
+                }
+        }
+
+        local_jobs_count += shard_info.isLocal() ? 1 : 0;
+    }
+}
+
+void DistributedBlockOutputStream::waitForUnfinishedJobs(WritingJobContext & context)
+{
+    std::unique_lock<std::mutex> lock(context.mutex);
+    size_t jobs_count = remote_jobs_count + local_jobs_count;
+    auto cond = [& context, jobs_count] { return context.finished_jobs_count == jobs_count; };
+
+    if (insert_timeout)
+    {
+        if (!context.cond_var.wait_until(lock, deadline, cond))
+        {
+            pool->wait();
+            ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
+            throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
+        }
+    }
+    else
+        context.cond_var.wait(lock, cond);
+
+    pool->wait();
+}
+
+void DistributedBlockOutputStream::writeSync(const Block & block)
+{
+    if (!pool)
+    {
+        /// Deferred initialization. Only for sync insertion.
+        calculateJobsCount();
+        pool.emplace(remote_jobs_count + local_jobs_count);
+    }
+
+    WritingJobContext context;
+    context.done_remote_jobs.assign(remote_jobs_count, false);
+    context.done_local_jobs.assign(local_jobs_count, false);
+    context.finished_jobs_count = 0;
+
+    const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
+    Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
+    createWritingJobs(context, blocks);
+
+    try
+    {
+        waitForUnfinishedJobs(context);
+    }
     catch (Exception & exception)
     {
-        exception.addMessage(getCurrentStateDescription(done_jobs, finished_local_nodes_count));
+        exception.addMessage(getCurrentStateDescription(context));
         throw;
     }

     ++blocks_inserted;
 }
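`waitForUnfinishedJobs()` replaces the old inline `wait` lambda: one condition variable, a deadline precomputed in `writePrefix()`, and on timeout the pool is drained before the exception. A minimal model of the wait, returning `false` where the real code throws "Timeout exceeded.":

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

/// Field names are simplified stand-ins for the WritingJobContext members.
bool waitForJobs(std::mutex & mutex, std::condition_variable & cond_var,
                 const std::atomic<unsigned> & finished_jobs_count, unsigned jobs_count,
                 std::chrono::steady_clock::time_point deadline, bool has_timeout)
{
    std::unique_lock<std::mutex> lock(mutex);
    auto cond = [&] { return finished_jobs_count == jobs_count; };
    if (has_timeout)
        return cond_var.wait_until(lock, deadline, cond);   /// false => timeout
    cond_var.wait(lock, cond);
    return true;
}
```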
 IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
 {
     storage.getShardingKeyExpr()->execute(block);
@@ -288,33 +336,39 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
 }

-void DistributedBlockOutputStream::writeSplit(const Block & block)
+void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
 {
     Blocks splitted_blocks = splitBlock(block);
     const size_t num_shards = splitted_blocks.size();

     for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
         if (splitted_blocks[shard_idx].rows())
-            writeImpl(splitted_blocks[shard_idx], shard_idx);
+            writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);

     ++blocks_inserted;
 }

-void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
+void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
 {
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
-    size_t finished_writings_count = 0;
     if (shard_info.getLocalNodeCount() > 0)
-        writeToLocal(block, shard_info.getLocalNodeCount(), finished_writings_count);
+        writeToLocal(block, shard_info.getLocalNodeCount());

-    /// dir_names is empty if shard has only local addresses
-    if (!shard_info.dir_names.empty())
-        writeToShard(block, shard_info.dir_names);
+    if (shard_info.hasInternalReplication())
+        writeToShard(block, {shard_info.dir_name_for_internal_replication});
+    else
+    {
+        std::vector<std::string> dir_names;
+        for (const auto & address : cluster->getShardsAddresses()[shard_id])
+            if (!address.is_local)
+                dir_names.push_back(address.toStringFull());
+        writeToShard(block, dir_names);
+    }
 }
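Asynchronous routing now mirrors the synchronous rule: one comma-joined directory under internal replication, otherwise one directory per non-local replica. A small sketch of just that decision, with endpoints as plain strings instead of `toStringFull()`:

```cpp
#include <string>
#include <utility>
#include <vector>

/// Replicas are (endpoint, is_local) pairs here; the real code reads them
/// from cluster->getShardsAddresses()[shard_id].
std::vector<std::string> directoriesForShard(
    bool has_internal_replication,
    const std::string & dir_name_for_internal_replication,
    const std::vector<std::pair<std::string, bool>> & replicas)
{
    if (has_internal_replication)
        return {dir_name_for_internal_replication};   /// one shared directory

    std::vector<std::string> dir_names;
    for (const auto & [endpoint, is_local] : replicas)
        if (!is_local)
            dir_names.push_back(endpoint);            /// one directory per remote replica
    return dir_names;
}
```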
-void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count)
+void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
 {
     InterpreterInsertQuery interp{query_ast, storage.context};
@@ -322,20 +376,15 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
     block_io.out->writePrefix();

     for (size_t i = 0; i < repeats; ++i)
-    {
         block_io.out->write(block);
-        ++finished_writings_count;
-    }

     block_io.out->writeSuffix();
 }
-void DistributedBlockOutputStream::writeToShardSync(
-    const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id)
+void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::string & connection_pool_name)
 {
-    const auto & dir_name = shard_info.dir_names[replica_id];
-    auto pool = storage.requireConnectionPool(dir_name);
+    auto pool = storage.requireConnectionPool(connection_pool_name);
     auto connection = pool->get();

     const auto & query_string = queryToString(query_ast);
......
@@ -44,33 +44,45 @@ private:
     /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
     void writeSync(const Block & block);

-    void calculateRemoteJobsCount();
+    void calculateJobsCount();

-    ThreadPool::Job createWritingJob(std::vector<bool> & done_jobs, std::atomic<unsigned> & finished_jobs_count,
-        std::condition_variable & cond_var, const Block & block, size_t job_id,
-        const Cluster::ShardInfo & shard_info, size_t replica_id);
+    struct WritingJobContext
+    {
+        /// Remote job per replica.
+        std::vector<bool> done_remote_jobs;
+        /// Local job per shard.
+        std::vector<bool> done_local_jobs;
+        std::atomic<unsigned> finished_jobs_count;
+        std::mutex mutex;
+        std::condition_variable cond_var;
+    };

-    void writeToLocal(const Blocks & blocks, size_t & finished_writings_count);
+    ThreadPool::Job createWritingJob(WritingJobContext & context, const Block & block,
+        const Cluster::Address & address, size_t shard_id, size_t job_id);

-    /// Returns the number of blocks was read for each cluster node. Uses during exception handling.
-    std::string getCurrentStateDescription(const std::vector<bool> & done_jobs, size_t finished_local_nodes_count);
+    void createWritingJobs(WritingJobContext & context, const Blocks & blocks);
+
+    void waitForUnfinishedJobs(WritingJobContext & context);
+
+    /// Returns the number of blocks written for each cluster node. Used during exception handling.
+    std::string getCurrentStateDescription(const WritingJobContext & context);

     IColumn::Selector createSelector(Block block);

     /// Split block between shards.
     Blocks splitBlock(const Block & block);

-    void writeSplit(const Block & block);
+    void writeSplitAsync(const Block & block);

-    void writeImpl(const Block & block, const size_t shard_id = 0);
+    void writeAsyncImpl(const Block & block, const size_t shard_id = 0);

-    /// Increments finished_writings_count after each repeat.
-    void writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count);
+    void writeToLocal(const Block & block, const size_t repeats);

     void writeToShard(const Block & block, const std::vector<std::string> & dir_names);

     /// Performs synchronous insertion to remote node.
-    void writeToShardSync(const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id);
+    void writeToShardSync(const Block & block, const std::string & connection_pool_name);

 private:
     StorageDistributed & storage;
@@ -81,6 +93,7 @@ private:
     size_t blocks_inserted = 0;
     std::chrono::steady_clock::time_point deadline;
     size_t remote_jobs_count;
+    size_t local_jobs_count;
     std::experimental::optional<ThreadPool> pool;
 };
......
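One detail ties this header to the Cluster changes: a synchronous job names its connection pool with the very string that asynchronous inserts use as an on-disk directory name. A one-line sketch of the selection rule, endpoints again simplified:

```cpp
#include <string>

/// Mirrors the choice made inside createWritingJob(): the pool name equals
/// the async directory name, so sync and async inserts share pools.
std::string connectionPoolNameFor(bool has_internal_replication,
                                  const std::string & dir_name_for_internal_replication,
                                  const std::string & replica_endpoint)
{
    return has_internal_replication ? dir_name_for_internal_replication : replica_endpoint;
}
```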
@@ -1700,17 +1700,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
     if (!cluster.getShardsAddresses().empty())
     {
         size_t shard_no = 0;
-        for (const auto & address : cluster.getShardsAddresses())
-        {
-            publish_address(address.host_name, shard_no);
-            publish_address(address.resolved_address.host().toString(), shard_no);
-            ++shard_no;
-        }
-    }
-    else if (!cluster.getShardsWithFailoverAddresses().empty())
-    {
-        size_t shard_no = 0;
-        for (const auto & addresses : cluster.getShardsWithFailoverAddresses())
+        for (const auto & addresses : cluster.getShardsAddresses())
         {
             for (const auto & address : addresses)
             {
......
@@ -72,27 +72,10 @@ BlockInputStreams StorageSystemClusters::read(
     {
         const std::string cluster_name = entry.first;
         const ClusterPtr cluster = entry.second;
-        const auto & addresses = cluster->getShardsAddresses();
-        const auto & addresses_with_failover = cluster->getShardsWithFailoverAddresses();
+        const auto & addresses_with_failover = cluster->getShardsAddresses();
         const auto & shards_info = cluster->getShardsInfo();

-        if (!addresses.empty())
-        {
-            auto it1 = addresses.cbegin();
-            auto it2 = shards_info.cbegin();
-
-            while (it1 != addresses.cend())
-            {
-                const auto & address = *it1;
-                const auto & shard_info = *it2;
-
-                updateColumns(cluster_name, shard_info, address);
-
-                ++it1;
-                ++it2;
-            }
-        }
-        else if (!addresses_with_failover.empty())
+        if (!addresses_with_failover.empty())
         {
             auto it1 = addresses_with_failover.cbegin();
             auto it2 = shards_info.cbegin();
......