提交 2f8f199d 编写于 作者: N Nikolai Kochetov

separated connection pool creation from StorageDistributedDirectoryMonitor in...

separated connection pool creation from StorageDistributedDirectoryMonitor in StorageDistributed; fixed bugs from review [#CLICKHOUSE-3033]
上级 19d3c368
......@@ -122,6 +122,8 @@
M(DictCacheRequests) \
M(DictCacheLockWriteNs) \
M(DictCacheLockReadNs) \
\
M(DistributedSyncInsertionTimeoutExceeded) \
namespace ProfileEvents
......
......@@ -86,8 +86,8 @@ namespace
}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name)
: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool)
: storage(storage), pool{pool}, path{storage.path + name + '/'}
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
......@@ -150,9 +150,9 @@ void StorageDistributedDirectoryMonitor::run()
}
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name)
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port,
const auto pool_factory = [&storage, &name] (const std::string & host, const UInt16 port,
const std::string & user, const std::string & password,
const std::string & default_database)
{
......
......@@ -16,14 +16,13 @@ namespace DB
class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name);
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, ConnectionPoolPtr pool);
~StorageDistributedDirectoryMonitor();
const ConnectionPoolPtr & getPool() const { return pool; }
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
private:
void run();
ConnectionPoolPtr createPool(const std::string & name);
bool findFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
......
......@@ -7,6 +7,8 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
......@@ -18,6 +20,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <common/logger_useful.h>
#include <Poco/DirectoryIterator.h>
......@@ -30,6 +33,10 @@ namespace CurrentMetrics
extern const Metric DistributedSend;
}
/// Profile event counters referenced from this translation unit.
namespace ProfileEvents
{
/// Incremented by writeToShardSync right before it throws TIMEOUT_EXCEEDED,
/// i.e. when the timeout flag of a synchronous insertion has been set.
extern const Event DistributedSyncInsertionTimeoutExceeded;
}
namespace DB
{
......@@ -41,9 +48,7 @@ namespace ErrorCodes
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_),
deadline(std::chrono::system_clock::now() + std::chrono::seconds(insert_timeout)),
log(&Poco::Logger::get("DistributedBlockOutputStream"))
: storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
{
}
......@@ -132,8 +137,9 @@ void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t s
else
{
std::atomic<bool> timeout_exceeded(false);
auto result = std::async(std::launch::async, &DistributedBlockOutputStream::writeToShardDirect,
this, std::cref(block), std::cref(shard_info.dir_names), std::ref(timeout_exceeded));
auto launch = insert_timeout ? std::launch::async : std::launch::deferred;
auto result = std::async(launch, &DistributedBlockOutputStream::writeToShardSync, this, std::cref(block),
std::cref(shard_info.dir_names), shard_id, std::ref(timeout_exceeded));
if (insert_timeout && result.wait_until(deadline) == std::future_status::timeout)
timeout_exceeded = true;
result.get();
......@@ -156,19 +162,34 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
}
void DistributedBlockOutputStream::writeToShardDirect(const Block & block, const std::vector<std::string> & dir_names, std::atomic<bool> & timeout_exceeded)
void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::vector<std::string> & dir_names,
size_t shard_id, const std::atomic<bool> & timeout_exceeded)
{
auto & blocks_inserted = this->blocks_inserted;
auto writeNodeDescription = [shard_id, & blocks_inserted](WriteBufferFromString & out, const Connection & connection)
{
out << " (While insertion to " << connection.getDescription() << " shard " << shard_id;
out << " Inserted blocks: " << blocks_inserted << ")";
};
const auto & query_string = queryToString(query_ast);
for (const auto & dir_name : dir_names)
{
auto & monitor = storage.requireDirectoryMonitor(dir_name);
auto & pool = monitor.getPool();
auto pool = storage.requireConnectionPool(dir_name);
auto connection = pool->get();
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
if (timeout_exceeded)
throw Exception("Timeout exceeded. Inserted blocks: " + std::to_string(blocks_inserted), ErrorCodes::TIMEOUT_EXCEEDED);
{
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
String message;
WriteBufferFromString out(message);
out << "Timeout exceeded.";
writeNodeDescription(out, *connection);
throw Exception(message, ErrorCodes::TIMEOUT_EXCEEDED);
}
try
{
......@@ -180,11 +201,10 @@ void DistributedBlockOutputStream::writeToShardDirect(const Block & block, const
}
catch (Exception & exception)
{
std::string message = "\nWhile insertion to ";
message += connection->getDescription();
message += " Inserted blocks: " + std::to_string(blocks_inserted);
String message;
WriteBufferFromString out(message);
writeNodeDescription(out, *connection);
exception.addMessage(message);
LOG_ERROR(log, message);
exception.rethrow();
}
}
......
......@@ -19,7 +19,8 @@ class StorageDistributed;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
/** The write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
/** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero.
* Otherwise, the write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
* If the Distributed table uses more than one shard, then in order to support the write,
* when creating the table, an additional parameter must be specified for ENGINE - the sharding key.
* Sharding key is an arbitrary expression from the columns. For example, rand() or UserID.
......@@ -30,11 +31,11 @@ using ClusterPtr = std::shared_ptr<Cluster>;
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_ = 0);
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);
void write(const Block & block) override;
void writePrefix() override { deadline = std::chrono::system_clock::now() + std::chrono::seconds(insert_timeout); }
void writePrefix() override { deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout); }
private:
IColumn::Selector createSelector(Block block);
......@@ -47,18 +48,18 @@ private:
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
void writeToShardDirect(const Block & block, const std::vector<std::string> & dir_names, std::atomic<bool> & timeout_exceeded);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeToShardSync(const Block & block, const std::vector<std::string> & dir_names,
size_t shard_id, const std::atomic<bool> & timeout_exceeded);
private:
StorageDistributed & storage;
ASTPtr query_ast;
ClusterPtr cluster;
bool insert_sync = true;
UInt64 insert_timeout = 1;
bool insert_sync;
UInt64 insert_timeout;
size_t blocks_inserted = 0;
std::chrono::system_clock::time_point deadline;
Poco::Logger * log;
std::chrono::steady_clock::time_point deadline;
};
}
......@@ -273,7 +273,7 @@ void StorageDistributed::startup()
void StorageDistributed::shutdown()
{
directory_monitors.clear();
cluster_nodes_data.clear();
}
......@@ -455,13 +455,6 @@ bool StorageDistributed::hasColumn(const String & column_name) const
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
}
StorageDistributedDirectoryMonitor & StorageDistributed::createDirectoryMonitor(const std::string & name)
{
return *(directory_monitors.emplace(name, std::make_unique<StorageDistributedDirectoryMonitor>(*this, name)).first->second);
}
void StorageDistributed::createDirectoryMonitors()
{
if (path.empty())
......@@ -473,16 +466,20 @@ void StorageDistributed::createDirectoryMonitors()
boost::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (it->status().type() == boost::filesystem::directory_file)
createDirectoryMonitor(it->path().filename().string());
requireDirectoryMonitor(it->path().filename().string());
}
StorageDistributedDirectoryMonitor & StorageDistributed::requireDirectoryMonitor(const std::string & name)
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
{
cluster_nodes_data[name].requireDirectoryMonitor(name, *this);
}
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
{
auto it = directory_monitors.find(name);
if (it == directory_monitors.end())
return createDirectoryMonitor(name);
return *it->second;
auto & node_data = cluster_nodes_data[name];
node_data.requireConnectionPool(name, *this);
return node_data.conneciton_pool;
}
size_t StorageDistributed::getShardCount() const
......@@ -496,4 +493,17 @@ ClusterPtr StorageDistributed::getCluster() const
return (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
}
/// Makes sure this node has a connection pool.
/// If a pool is already set, nothing happens; otherwise one is built with
/// StorageDistributedDirectoryMonitor::createPool(name, storage) and stored.
void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::string & name, const StorageDistributed & storage)
{
    if (conneciton_pool)
        return;

    conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
}
/// Makes sure this node has a directory monitor.
/// The connection pool is ensured first because the monitor is constructed
/// with it; an already existing monitor is left as is.
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::string & name, StorageDistributed & storage)
{
    /// Must precede monitor construction: the pool is passed to the monitor below.
    requireConnectionPool(name, storage);

    if (directory_monitor)
        return;

    directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool);
}
}
......@@ -116,13 +116,12 @@ private:
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
/// create directory monitor thread by subdirectory name
StorageDistributedDirectoryMonitor & createDirectoryMonitor(const std::string & name);
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors();
/// ensure directory monitor creation and return it
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const std::string & name);
/// ensure that the directory monitor thread for the given subdirectory name is created
void requireDirectoryMonitor(const std::string & name);
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
ClusterPtr getCluster() const;
......@@ -146,7 +145,17 @@ private:
String sharding_key_column_name;
String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created.
std::unordered_map<std::string, std::unique_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
/// Lazily created per-directory state: the connection pool and, when needed,
/// the directory monitor that is constructed with that same pool.
/// Both members start out empty and are filled by the require* methods below.
struct ClusterNodeData
{
/// Background directory monitor; stays null until requireDirectoryMonitor() is called.
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
/// Connection pool; stays null until one of the require* methods is called.
/// NOTE(review): the member name is misspelled ("conneciton"); renaming it
/// requires updating every user of the field at the same time.
ConnectionPoolPtr conneciton_pool;
/// Creates the connection pool via StorageDistributedDirectoryMonitor::createPool(name, storage)
/// if it does not exist yet; otherwise does nothing.
void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
/// Ensures the connection pool exists, then creates directory_monitor
/// (passing it that pool) if it does not exist yet.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage);
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册