diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 6a18a1c985a4a59b762e553b3808f9a799a832e0..4d0b20f0168e678af2a44335c08aadabe73e7d37 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -64,9 +64,9 @@ void Connection::connect() { socket = std::make_unique(); } - socket->connect(resolved_address, connect_timeout); - socket->setReceiveTimeout(receive_timeout); - socket->setSendTimeout(send_timeout); + socket->connect(resolved_address, timeouts.connection_timeout); + socket->setReceiveTimeout(timeouts.receive_timeout); + socket->setSendTimeout(timeouts.send_timeout); socket->setNoDelay(true); in = std::make_shared(*socket); diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index fa362f611f233e597d03fca643976a2f0adbce69..c9b401c91ee3deeaabe2bb7dd3a64773b09a7147 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -54,12 +55,10 @@ class Connection : private boost::noncopyable public: Connection(const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, + const ConnectionTimeouts & timeouts_, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable, - Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0), - Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0), - Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0), Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0)) : host(host_), port(port_), default_database(default_database_), @@ -67,7 +66,7 @@ public: client_name(client_name_), compression(compression_), encryption(encryption_), - connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_), + timeouts(timeouts_), sync_request_timeout(sync_request_timeout_), log_wrapper(*this) { @@ -82,12 +81,10 @@ public: Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_, const String & default_database_, const String & user_, const String & password_, + const ConnectionTimeouts & timeouts_, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Encryption encryption_ = Protocol::Encryption::Disable, - Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0), - Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0), - Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0), Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0)) : host(host_), port(port_), @@ -97,7 +94,7 @@ public: client_name(client_name_), compression(compression_), encryption(encryption_), - connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_), + timeouts(timeouts_), sync_request_timeout(sync_request_timeout_), log_wrapper(*this) { @@ -233,9 +230,7 @@ private: */ ThrottlerPtr throttler; - Poco::Timespan connect_timeout; - Poco::Timespan receive_timeout; - Poco::Timespan send_timeout; + ConnectionTimeouts timeouts; Poco::Timespan sync_request_timeout; /// From where to read query execution result. diff --git a/dbms/src/Client/ConnectionPool.h b/dbms/src/Client/ConnectionPool.h index 1e78f4a19e1ff6ea405d08b32db102504b6e2d82..eb71a7069c975b34398f2a4ee933486f01acf18b 100644 --- a/dbms/src/Client/ConnectionPool.h +++ b/dbms/src/Client/ConnectionPool.h @@ -3,7 +3,7 @@ #include #include - +#include namespace DB { @@ -48,17 +48,15 @@ public: const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, + const ConnectionTimeouts & timeouts, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, - Protocol::Encryption encryption_ = Protocol::Encryption::Disable, - Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0), - Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0), - Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0)) + Protocol::Encryption encryption_ = Protocol::Encryption::Disable) : Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), host(host_), port(port_), default_database(default_database_), user(user_), password(password_), resolved_address(host_, port_), client_name(client_name_), compression(compression_), encryption(encryption_), - connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_) + timeouts(timeouts) { } @@ -66,17 +64,15 @@ public: const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_, const String & default_database_, const String & user_, const String & password_, + const ConnectionTimeouts & timeouts, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, - Protocol::Encryption encryption_ = Protocol::Encryption::Disable, - Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0), - Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0), - Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0)) + Protocol::Encryption encryption_ = Protocol::Encryption::Disable) : Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")), host(host_), port(port_), default_database(default_database_), user(user_), password(password_), resolved_address(resolved_address_), client_name(client_name_), compression(compression_), encryption(encryption_), - connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_) + timeouts(timeouts) { } @@ -105,9 +101,8 @@ protected: { return std::make_shared( host, port, resolved_address, - default_database, user, password, - client_name, compression, encryption, - connect_timeout, receive_timeout, send_timeout); + default_database, user, password, timeouts, + client_name, compression, encryption); } private: @@ -126,9 +121,7 @@ private: Protocol::Compression compression; /// Whether to compress data when interacting with the server. Protocol::Encryption encryption; /// Whether to encrypt data when interacting with the server. - Poco::Timespan connect_timeout; - Poco::Timespan receive_timeout; - Poco::Timespan send_timeout; + ConnectionTimeouts timeouts; }; } diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index 8fba855ebd16c6193a1513ef38f272c3d1e04c2e..d730fd1b14fc17f75ff2e0a908cc449e0353ea82 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -71,6 +71,9 @@ #define DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS 7500 +#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 +#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 + #define ALWAYS_INLINE __attribute__((__always_inline__)) #define NO_INLINE __attribute__((__noinline__)) diff --git a/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp b/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp index 535d09586c652dc71ddd255e2764e208b80b51fc..039126b458fbc067ed516dce014f11941c81b548 100644 --- a/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -7,7 +7,7 @@ #include #include #include - +#include namespace DB { @@ -21,11 +21,13 @@ namespace ErrorCodes static const size_t MAX_CONNECTIONS = 16; static ConnectionPoolWithFailoverPtr createPool( - const std::string & host, UInt16 port, const std::string & db, const std::string & user, const std::string & password) + const std::string & host, UInt16 port, const std::string & db, + const std::string & user, const std::string & password, const Context & context) { + auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef()); ConnectionPoolPtrs pools; pools.emplace_back(std::make_shared( - MAX_CONNECTIONS, host, port, db, user, password, "ClickHouseDictionarySource")); + MAX_CONNECTIONS, host, port, db, user, password, timeouts, "ClickHouseDictionarySource")); return std::make_shared(pools, LoadBalancing::RANDOM); } @@ -46,7 +48,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource( query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, sample_block{sample_block}, context(context), is_local{isLocalAddress({ host, port }, config.getInt("tcp_port", 0))}, - pool{is_local ? nullptr : createPool(host, port, db, user, password)}, + pool{is_local ? nullptr : createPool(host, port, db, user, password, context)}, load_all_query{query_builder.composeLoadAllQuery()} {} @@ -59,7 +61,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar query_builder{dict_struct, db, table, where, ExternalQueryBuilder::Backticks}, sample_block{other.sample_block}, context(other.context), is_local{other.is_local}, - pool{is_local ? nullptr : createPool(host, port, db, user, password)}, + pool{is_local ? nullptr : createPool(host, port, db, user, password, context)}, load_all_query{other.load_all_query} {} diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.cpp b/dbms/src/Dictionaries/HTTPDictionarySource.cpp index 07b6ac9681a385ad2544de37922bf436cf9c35a7..69399e09279ef6b3654d1aad7b5d50f3092f5adf 100644 --- a/dbms/src/Dictionaries/HTTPDictionarySource.cpp +++ b/dbms/src/Dictionaries/HTTPDictionarySource.cpp @@ -8,7 +8,7 @@ #include #include #include - +#include namespace DB { @@ -24,7 +24,8 @@ HTTPDictionarySource::HTTPDictionarySource(const DictionaryStructure & dict_stru url{config.getString(config_prefix + ".url", "")}, format{config.getString(config_prefix + ".format")}, sample_block{sample_block}, - context(context) + context(context), + timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) { } @@ -34,7 +35,8 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other) url{other.url}, format{other.format}, sample_block{other.sample_block}, - context(other.context) + context(other.context), + timeouts(ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())) { } @@ -42,7 +44,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadAll() { LOG_TRACE(log, "loadAll " + toString()); Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET); + auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET, + ReadWriteBufferFromHTTP::OutStreamCallback(), timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } @@ -59,7 +62,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadIds(const std::vector & id }; Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); + auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, + out_stream_callback, timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } @@ -77,7 +81,8 @@ BlockInputStreamPtr HTTPDictionarySource::loadKeys( }; Poco::URI uri(url); - auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, out_stream_callback); + auto in_ptr = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_POST, + out_stream_callback, timeouts); auto input_stream = context.getInputFormat(format, *in_ptr, sample_block, max_block_size); return std::make_shared>(input_stream, std::move(in_ptr)); } diff --git a/dbms/src/Dictionaries/HTTPDictionarySource.h b/dbms/src/Dictionaries/HTTPDictionarySource.h index 3630c9bebf8ea736209fd913bdabd406834572a4..ba799ec8fbef9f865b09c9dc48d6eb647a5c298f 100644 --- a/dbms/src/Dictionaries/HTTPDictionarySource.h +++ b/dbms/src/Dictionaries/HTTPDictionarySource.h @@ -3,7 +3,7 @@ #include #include #include - +#include namespace Poco { class Logger; } @@ -48,6 +48,7 @@ private: const std::string format; Block sample_block; const Context & context; + ConnectionTimeouts timeouts; }; } diff --git a/dbms/src/IO/ConnectionTimeouts.h b/dbms/src/IO/ConnectionTimeouts.h new file mode 100644 index 0000000000000000000000000000000000000000..bafac83764b49445334836452eb7d851adaacf14 --- /dev/null +++ b/dbms/src/IO/ConnectionTimeouts.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +namespace DB +{ + +struct ConnectionTimeouts +{ + Poco::Timespan connection_timeout; + Poco::Timespan send_timeout; + Poco::Timespan receive_timeout; + + ConnectionTimeouts() = default; + + ConnectionTimeouts(const Poco::Timespan & connection_timeout_, + const Poco::Timespan & send_timeout_, + const Poco::Timespan & receive_timeout_) + : connection_timeout(connection_timeout_), + send_timeout(send_timeout_), + receive_timeout(receive_timeout_) + { + } + + static Poco::Timespan saturate(const Poco::Timespan & timespan, const Poco::Timespan & limit) + { + if (limit.totalMicroseconds() == 0) + return timespan; + else + return (timespan > limit) ? limit : timespan; + } + + ConnectionTimeouts getSaturated(const Poco::Timespan & limit) const + { + return ConnectionTimeouts(saturate(connection_timeout, limit), + saturate(send_timeout, limit), + saturate(receive_timeout, limit)); + } + + static ConnectionTimeouts getTCPTimeouts(const Settings & settings) + { + return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout); + } + + static ConnectionTimeouts getHTTPTimeouts(const Settings & settings) + { + return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout); + } +}; + +} diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp index a88a2eea8358abac9df82fb93515e559eab2535a..12954b616ad8e3ef462e83e8d339cb1cfe40fdba 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp @@ -27,8 +27,8 @@ namespace ErrorCodes ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri, const std::string & method_, OutStreamCallback out_stream_callback, - size_t buffer_size_, - const HTTPTimeouts & timeouts) + const ConnectionTimeouts & timeouts, + size_t buffer_size_) : ReadBuffer(nullptr, 0), uri{uri}, method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}, diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h index 9041e1331af89b64caea44d0e5dfb3f356622a08..88230ac2079249a3266b40c550437de63e5ed666 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.h +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.h @@ -4,6 +4,7 @@ #include #include #include +#include #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 @@ -13,13 +14,6 @@ namespace DB const int HTTP_TOO_MANY_REQUESTS = 429; -struct HTTPTimeouts -{ - Poco::Timespan connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0); - Poco::Timespan send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0); - Poco::Timespan receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0); -}; - /** Perform HTTP POST request and provide response to read. */ @@ -28,7 +22,7 @@ class ReadWriteBufferFromHTTP : public ReadBuffer private: Poco::URI uri; std::string method; - HTTPTimeouts timeouts; + ConnectionTimeouts timeouts; bool is_ssl; std::unique_ptr session; @@ -38,12 +32,12 @@ private: public: using OutStreamCallback = std::function; - ReadWriteBufferFromHTTP( + explicit ReadWriteBufferFromHTTP( const Poco::URI & uri, const std::string & method = {}, OutStreamCallback out_stream_callback = {}, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, - const HTTPTimeouts & timeouts = {}); + const ConnectionTimeouts & timeouts = {}, + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE); bool nextImpl() override; }; diff --git a/dbms/src/IO/RemoteReadBuffer.h b/dbms/src/IO/RemoteReadBuffer.h index 4738c79e0628e05dc07e7ba8643f26b6a3c1f30c..d962a5c2af52cf211fd75518bebcd0716a6b082f 100644 --- a/dbms/src/IO/RemoteReadBuffer.h +++ b/dbms/src/IO/RemoteReadBuffer.h @@ -37,9 +37,12 @@ public: { std::make_pair("action", "read"), std::make_pair("path", path), - std::make_pair("compress", (compress ? "true" : "false"))}); + std::make_pair("compress", (compress ? "true" : "false")) + }); - impl = std::make_unique(uri, std::string(), ReadWriteBufferFromHTTP::OutStreamCallback(), buffer_size, HTTPTimeouts{connection_timeout, send_timeout, receive_timeout}); + ConnectionTimeouts timeouts(connection_timeout, send_timeout, receive_timeout); + ReadWriteBufferFromHTTP::OutStreamCallback callback; + impl = std::make_unique(uri, std::string(), callback, timeouts, buffer_size); } bool nextImpl() override @@ -56,7 +59,7 @@ public: const std::string & host, int port, const std::string & path, - size_t timeout = 0) + const ConnectionTimeouts & timeouts) { Poco::URI uri; uri.setScheme("http"); @@ -67,7 +70,7 @@ public: std::make_pair("action", "list"), std::make_pair("path", path)}); - ReadWriteBufferFromHTTP in(uri, {}, {}, {}, HTTPTimeouts{timeout}); + ReadWriteBufferFromHTTP in(uri, {}, {}, timeouts); std::vector files; while (!in.eof()) diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 343f03758b71b236f892e23a65c433bd4c816899..76ea391ce1a43996478582950c27e7f3ce788c08 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -210,10 +210,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se settings.distributed_connections_pool_size, address.host_name, address.port, address.resolved_address, address.default_database, address.user, address.password, - "server", Protocol::Compression::Enable, Protocol::Encryption::Disable, - saturate(settings.connect_timeout, settings.limits.max_execution_time), - saturate(settings.receive_timeout, settings.limits.max_execution_time), - saturate(settings.send_timeout, settings.limits.max_execution_time))); + ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time), + "server", Protocol::Compression::Enable, Protocol::Encryption::Disable)); info.pool = std::make_shared( std::move(pools), settings.load_balancing, settings.connections_with_failover_max_tries); @@ -289,10 +287,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se settings.distributed_connections_pool_size, replica.host_name, replica.port, replica.resolved_address, replica.default_database, replica.user, replica.password, - "server", Protocol::Compression::Enable, Protocol::Encryption::Disable, - saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time), - saturate(settings.receive_timeout, settings.limits.max_execution_time), - saturate(settings.send_timeout, settings.limits.max_execution_time))); + ConnectionTimeouts::getTCPTimeouts(settings).getSaturated(settings.limits.max_execution_time), + "server", Protocol::Compression::Enable, Protocol::Encryption::Disable)); } } @@ -348,10 +344,8 @@ Cluster::Cluster(const Settings & settings, const std::vector #include #include +#include #include @@ -64,10 +65,10 @@ public: const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, const String & stage, bool randomize_, size_t max_iterations_, double max_time_, - const String & json_path_, const Settings & settings_) + const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_) : concurrency(concurrency_), delay(delay_), queue(concurrency), - connections(concurrency, host_, port_, default_database_, user_, password_), + connections(concurrency, host_, port_, default_database_, user_, password_, timeouts), randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_), json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency) { @@ -482,6 +483,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv) options["iterations"].as(), options["timelimit"].as(), options["json"].as(), + ConnectionTimeouts::getTCPTimeouts(settings), settings); } catch (...) diff --git a/dbms/src/Server/Client.cpp b/dbms/src/Server/Client.cpp index 2afceca24f69882e3530a5400ca37f72117f5a69..642bf7a5ebce9bbbdec450565bb481ebdbedc64a 100644 --- a/dbms/src/Server/Client.cpp +++ b/dbms/src/Server/Client.cpp @@ -385,7 +385,9 @@ private: : Protocol::Encryption::Disable; String host = config().getString("host", "localhost"); - UInt16 port = config().getInt("port", config().getInt(static_cast(encryption) ? "tcp_ssl_port" : "tcp_port", static_cast(encryption) ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT)); + UInt16 port = config().getInt("port", config().getInt( + static_cast(encryption) ? "tcp_ssl_port" : "tcp_port", + static_cast(encryption) ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT)); String default_database = config().getString("database", ""); String user = config().getString("user", ""); String password = config().getString("password", ""); @@ -401,11 +403,12 @@ private: << (!user.empty() ? " as user " + user : "") << "." << std::endl; - connection = std::make_unique(host, port, default_database, user, password, "client", compression, - encryption, - Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0), - Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0), - Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); + ConnectionTimeouts timeouts( + Poco::Timespan(config().getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0), + Poco::Timespan(config().getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0), + Poco::Timespan(config().getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0)); + connection = std::make_unique(host, port, default_database, user, password, timeouts, "client", + compression, encryption); String server_name; UInt64 server_version_major = 0; diff --git a/dbms/src/Server/PerformanceTest.cpp b/dbms/src/Server/PerformanceTest.cpp index 299eba0497b50a865a353045c8008235e14ab5ae..4c77f6dbbe987a35e78e785a7bead76d2b4d097f 100644 --- a/dbms/src/Server/PerformanceTest.cpp +++ b/dbms/src/Server/PerformanceTest.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -503,8 +504,9 @@ public: Strings && tests_names_, Strings && skip_names_, Strings && tests_names_regexp_, - Strings && skip_names_regexp_) - : connection(host_, port_, default_database_, user_, password_), + Strings && skip_names_regexp_, + const ConnectionTimeouts & timeouts) + : connection(host_, port_, default_database_, user_, password_, timeouts), gotSIGINT(false), lite_output(lite_output_), profiles_file(profiles_file_), @@ -1481,6 +1483,8 @@ try Strings tests_names_regexp = options.count("names-regexp") ? options["names-regexp"].as() : Strings({}); Strings skip_names_regexp = options.count("skip-names-regexp") ? options["skip-names-regexp"].as() : Strings({}); + auto timeouts = DB::ConnectionTimeouts::getTCPTimeouts(DB::Settings()); + DB::PerformanceTest performanceTest(options["host"].as(), options["port"].as(), options["database"].as(), @@ -1494,7 +1498,8 @@ try std::move(tests_names), std::move(skip_names), std::move(tests_names_regexp), - std::move(skip_names_regexp)); + std::move(skip_names_regexp), + timeouts); return 0; } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 837de5ab2f91145ea41d04421d4ebf23a997bb29..436fbe5fd5a909f0b6e4997086eb0b541a84322f 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -357,8 +357,8 @@ int Server::main(const std::vector & /*args*/) { Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("http_port")); Poco::Net::ServerSocket http_socket(http_socket_address); - http_socket.setReceiveTimeout(settings.receive_timeout); - http_socket.setSendTimeout(settings.send_timeout); + http_socket.setReceiveTimeout(settings.http_receive_timeout); + http_socket.setSendTimeout(settings.http_send_timeout); servers.emplace_back(new Poco::Net::HTTPServer( new HTTPHandlerFactory(*this, "HTTPHandler-factory"), @@ -376,8 +376,8 @@ int Server::main(const std::vector & /*args*/) std::call_once(ssl_init_once, SSLInit); Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("https_port")); Poco::Net::SecureServerSocket http_socket(http_socket_address); - http_socket.setReceiveTimeout(settings.receive_timeout); - http_socket.setSendTimeout(settings.send_timeout); + http_socket.setReceiveTimeout(settings.http_receive_timeout); + http_socket.setSendTimeout(settings.http_send_timeout); servers.emplace_back(new Poco::Net::HTTPServer( new HTTPHandlerFactory(*this, "HTTPHandler-factory"), @@ -438,8 +438,8 @@ int Server::main(const std::vector & /*args*/) { Poco::Net::SocketAddress interserver_address = make_socket_address(listen_host, config().getInt("interserver_http_port")); Poco::Net::ServerSocket interserver_io_http_socket(interserver_address); - interserver_io_http_socket.setReceiveTimeout(settings.receive_timeout); - interserver_io_http_socket.setSendTimeout(settings.send_timeout); + interserver_io_http_socket.setReceiveTimeout(settings.http_receive_timeout); + interserver_io_http_socket.setSendTimeout(settings.http_send_timeout); servers.emplace_back(new Poco::Net::HTTPServer( new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"), server_pool, diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 668500853e904e5380f99fa5cd074b39f7153570..48d0f74f365cfd0489aac540e1edba1027b0ceb2 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -150,13 +150,14 @@ void StorageDistributedDirectoryMonitor::run() ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage) { - const auto pool_factory = [&storage, &name] (const std::string & host, const UInt16 port, + auto timeouts = ConnectionTimeouts::getTCPTimeouts(storage.context.getSettingsRef()); + const auto pool_factory = [&storage, &name, &timeouts] (const std::string & host, const UInt16 port, const std::string & user, const std::string & password, const std::string & default_database) { return std::make_shared( 1, host, port, default_database, - user, password, + user, password, timeouts, storage.getName() + '_' + name); }; diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index b0b6e8923b2ce5f04d0d5983bc92a4ac0dfd9dbc..c164fce35b140961f948bfaa7a984fe54bae116d 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -155,6 +155,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( const String & replica_path, const String & host, int port, + const ConnectionTimeouts & timeouts, bool to_detached) { Poco::URI uri; @@ -168,7 +169,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( {"compress", "false"} }); - ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST}; + ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts}; static const String TMP_PREFIX = "tmp_fetch_"; String relative_part_path = String(to_detached ? "detached/" : "") + TMP_PREFIX + part_name; diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h index 1b099081d3eefa05f92e61606353f462d9494bf3..c498a7f8f0c978fb80ac80a15eaa8ac8c65250d9 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.h +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -51,6 +52,7 @@ public: const String & replica_path, const String & host, int port, + const ConnectionTimeouts & timeouts, bool to_detached = false); /// You need to stop the data transfer. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 29a81935ad767cc859f40bc85d3d11e7ee71e081..6ae8e9cf340a3d1861d286efbeba61719ad8d9c7 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -2124,8 +2125,9 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin Stopwatch stopwatch; + auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart( - part_name, replica_path, address.host, address.replication_port, to_detached); + part_name, replica_path, address.host, address.replication_port, timeouts, to_detached); if (!to_detached) { @@ -3094,11 +3096,12 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query /// NOTE Works only if there is access from the default user without a password. You can fix it by adding a parameter to the server config. + auto timeouts = ConnectionTimeouts::getTCPTimeouts(context.getSettingsRef()); Connection connection( leader_address.host, leader_address.queries_port, leader_address.database, - "", "", "ClickHouse replica"); + "", "", timeouts, "ClickHouse replica"); RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings); NullBlockOutputStream output;