提交 1bc311ee 编写于 作者: P proller 提交者: alexey-milovidov

ReplicatedMergeTree: Max streams to send data [#CLICKHOUSE-2878] (#656)

* ReplicatedMergeTree: Max streams to send data [#CLICKHOUSE-2878]

* fix

* better messages on client

* change code to 429 TOO_MANY_REQUESTS

* wip

* better message

* Update InterserverIOHandler.h

* Update InterserverIOHTTPHandler.cpp

* Update StorageReplicatedMergeTree.cpp
上级 13e9b5c9
...@@ -366,6 +366,7 @@ namespace ErrorCodes ...@@ -366,6 +366,7 @@ namespace ErrorCodes
extern const int SEEK_POSITION_OUT_OF_BOUND = 361; extern const int SEEK_POSITION_OUT_OF_BOUND = 361;
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED = 362; extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED = 362;
extern const int CANNOT_CREATE_IO_BUFFER = 363; extern const int CANNOT_CREATE_IO_BUFFER = 363;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS = 364;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
......
...@@ -18,9 +18,9 @@ namespace DB ...@@ -18,9 +18,9 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
} }
static Poco::Net::IPAddress resolveHostImpl(const String & host) static Poco::Net::IPAddress resolveHostImpl(const String & host)
{ {
return Poco::Net::DNS::resolveOne(host); return Poco::Net::DNS::resolveOne(host);
...@@ -91,9 +91,9 @@ ReadBufferFromHTTP::ReadBufferFromHTTP( ...@@ -91,9 +91,9 @@ ReadBufferFromHTTP::ReadBufferFromHTTP(
{ {
std::stringstream error_message; std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: " error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf(); << status << " " << response.getReason() << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); throw Exception(error_message.str(), status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
} }
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_); impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
namespace DB namespace DB
{ {
const int HTTP_TOO_MANY_REQUESTS = 429;
/** Perform HTTP-request and provide response to read. /** Perform HTTP-request and provide response to read.
*/ */
class ReadBufferFromHTTP : public ReadBuffer class ReadBufferFromHTTP : public ReadBuffer
......
...@@ -21,6 +21,7 @@ namespace DB ...@@ -21,6 +21,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
} }
static Poco::Net::IPAddress resolveHostImpl(const String & host) static Poco::Net::IPAddress resolveHostImpl(const String & host)
...@@ -86,9 +87,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP( ...@@ -86,9 +87,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
{ {
std::stringstream error_message; std::stringstream error_message;
error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: " error_message << "Received error from remote server " << uri.toString() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf(); << status << " " << response.getReason() << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); throw Exception(error_message.str(), status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
} }
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_); impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
......
...@@ -44,7 +44,8 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders() ...@@ -44,7 +44,8 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders()
#if POCO_CLICKHOUSE_PATCH #if POCO_CLICKHOUSE_PATCH
/// Send end of headers delimiter. /// Send end of headers delimiter.
*response_header_ostr << "\r\n" << std::flush; if (response_header_ostr)
*response_header_ostr << "\r\n" << std::flush;
#else #else
/// Newline autosent by response.send() /// Newline autosent by response.send()
/// if nothing to send in body: /// if nothing to send in body:
......
...@@ -11,6 +11,8 @@ ...@@ -11,6 +11,8 @@
#include <atomic> #include <atomic>
#include <Poco/Net/HTMLForm.h> #include <Poco/Net/HTMLForm.h>
namespace Poco { namespace Net { class HTTPServerResponse; } }
namespace DB namespace DB
{ {
...@@ -64,7 +66,7 @@ class InterserverIOEndpoint ...@@ -64,7 +66,7 @@ class InterserverIOEndpoint
{ {
public: public:
virtual std::string getId(const std::string & path) const = 0; virtual std::string getId(const std::string & path) const = 0;
virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) = 0; virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0;
virtual ~InterserverIOEndpoint() {} virtual ~InterserverIOEndpoint() {}
void cancel() { is_cancelled = true; } void cancel() { is_cancelled = true; }
......
...@@ -14,9 +14,9 @@ namespace ErrorCodes ...@@ -14,9 +14,9 @@ namespace ErrorCodes
extern const int POCO_EXCEPTION; extern const int POCO_EXCEPTION;
extern const int STD_EXCEPTION; extern const int STD_EXCEPTION;
extern const int UNKNOWN_EXCEPTION; extern const int UNKNOWN_EXCEPTION;
extern const int TOO_MUCH_SIMULTANEOUS_QUERIES;
} }
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{ {
HTMLForm params(request); HTMLForm params(request);
...@@ -37,11 +37,11 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque ...@@ -37,11 +37,11 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
if (compress) if (compress)
{ {
CompressedWriteBuffer compressed_out(out); CompressedWriteBuffer compressed_out(out);
endpoint->processQuery(params, body, compressed_out); endpoint->processQuery(params, body, compressed_out, response);
} }
else else
{ {
endpoint->processQuery(params, body, out); endpoint->processQuery(params, body, out, response);
} }
out.finalize(); out.finalize();
...@@ -61,6 +61,14 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ ...@@ -61,6 +61,14 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
} }
catch (Exception & e) catch (Exception & e)
{ {
if (e.code() == ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES)
{
if (!response.sent())
response.send();
return;
}
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
/// Sending to remote server was cancelled due to server shutdown or drop table. /// Sending to remote server was cancelled due to server shutdown or drop table.
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include <Common/NetException.h> #include <Common/NetException.h>
#include <IO/ReadBufferFromHTTP.h> #include <IO/ReadBufferFromHTTP.h>
#include <Poco/File.h> #include <Poco/File.h>
#include <ext/scope_guard.hpp>
#include <Poco/Net/HTTPServerResponse.h>
namespace CurrentMetrics namespace CurrentMetrics
...@@ -19,6 +21,7 @@ namespace ErrorCodes ...@@ -19,6 +21,7 @@ namespace ErrorCodes
{ {
extern const int ABORTED; extern const int ABORTED;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int TOO_MUCH_SIMULTANEOUS_QUERIES;
} }
namespace DataPartsExchange namespace DataPartsExchange
...@@ -39,7 +42,7 @@ std::string Service::getId(const std::string & node_id) const ...@@ -39,7 +42,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id); return getEndpointId(node_id);
} }
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{ {
if (is_cancelled) if (is_cancelled)
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED); throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
...@@ -49,6 +52,24 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body ...@@ -49,6 +52,24 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
bool send_sharded_part = !shard_str.empty(); bool send_sharded_part = !shard_str.empty();
static std::atomic_uint total_sends {0};
if (total_sends >= data.settings.replicated_max_parallel_sends
|| data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table)
{
response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
response.setReason("Too many concurrent fetches, try again later");
response.set("Retry-After", "10");
response.setChunkedTransferEncoding(false);
return;
}
++total_sends;
SCOPE_EXIT({--total_sends;});
++data.current_table_sends;
SCOPE_EXIT({--data.current_table_sends;});
LOG_TRACE(log, "Sending part " << part_name); LOG_TRACE(log, "Sending part " << part_name);
try try
......
...@@ -24,7 +24,7 @@ public: ...@@ -24,7 +24,7 @@ public:
Service & operator=(const Service &) = delete; Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override; std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
private: private:
MergeTreeData::DataPartPtr findPart(const String & name); MergeTreeData::DataPartPtr findPart(const String & name);
......
...@@ -472,6 +472,9 @@ public: ...@@ -472,6 +472,9 @@ public:
Block primary_key_sample; Block primary_key_sample;
DataTypes primary_key_data_types; DataTypes primary_key_data_types;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};
private: private:
friend struct MergeTreeDataPart; friend struct MergeTreeDataPart;
friend class StorageMergeTree; friend class StorageMergeTree;
......
...@@ -78,6 +78,9 @@ struct MergeTreeSettings ...@@ -78,6 +78,9 @@ struct MergeTreeSettings
/// Limit parallel fetches /// Limit parallel fetches
size_t replicated_max_parallel_fetches = 4; size_t replicated_max_parallel_fetches = 4;
size_t replicated_max_parallel_fetches_for_table = 2; size_t replicated_max_parallel_fetches_for_table = 2;
/// Limit parallel sends
size_t replicated_max_parallel_sends = 4;
size_t replicated_max_parallel_sends_for_table = 2;
/// If ration of wrong parts to total number of parts is less than this - allow to start anyway. /// If ration of wrong parts to total number of parts is less than this - allow to start anyway.
double replicated_max_ratio_of_wrong_parts = 0.05; double replicated_max_ratio_of_wrong_parts = 0.05;
...@@ -138,6 +141,8 @@ struct MergeTreeSettings ...@@ -138,6 +141,8 @@ struct MergeTreeSettings
SET_SIZE_T(replicated_max_missing_active_parts); SET_SIZE_T(replicated_max_missing_active_parts);
SET_SIZE_T(replicated_max_parallel_fetches); SET_SIZE_T(replicated_max_parallel_fetches);
SET_SIZE_T(replicated_max_parallel_fetches_for_table); SET_SIZE_T(replicated_max_parallel_fetches_for_table);
SET_SIZE_T(replicated_max_parallel_sends);
SET_SIZE_T(replicated_max_parallel_sends_for_table);
SET_DOUBLE(replicated_max_ratio_of_wrong_parts); SET_DOUBLE(replicated_max_ratio_of_wrong_parts);
SET_SIZE_T(zookeeper_session_expiration_check_period); SET_SIZE_T(zookeeper_session_expiration_check_period);
SET_SIZE_T(check_delay_period); SET_SIZE_T(check_delay_period);
......
...@@ -36,7 +36,7 @@ std::string Service::getId(const std::string & node_id) const ...@@ -36,7 +36,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id); return getEndpointId(node_id);
} }
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{ {
if (is_cancelled) if (is_cancelled)
throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED}; throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED};
......
...@@ -21,7 +21,7 @@ public: ...@@ -21,7 +21,7 @@ public:
Service(const Service &) = delete; Service(const Service &) = delete;
Service & operator=(const Service &) = delete; Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override; std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
private: private:
const Context & context; const Context & context;
......
...@@ -33,7 +33,7 @@ std::string Service::getId(const std::string & node_id) const ...@@ -33,7 +33,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id); return getEndpointId(node_id);
} }
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{ {
auto part_name = params.get("part"); auto part_name = params.get("part");
auto hash = params.get("hash"); auto hash = params.get("hash");
......
...@@ -27,7 +27,7 @@ public: ...@@ -27,7 +27,7 @@ public:
Service(const Service &) = delete; Service(const Service &) = delete;
Service & operator=(const Service &) = delete; Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override; std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
private: private:
StoragePtr owned_storage; StoragePtr owned_storage;
......
...@@ -35,7 +35,7 @@ std::string Service::getId(const std::string & node_id) const ...@@ -35,7 +35,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id); return getEndpointId(node_id);
} }
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{ {
if (is_cancelled) if (is_cancelled)
throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED}; throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED};
......
...@@ -20,7 +20,7 @@ public: ...@@ -20,7 +20,7 @@ public:
Service(const Service &) = delete; Service(const Service &) = delete;
Service & operator=(const Service &) = delete; Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override; std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
private: private:
Context & context; Context & context;
......
...@@ -45,7 +45,7 @@ std::string Service::getId(const std::string & node_id) const ...@@ -45,7 +45,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id); return getEndpointId(node_id);
} }
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{ {
std::string part_name = params.get("path"); std::string part_name = params.get("path");
std::string replica_path = params.get("endpoint"); std::string replica_path = params.get("endpoint");
......
...@@ -23,7 +23,7 @@ public: ...@@ -23,7 +23,7 @@ public:
Service(const Service &) = delete; Service(const Service &) = delete;
Service & operator=(const Service &) = delete; Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override; std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out) override; void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
private: private:
StoragePtr owned_storage; StoragePtr owned_storage;
......
...@@ -103,6 +103,7 @@ namespace ErrorCodes ...@@ -103,6 +103,7 @@ namespace ErrorCodes
extern const int UNFINISHED; extern const int UNFINISHED;
extern const int METADATA_MISMATCH; extern const int METADATA_MISMATCH;
extern const int RESHARDING_NULLABLE_SHARDING_KEY; extern const int RESHARDING_NULLABLE_SHARDING_KEY;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
} }
...@@ -1417,8 +1418,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) ...@@ -1417,8 +1418,21 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
} }
} }
if (!fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) try
return false; {
if (!fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
return false;
}
catch (const Exception & e)
{
/// No stacktrace, just log message
if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS)
{
LOG_INFO(log, "Too busy replica. Will try later. " << e.message());
return false;
}
throw;
}
if (entry.type == LogEntry::MERGE_PARTS) if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged); ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
...@@ -3444,7 +3458,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S ...@@ -3444,7 +3458,7 @@ void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const S
} }
catch (const DB::Exception & e) catch (const DB::Exception & e)
{ {
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER) if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS)
throw; throw;
LOG_INFO(log, e.displayText()); LOG_INFO(log, e.displayText());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册