Commit ea2b4867 authored by Nikolai Kochetov

added threadpool to DistributedBlockOutputStream

Parent 63ec81e2
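Overview of the pattern this commit introduces: synchronous inserts now schedule one writing job per remote directory on a ThreadPool, track completion in a shared counter, and wait on a condition_variable with an optional deadline. Below is a minimal, self-contained sketch of that pattern in standard C++ only; std::thread stands in for ClickHouse's ThreadPool, and the actual block writing is elided.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

int main()
{
    const size_t jobs_count = 4;                    /// stands in for remote_jobs_count
    std::vector<bool> done_jobs(jobs_count, false);
    std::atomic<unsigned> finished_jobs_count(0);
    std::mutex mutex;
    std::condition_variable cond_var;

    std::vector<std::thread> workers;               /// stands in for the ThreadPool
    for (size_t job_id = 0; job_id < jobs_count; ++job_id)
        workers.emplace_back([&, job_id]
        {
            /// ... write the block to one replica directory here ...
            {
                std::lock_guard<std::mutex> lock(mutex);
                done_jobs[job_id] = true;           /// under the mutex: vector<bool> is not safe for concurrent writes
                ++finished_jobs_count;
            }
            cond_var.notify_one();
        });

    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    bool timed_out = false;
    {
        std::unique_lock<std::mutex> lock(mutex);
        timed_out = !cond_var.wait_until(lock, deadline,
            [&] { return finished_jobs_count == jobs_count; });
    }

    for (auto & worker : workers)                   /// the real code calls pool->wait() here
        worker.join();

    if (timed_out)
        throw std::runtime_error("Timeout exceeded.");
}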
......@@ -12,21 +12,22 @@
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/createBlockSelector.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/setThreadName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <Poco/DirectoryIterator.h>
#include <iostream>
#include <future>
#include <condition_variable>
#include <mutex>
......@@ -55,16 +56,9 @@ DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed &
{
}
DistributedBlockOutputStream::writePrefix()
void DistributedBlockOutputStream::writePrefix()
{
deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout);
remote_jobs_count = 0;
if (storage.getShardingKeyExpr())
{
const auto & shards_info = cluster->getShardsInfo();
for (const auto & shard_info : shards_info)
remote_jobs_count += shard_info.dir_names.size();
}
}
void DistributedBlockOutputStream::write(const Block & block)
......@@ -84,13 +78,12 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
++blocks_inserted;
}
ThreadPool::Job createWritingJob(std::vector<bool> & done_jobs, std::atomic<unsigned> & finished_jobs_count,
std::condition_variable & cond_var, const Block & block, size_t job_id,
const Cluster::ShardInfo & shard_info, size_t replica_id)
ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
std::vector<bool> & done_jobs, std::atomic<unsigned> & finished_jobs_count, std::condition_variable & cond_var,
const Block & block, size_t job_id, const Cluster::ShardInfo & shard_info, size_t replica_id)
{
auto memory_tracker = current_memory_tracker;
return [this, memory_tracker, & done_jobs, & finished_jobs_count, & cond_var, & block,
size_t job_id, const Cluster::ShardInfo & shard_info, size_t replica_id]()
return [this, memory_tracker, & done_jobs, & finished_jobs_count, & cond_var, & block, job_id, & shard_info, replica_id]()
{
if (!current_memory_tracker)
{
......@@ -113,51 +106,126 @@ ThreadPool::Job createWritingJob(std::vector<bool> & done_jobs, std::atomic<unsi
};
}
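One detail worth noting in the hunk above: the parent commit wrote parameter declarations (size_t job_id, const Cluster::ShardInfo & shard_info) inside the lambda capture list, which is not valid C++; a capture list may only name existing variables, captured by value or by reference. A minimal illustration of the corrected pattern:

#include <functional>
#include <vector>

std::function<void()> makeJob(std::vector<bool> & done_jobs, size_t job_id)
{
    /// done_jobs outlives the job, so capture it by reference;
    /// job_id is a transient loop counter, so capture it by value (a copy).
    return [&done_jobs, job_id] { done_jobs[job_id] = true; };
}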
void DistributedBlockOutputStream::writeToLocal(const Blocks & blocks)
void DistributedBlockOutputStream::writeToLocal(const Blocks & blocks, size_t & finished_writings_count)
{
const Cluster::ShardInfo & shard_info = cluster->getShardsInfo();
for (size_t shard_id: ext::range(0, shards_info.size()))
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(blocks[shard_id], shard_info.getLocalNodeCount());
writeToLocal(blocks[shard_id], shard_info.getLocalNodeCount(), finished_writings_count);
}
}
std::string getCurrentStateDescription(const std::vector<bool> & done_jobs)
std::string DistributedBlockOutputStream::getCurrentStateDescription(
const std::vector<bool> & done_jobs, size_t finished_local_nodes_count)
{
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
String description;
WriteBufferFromString buffer(description);
buffer << "Insertion status:\n";
auto writeDescription = [&buffer](const std::string & address, size_t shard_id, size_t blocks_wrote)
{
buffer << "Wrote " << blocks_wrote << " blocks on shard " << shard_id << " replica ";
buffer << unescapeForFileName(address) << '\n';
};
size_t job_id = 0;
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
const auto & local_addresses = shard_info.local_addresses;
for (const auto & address : local_addresses)
{
writeDescription(address.toStringFull(), shard_id, blocks_inserted + (finished_local_nodes_count ? 1 : 0));
if (finished_local_nodes_count)
--finished_local_nodes_count;
}
for (const auto & dir_name : shard_info.dir_names)
writeDescription(dir_name, shard_id, blocks_inserted + (done_jobs[job_id++] ? 1 : 0));
}
return description;
}
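For illustration, the status text this function appends to an exception would look roughly like the following for one shard with a local replica and one remote directory (host names are hypothetical; the counts depend on blocks_inserted and the done flags):

Insertion status:
Wrote 2 blocks on shard 0 replica host-local:9000
Wrote 1 blocks on shard 0 replica host-remote:9000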
void DistributedBlockOutputStream::calculateRemoteJobsCount()
{
remote_jobs_count = 0;
const auto & shards_info = cluster->getShardsInfo();
for (const auto & shard_info : shards_info)
remote_jobs_count += shard_info.dir_names.size();
}
void DistributedBlockOutputStream::writeSync(const Block & block)
{
if (!pool)
pool = ThreadPool(remote_jobs_count);
{
/// Deferred initialization. Only for sync insertion.
calculateRemoteJobsCount();
pool.emplace(remote_jobs_count);
}
std::vector<bool> done_jobs(remote_jobs_count, false);
std::atomic<unsigned> finished_jobs_count = 0;
std::atomic<unsigned> finished_jobs_count(0);
std::mutex mutex;
std::condition_variable cond_var;
const Cluster::ShardInfo & shard_info = cluster->getShardsInfo();
Blocks blocks = shard_info.size() > 1 ? splitBlocks(block) : Blocks({block});
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
size_t job_id = 0;
for (size_t shard_id: ext::range(0, blocks.size()))
for (size_t replica_id : ext::range(0, shards_info[shard_id].dir_names.size()))
pool->schledule(createWritingJob(jobs_done, finished_jobs_count, cond_var,
blocks[shard_id], job_id++, shards_info[shard_id], replica_id));
try
writeToLocal(blocks);
catch(Exception & exception)
for (size_t shard_id : ext::range(0, blocks.size()))
for (size_t replica_id: ext::range(0, shards_info[shard_id].dir_names.size()))
pool->schedule(createWritingJob(done_jobs, finished_jobs_count, cond_var,
blocks[shard_id], job_id++, shards_info[shard_id], replica_id));
const size_t jobs_count = job_id;
size_t finished_local_nodes_count = 0;
const auto time_point = deadline;
auto timeout = insert_timeout;
auto & pool = this->pool;
auto wait = [& mutex, & cond_var, time_point, & finished_jobs_count, jobs_count, & pool, timeout]()
{
try
pool->wait();
catch(Exception & exception)
std::unique_lock<std::mutex> lock(mutex);
auto cond = [& finished_jobs_count, jobs_count] { return finished_jobs_count == jobs_count; };
if (timeout)
{
throw;
if (!cond_var.wait_until(lock, time_point, cond))
{
pool->wait();
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
else
cond_var.wait(lock, cond);
pool->wait();
};
std::exception_ptr exception;
try
{
writeToLocal(blocks, finished_local_nodes_count);
}
catch (...)
{
exception = std::current_exception();
}
try
{
wait();
if (exception)
std::rethrow_exception(exception);
}
catch(Exception & exception)
{
exception.addMessage(getCurrentStateDescription(done_jobs, finished_local_nodes_count));
throw;
}
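The try/catch sequence above follows a deliberate pattern: a failure in the local writes is captured with std::exception_ptr, the remote jobs are still waited for (so no worker thread outlives the stack-allocated state it references), and only then is the failure rethrown, annotated with the insertion status. A self-contained sketch of that pattern:

#include <exception>
#include <iostream>
#include <stdexcept>

void runWithDeferredRethrow()
{
    std::exception_ptr exception;
    try
    {
        throw std::runtime_error("local write failed");   /// stands in for writeToLocal(...)
    }
    catch (...)
    {
        exception = std::current_exception();
    }

    /// ... wait for the thread pool here, whether or not the local write failed ...

    if (exception)
        std::rethrow_exception(exception);                /// rethrown only after all jobs finished
}

int main()
{
    try
    {
        runWithDeferredRethrow();
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';                    /// prints "local write failed"
    }
}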
......@@ -190,7 +258,7 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
}
Blocks DistributedBlockOutputStream::splitBlocks(const Block & block)
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
......@@ -222,7 +290,7 @@ Blocks DistributedBlockOutputStream::splitBlocks(const Block & block)
void DistributedBlockOutputStream::writeSplit(const Block & block)
{
Blocks splitted_blocks = splitBlocks(block);
Blocks splitted_blocks = splitBlock(block);
const size_t num_shards = splitted_blocks.size();
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
......@@ -236,29 +304,17 @@ void DistributedBlockOutputStream::writeSplit(const Block & block)
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
size_t finished_writings_count = 0;
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
writeToLocal(block, shard_info.getLocalNodeCount(), finished_writings_count);
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
{
if (!insert_sync)
writeToShard(block, shard_info.dir_names);
else
{
std::atomic<bool> timeout_exceeded(false);
auto launch = insert_timeout ? std::launch::async : std::launch::deferred;
auto result = std::async(launch, &DistributedBlockOutputStream::writeToShardSync, this, std::cref(block),
std::cref(shard_info.dir_names), shard_id, std::ref(timeout_exceeded));
if (insert_timeout && result.wait_until(deadline) == std::future_status::timeout)
timeout_exceeded = true;
result.get();
}
}
writeToShard(block, shard_info.dir_names);
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count)
{
InterpreterInsertQuery interp{query_ast, storage.context};
......@@ -266,60 +322,33 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
block_io.out->writePrefix();
for (size_t i = 0; i < repeats; ++i)
{
block_io.out->write(block);
++finished_writings_count;
}
block_io.out->writeSuffix();
}
void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::vector<std::string> & dir_names,
size_t shard_id, const std::atomic<bool> & timeout_exceeded)
void DistributedBlockOutputStream::writeToShardSync(
const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id)
{
auto & blocks_inserted = this->blocks_inserted;
auto writeNodeDescription = [shard_id, & blocks_inserted](WriteBufferFromString & out, const Connection & connection)
{
out << " (While insertion to " << connection.getDescription() << " shard " << shard_id;
out << " Inserted blocks: " << blocks_inserted << ")";
};
const auto & dir_name = shard_info.dir_names[replica_id];
auto pool = storage.requireConnectionPool(dir_name);
auto connection = pool->get();
const auto & query_string = queryToString(query_ast);
for (const auto & dir_name : dir_names)
{
auto pool = storage.requireConnectionPool(dir_name);
auto connection = pool->get();
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
if (timeout_exceeded)
{
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
String message;
WriteBufferFromString out(message);
out << "Timeout exceeded.";
writeNodeDescription(out, *connection);
throw Exception(message, ErrorCodes::TIMEOUT_EXCEEDED);
}
RemoteBlockOutputStream remote{*connection, query_string};
try
{
RemoteBlockOutputStream remote{*connection, query_string};
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
remote.writePrefix();
remote.write(block);
remote.writeSuffix();
}
catch (Exception & exception)
{
String message;
WriteBufferFromString out(message);
writeNodeDescription(out, *connection);
exception.addMessage(message);
exception.rethrow();
}
}
remote.writePrefix();
remote.write(block);
remote.writeSuffix();
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
......@@ -374,4 +403,5 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
Poco::File(first_file_tmp_path).remove();
}
}
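A sketch of the tmp-directory technique the comment above describes, using standard C++17 <filesystem> in place of Poco: the data is fully written to a file under tmp/ and only then moved to its final name, so a consumer scanning the directory never sees a half-written file. (Judging by first_file_tmp_path above, the real code reuses one tmp file for several destination directories before removing it.)

#include <filesystem>
#include <fstream>
#include <string>

void writeAtomically(const std::filesystem::path & dir, const std::string & name,
                     const std::string & payload)
{
    const auto tmp_path = dir / "tmp" / name;
    const auto final_path = dir / name;

    std::filesystem::create_directories(tmp_path.parent_path());
    {
        std::ofstream out(tmp_path, std::ios::binary);
        out << payload;                                   /// the real code writes the serialized block
    }                                                     /// file is closed before it is published
    std::filesystem::rename(tmp_path, final_path);        /// atomic within one POSIX filesystem
}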
......@@ -8,6 +8,7 @@
#include <memory>
#include <chrono>
#include <experimental/optional>
#include <Interpreters/Cluster.h>
namespace Poco
{
......@@ -18,8 +19,6 @@ namespace DB
{
class StorageDistributed;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
/** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero.
* Otherwise, the write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
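A hypothetical call site for the synchronous mode; the constructor's parameter list is truncated in this diff, so the argument names and order below are assumptions inferred from the members the implementation uses (storage, query_ast, cluster, insert_sync, insert_timeout):

/// Hypothetical usage sketch; the parameters are assumptions, not the confirmed signature.
DistributedBlockOutputStream out(storage, query_ast, cluster,
    /* insert_sync_ */ true, /* insert_timeout_ */ 30);
out.writePrefix();
out.write(block);       /// with insert_sync_ set: returns after all shards are written, or throws on timeout
out.writeSuffix();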
......@@ -42,31 +41,36 @@ public:
private:
void writeAsync(const Block & block);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);
void calculateRemoteJobsCount();
ThreadPool::Job createWritingJob(std::vector<bool> & done_jobs, std::atomic<unsigned> & finished_jobs_count,
std::condition_variable & cond_var, const Block & block, size_t job_id,
const Cluster::ShardInfo & shard_info, size_t replica_id);
void writeToLocal(const Blocks & blocks);
void writeToLocal(const Blocks & blocks, size_t & finished_writings_count);
std::string getCurrentStateDescription(const std::vector<bool> & done_jobs);
/// Returns the number of blocks that were written for each cluster node. Used during exception handling.
std::string getCurrentStateDescription(const std::vector<bool> & done_jobs, size_t finished_local_nodes_count);
IColumn::Selector createSelector(Block block);
/// Split block between shards.
Blocks splitBlock(const Block & block);
void writeSplit(const Block & block);
void writeImpl(const Block & block, const size_t shard_id = 0);
void writeToLocal(const Block & block, const size_t repeats);
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, const size_t repeats, size_t & finished_writings_count);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeToShardSync(const Block & block, const std::vector<std::string> & dir_names,
size_t shard_id, const std::atomic<bool> & timeout_exceeded);
/// Performs synchronous insertion to remote node.
void writeToShardSync(const Block & block, const Cluster::ShardInfo & shard_info, size_t replica_id);
private:
StorageDistributed & storage;
......
......@@ -16,10 +16,9 @@
class ThreadPool
{
private:
public:
using Job = std::function<void()>;
public:
/// Size is constant, all threads are created immediately.
ThreadPool(size_t m_size);
......
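Usage sketch for this pool, based only on the members visible in the diff: a fixed thread count set at construction, schedule() taking a Job, and wait(), which writeSync above wraps in a try/catch, suggesting that wait() rethrows an exception raised inside a job:

#include <Common/ThreadPool.h>      /// header path is an assumption

void example()
{
    ThreadPool pool(4);             /// all four threads are created immediately
    for (size_t i = 0; i < 16; ++i)
        pool.schedule([i] { /* ... write the block for directory i ... */ });
    pool.wait();                    /// blocks until all 16 scheduled jobs have run
}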