diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index 8f5bcfe9ed3fcc10722303f9f180ac714303fd7d..82badc8e4424d11eca1b9ec14dc66a8bfbac1bb6 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -366,6 +366,7 @@ namespace ErrorCodes extern const int SEEK_POSITION_OUT_OF_BOUND = 361; extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED = 362; extern const int CANNOT_CREATE_IO_BUFFER = 363; + extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS = 364; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/IO/ReadBufferFromHTTP.cpp b/dbms/src/IO/ReadBufferFromHTTP.cpp index 4f26a8566e46a25a08432893ed36b5cc9df5997c..cbe8c7cf9efbcc3b2e9f17a10886f73cb3a86be0 100644 --- a/dbms/src/IO/ReadBufferFromHTTP.cpp +++ b/dbms/src/IO/ReadBufferFromHTTP.cpp @@ -18,9 +18,9 @@ namespace DB namespace ErrorCodes { extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; + extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS; } - static Poco::Net::IPAddress resolveHostImpl(const String & host) { return Poco::Net::DNS::resolveOne(host); @@ -91,9 +91,9 @@ ReadBufferFromHTTP::ReadBufferFromHTTP( { std::stringstream error_message; error_message << "Received error from remote server " << uri.str() << ". HTTP status code: " - << status << ", body: " << istr->rdbuf(); + << status << " " << response.getReason() << ", body: " << istr->rdbuf(); - throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); + throw Exception(error_message.str(), status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); } impl = std::make_unique(*istr, buffer_size_); diff --git a/dbms/src/IO/ReadBufferFromHTTP.h b/dbms/src/IO/ReadBufferFromHTTP.h index 937de38e1fc8cf8f5e0b1898595fc8bd13fa7665..8dbd08a28ded8cde4a6e8e2d49f2cebcbdb1515e 100644 --- a/dbms/src/IO/ReadBufferFromHTTP.h +++ b/dbms/src/IO/ReadBufferFromHTTP.h @@ -14,6 +14,8 @@ namespace DB { +const int HTTP_TOO_MANY_REQUESTS = 429; + /** Perform HTTP-request and provide response to read. */ class ReadBufferFromHTTP : public ReadBuffer diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp index 27eb1e7f7bf7cc77b2c436a3e3db57e210454db6..ce43a2a75489d06a8da34ca17e397ea0b4fd9407 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp @@ -21,6 +21,7 @@ namespace DB namespace ErrorCodes { extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; + extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS; } static Poco::Net::IPAddress resolveHostImpl(const String & host) @@ -86,9 +87,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP( { std::stringstream error_message; error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: " - << status << ", body: " << istr->rdbuf(); + << status << " " << response.getReason() << ", body: " << istr->rdbuf(); - throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); + throw Exception(error_message.str(), status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); } impl = std::make_unique(*istr, buffer_size_); diff --git a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp index 34a6274505fba9d8b33f242c95741dbe3b41ce12..25351ff1a9593cefdf1ab61aa8d8eea2114ae342 100644 --- a/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp +++ b/dbms/src/IO/WriteBufferFromHTTPServerResponse.cpp @@ -44,7 +44,8 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders() #if POCO_CLICKHOUSE_PATCH /// Send end of headers delimiter. - *response_header_ostr << "\r\n" << std::flush; + if (response_header_ostr) + *response_header_ostr << "\r\n" << std::flush; #else /// Newline autosent by response.send() /// if nothing to send in body: diff --git a/dbms/src/Interpreters/InterserverIOHandler.h b/dbms/src/Interpreters/InterserverIOHandler.h index 74e0709d5228468d2568a7ee8ec01a1f9a464d9d..205e9bd3bb2132ad313415b31b2881a3644041df 100644 --- a/dbms/src/Interpreters/InterserverIOHandler.h +++ b/dbms/src/Interpreters/InterserverIOHandler.h @@ -11,6 +11,8 @@ #include #include +namespace Poco { namespace Net { class HTTPServerResponse; } } + namespace DB { @@ -64,7 +66,7 @@ class InterserverIOEndpoint { public: virtual std::string getId(const std::string & path) const = 0; - virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) = 0; + virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0; virtual ~InterserverIOEndpoint() {} void cancel() { is_cancelled = true; } diff --git a/dbms/src/Server/InterserverIOHTTPHandler.cpp b/dbms/src/Server/InterserverIOHTTPHandler.cpp index d45431467daeb17d2b5007244efe9a56eb9c9500..5c7af625d1c248ae0683edd7bd86631a0b54ff6f 100644 --- a/dbms/src/Server/InterserverIOHTTPHandler.cpp +++ b/dbms/src/Server/InterserverIOHTTPHandler.cpp @@ -14,9 +14,9 @@ namespace ErrorCodes extern const int POCO_EXCEPTION; extern const int STD_EXCEPTION; extern const int UNKNOWN_EXCEPTION; + extern const int TOO_MUCH_SIMULTANEOUS_QUERIES; } - void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { HTMLForm params(request); @@ -37,11 +37,11 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque if (compress) { CompressedWriteBuffer compressed_out(out); - endpoint->processQuery(params, body, compressed_out); + endpoint->processQuery(params, body, compressed_out, response); } else { - endpoint->processQuery(params, body, out); + endpoint->processQuery(params, body, out, response); } out.finalize(); @@ -61,6 +61,14 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ } catch (Exception & e) { + + if (e.code() == ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES) + { + if (!response.sent()) + response.send(); + return; + } + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); /// Sending to remote server was cancelled due to server shutdown or drop table. diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index bd253693cdb31d8d5c36665b0a9aa86ff83e067b..e49c81ebfcb065eef645a75499361f77cc028207 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include namespace CurrentMetrics @@ -19,6 +21,7 @@ namespace ErrorCodes { extern const int ABORTED; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; + extern const int TOO_MUCH_SIMULTANEOUS_QUERIES; } namespace DataPartsExchange @@ -39,7 +42,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) +void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { if (is_cancelled) throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); @@ -49,6 +52,24 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body bool send_sharded_part = !shard_str.empty(); + static std::atomic_uint total_sends {0}; + + if (total_sends >= data.settings.replicated_max_parallel_sends + || data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table) + { + response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS)); + response.setReason("Too many concurrent fetches, try again later"); + response.set("Retry-After", "10"); + response.setChunkedTransferEncoding(false); + return; + } + ++total_sends; + SCOPE_EXIT({--total_sends;}); + + ++data.current_table_sends; + SCOPE_EXIT({--data.current_table_sends;}); + + LOG_TRACE(log, "Sending part " << part_name); try diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.h b/dbms/src/Storages/MergeTree/DataPartsExchange.h index f8e59b2b386ae1a3663d7a2995863101e690d9cc..ff2ede733ddfe1c3f7b46b5f0cb666ab979fc186 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.h +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.h @@ -24,7 +24,7 @@ public: Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; + void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; private: MergeTreeData::DataPartPtr findPart(const String & name); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index f97ff2843b9b2cb6edd97ca03c52a19a95d9018e..10c17eeeeeef706fbc9df15a8a7c2222d39a0594 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -472,6 +472,9 @@ public: Block primary_key_sample; DataTypes primary_key_data_types; + /// Limiting parallel sends per one table, used in DataPartsExchange + std::atomic_uint current_table_sends {0}; + private: friend struct MergeTreeDataPart; friend class StorageMergeTree; diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 65cb33425281e8a0592df7e7390cc8dac0b3717a..bdb11759e9fd90067f25de7280e9f560e0fa959b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -78,6 +78,9 @@ struct MergeTreeSettings /// Limit parallel fetches size_t replicated_max_parallel_fetches = 4; size_t replicated_max_parallel_fetches_for_table = 2; + /// Limit parallel sends + size_t replicated_max_parallel_sends = 4; + size_t replicated_max_parallel_sends_for_table = 2; /// If ration of wrong parts to total number of parts is less than this - allow to start anyway. double replicated_max_ratio_of_wrong_parts = 0.05; @@ -138,6 +141,8 @@ struct MergeTreeSettings SET_SIZE_T(replicated_max_missing_active_parts); SET_SIZE_T(replicated_max_parallel_fetches); SET_SIZE_T(replicated_max_parallel_fetches_for_table); + SET_SIZE_T(replicated_max_parallel_sends); + SET_SIZE_T(replicated_max_parallel_sends_for_table); SET_DOUBLE(replicated_max_ratio_of_wrong_parts); SET_SIZE_T(zookeeper_session_expiration_check_period); SET_SIZE_T(check_delay_period); diff --git a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp index 0b26b42d2406ee07cb93605ebd6c702458a07f94..551e83ff705468a1dc5c2c25992d42ae192c41e5 100644 --- a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp @@ -36,7 +36,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) +void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { if (is_cancelled) throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED}; diff --git a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.h index 3b1404c9c966cd3613e764f60cc405c036db2e44..64249e3328c93e93acf3ecc953e9b91977a72545 100644 --- a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.h @@ -21,7 +21,7 @@ public: Service(const Service &) = delete; Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; + void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; private: const Context & context; diff --git a/dbms/src/Storages/MergeTree/RemotePartChecker.cpp b/dbms/src/Storages/MergeTree/RemotePartChecker.cpp index 38b86e219ac3f055a104735f0459673e9887c13d..3597840f01ba2ad31b84bc6e41d59cf3a4a4af31 100644 --- a/dbms/src/Storages/MergeTree/RemotePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/RemotePartChecker.cpp @@ -33,7 +33,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) +void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { auto part_name = params.get("part"); auto hash = params.get("hash"); diff --git a/dbms/src/Storages/MergeTree/RemotePartChecker.h b/dbms/src/Storages/MergeTree/RemotePartChecker.h index be868c229bfab1eb19a6505d09e29284da87964f..ac3f6498e2382bc91844c3503d93e677e691e659 100644 --- a/dbms/src/Storages/MergeTree/RemotePartChecker.h +++ b/dbms/src/Storages/MergeTree/RemotePartChecker.h @@ -27,7 +27,7 @@ public: Service(const Service &) = delete; Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; + void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; private: StoragePtr owned_storage; diff --git a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp index c26325779e42ed67246a9ceb0ee010608c3a1361..e271907e71787da0444efbc177a2e21cf6ee1f28 100644 --- a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp +++ b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.cpp @@ -35,7 +35,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) +void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { if (is_cancelled) throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED}; diff --git a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.h b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.h index 2cff097670aeb97409da6f9df71b30b566a73956..7e6c1fefb881433a0368dc8b036b972e783be250 100644 --- a/dbms/src/Storages/MergeTree/RemoteQueryExecutor.h +++ b/dbms/src/Storages/MergeTree/RemoteQueryExecutor.h @@ -20,7 +20,7 @@ public: Service(const Service &) = delete; Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; + void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; private: Context & context; diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp index 46588b284d33e1716a41f6005882b278deca630f..ef47a4628cb3d78c5807a99386fc906b4cdb8722 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.cpp @@ -45,7 +45,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) +void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { std::string part_name = params.get("path"); std::string replica_path = params.get("endpoint"); diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.h b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.h index f3c429a222a8531e8802f413ac74e0829586d5bd..be6a185fb941be3f99f7b15efd48868ea63706a8 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionUploader.h +++ b/dbms/src/Storages/MergeTree/ShardedPartitionUploader.h @@ -23,7 +23,7 @@ public: Service(const Service &) = delete; Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; + void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; private: StoragePtr owned_storage; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index a6103422b1080c2eb37651ae968fff915ae69709..ee6483a913942eafcf22c5342e95701f1caddbb1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -103,6 +103,7 @@ namespace ErrorCodes extern const int UNFINISHED; extern const int METADATA_MISMATCH; extern const int RESHARDING_NULLABLE_SHARDING_KEY; + extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS; } @@ -1417,8 +1418,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } } - if (!fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) - return false; + try + { + if (!fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) + return false; + } + catch (const Exception & e) + { + /// No stacktrace, just log message + if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS) + { + LOG_INFO(log, "Too busy replica. Will try later. " << e.message()); + return false; + } + throw; + } if (entry.type == LogEntry::MERGE_PARTS) ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged); @@ -3444,7 +3458,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S } catch (const DB::Exception & e) { - if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER) + if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS) throw; LOG_INFO(log, e.displayText());