From 5ec000540033478b36dede5357e1c5812ba879ed Mon Sep 17 00:00:00 2001 From: root Date: Mon, 16 Sep 2019 17:28:41 +0000 Subject: [PATCH] +UpdatableSessionBase Committer: maqroll --- dbms/src/IO/HTTPCommon.cpp | 8 +- dbms/src/IO/HTTPCommon.h | 4 +- dbms/src/IO/ReadWriteBufferFromHTTP.cpp | 30 +--- dbms/src/IO/ReadWriteBufferFromHTTP.h | 210 +++++++++++++++++++----- dbms/src/IO/WriteBufferFromHTTP.cpp | 2 +- dbms/src/Storages/StorageURL.cpp | 2 +- 6 files changed, 176 insertions(+), 80 deletions(-) diff --git a/dbms/src/IO/HTTPCommon.cpp b/dbms/src/IO/HTTPCommon.cpp index eb48e54419..3d877fc756 100644 --- a/dbms/src/IO/HTTPCommon.cpp +++ b/dbms/src/IO/HTTPCommon.cpp @@ -217,17 +217,15 @@ PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Connecti return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size); } +bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; } std::istream * receiveResponse( - Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response) + Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, const bool allow_redirects) { auto istr = &session.receiveResponse(response); auto status = response.getStatus(); - if ((request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) && (status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT)) - throw Poco::URIRedirection(response.get("Location")); - - if (status != Poco::Net::HTTPResponse::HTTP_OK) + if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects))) { std::stringstream error_message; error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " " diff --git a/dbms/src/IO/HTTPCommon.h b/dbms/src/IO/HTTPCommon.h index dda8d2aac7..f05efa15d3 100644 --- a/dbms/src/IO/HTTPCommon.h +++ b/dbms/src/IO/HTTPCommon.h @@ -50,11 +50,13 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & /// As previous method creates session, but tooks it from pool PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size); +bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status); + /** Used to receive response (response headers and possibly body) * after sending data (request headers and possibly body). * Throws exception in case of non HTTP_OK (200) response code. * Returned istream lives in 'session' object. */ std::istream * receiveResponse( - Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response); + Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, bool allow_redirects); } diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp index ec04de7802..89e8702001 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp @@ -9,33 +9,5 @@ namespace ErrorCodes extern const int TOO_MANY_REDIRECTS; } - -std::unique_ptr makeReadWriteBufferFromHTTP(const Poco::URI & uri, - const std::string & method, - std::function callback, - const DB::ConnectionTimeouts & timeouts, - const DB::SettingUInt64 max_redirects) - { - auto actual_uri =uri; - UInt64 redirects = 0; - - do - { - try - { - return std::make_unique(actual_uri, method, callback, timeouts); - } - catch (Poco::URIRedirection & exc) - { - redirects++; - actual_uri = exc.uri(); - } - } while(max_redirects>redirects); - - // too many redirects.... - std::stringstream error_message; - error_message << "Too many redirects while trying to access " << uri.toString() ; - - throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS); - } } + diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h index 9abcd0edf0..47ab323302 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.h +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.h @@ -27,39 +27,78 @@ namespace DB /** Perform HTTP POST request and provide response to read. */ +namespace ErrorCodes +{ + extern const int TOO_MANY_REDIRECTS; +} + +template +class UpdatableSessionBase +{ +protected: + SessionPtr session; + UInt64 redirects { 0 }; + Poco::URI initial_uri; + const ConnectionTimeouts & timeouts; + DB::SettingUInt64 max_redirects; + +public: + void buildNewSession(const Poco::URI & uri); + + explicit UpdatableSessionBase(const Poco::URI uri, + const ConnectionTimeouts & timeouts_, + SettingUInt64 max_redirects_) + : initial_uri { uri } + , timeouts { timeouts_ } + , max_redirects { max_redirects_ } + { + } + + SessionPtr getSession() + { + return session; + } + + void updateSession(const Poco::URI & uri) + { + if (redirects++ + template class ReadWriteBufferFromHTTPBase : public ReadBuffer { protected: Poco::URI uri; std::string method; - SessionPtr session; + UpdatableSessionPtr session; std::istream * istr; /// owned by session std::unique_ptr impl; + std::function out_stream_callback; + const Poco::Net::HTTPBasicCredentials & credentials; - public: - using OutStreamCallback = std::function; - - explicit ReadWriteBufferFromHTTPBase(SessionPtr session_, - Poco::URI uri_, - const std::string & method_ = {}, - OutStreamCallback out_stream_callback = {}, - const Poco::Net::HTTPBasicCredentials & credentials = {}, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) - : ReadBuffer(nullptr, 0) - , uri {uri_} - , method {!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} - , session {session_} + protected: + std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response) { // With empty path poco will send "POST HTTP/1.1" its bug. if (uri.getPath().empty()) uri.setPath("/"); - Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); - request.setHost(uri.getHost()); // use original, not resolved host name in header + Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + request.setHost(uri_.getHost()); // use original, not resolved host name in header if (out_stream_callback) request.setChunkedTransferEncoding(true); @@ -67,26 +106,70 @@ namespace detail if (!credentials.getUsername().empty()) credentials.authenticate(request); - Poco::Net::HTTPResponse response; - LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString()); + auto sess = session->getSession(); + + auto & stream_out = sess->sendRequest(request); + + if (out_stream_callback) + out_stream_callback(stream_out); + try { - auto & stream_out = session->sendRequest(request); + istr = receiveResponse(*sess, request, response, true); - if (out_stream_callback) - out_stream_callback(stream_out); + return istr; - istr = receiveResponse(*session, request, response); + } + catch (const Poco::Exception & e) + { + /// We use session data storage as storage for exception text + /// Depend on it we can deduce to reconnect session or reresolve session host + sess->attachSessionData(e.message()); + throw; + } + } + public: + using OutStreamCallback = std::function; + + explicit ReadWriteBufferFromHTTPBase(UpdatableSessionPtr session_, + Poco::URI uri_, + const std::string & method_ = {}, + OutStreamCallback out_stream_callback_ = {}, + const Poco::Net::HTTPBasicCredentials & credentials_ = {}, + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) + : ReadBuffer(nullptr, 0) + , uri {uri_} + , method {!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} + , session {session_} + , out_stream_callback {out_stream_callback_} + , credentials {credentials_} + { + Poco::Net::HTTPResponse response; + + istr = call(uri, response); + + while (isRedirect(response.getStatus())) + { + Poco::URI uri_redirect(response.get("Location")); + + session->updateSession(uri_redirect); + + istr = call(uri_redirect,response); + } + + try + { impl = std::make_unique(*istr, buffer_size_); } catch (const Poco::Exception & e) { /// We use session data storage as storage for exception text /// Depend on it we can deduce to reconnect session or reresolve session host - session->attachSessionData(e.message()); + auto sess = session->getSession(); + sess->attachSessionData(e.message()); throw; } } @@ -103,47 +186,88 @@ namespace detail }; } -class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase +class UpdatableSession : public UpdatableSessionBase { - using Parent = detail::ReadWriteBufferFromHTTPBase; + using Parent = UpdatableSessionBase; + +public: + explicit UpdatableSession(const Poco::URI uri, + const ConnectionTimeouts & timeouts_, + const SettingUInt64 max_redirects_) + : Parent(uri, timeouts_, max_redirects_) + { + session = makeHTTPSession(initial_uri, timeouts); + } + + void buildNewSession(const Poco::URI uri) + { + session = makeHTTPSession(uri, timeouts); + } +}; + +class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase> +{ + using Parent = detail::ReadWriteBufferFromHTTPBase>; public: explicit ReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_ = {}, - OutStreamCallback out_stream_callback = {}, + OutStreamCallback out_stream_callback_ = {}, const ConnectionTimeouts & timeouts = {}, - const Poco::Net::HTTPBasicCredentials & credentials = {}, + const DB::SettingUInt64 max_redirects = 0, + const Poco::Net::HTTPBasicCredentials & credentials_ = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) - : Parent(makeHTTPSession(uri_, timeouts), uri_, method_, out_stream_callback, credentials, buffer_size_) + : Parent(std::make_shared(uri_, timeouts, max_redirects), uri_, method_, out_stream_callback_, credentials_, buffer_size_) + { + } +}; + +class UpdatablePooledSession : public UpdatableSessionBase +{ + using Parent = UpdatableSessionBase; + +private: + size_t per_endpoint_pool_size; + +public: + explicit UpdatablePooledSession(const Poco::URI uri, + const ConnectionTimeouts & timeouts_, + const SettingUInt64 max_redirects_, + size_t per_endpoint_pool_size_) + : Parent(uri, timeouts_, max_redirects_) + , per_endpoint_pool_size { per_endpoint_pool_size_ } { + session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size); + } + + void buildNewSession(const Poco::URI uri) + { + session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size); } }; -class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase + +class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase> { - using Parent = detail::ReadWriteBufferFromHTTPBase; + using Parent = detail::ReadWriteBufferFromHTTPBase>; public: explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_ = {}, - OutStreamCallback out_stream_callback = {}, - const ConnectionTimeouts & timeouts = {}, - const Poco::Net::HTTPBasicCredentials & credentials = {}, + OutStreamCallback out_stream_callback_ = {}, + const ConnectionTimeouts & timeouts_ = {}, + const Poco::Net::HTTPBasicCredentials & credentials_ = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, + const DB::SettingUInt64 max_redirects = 0, size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) - : Parent(makePooledHTTPSession(uri_, timeouts, max_connections_per_endpoint), + : Parent(std::make_shared(uri_, timeouts_, max_redirects, max_connections_per_endpoint), uri_, method_, - out_stream_callback, - credentials, + out_stream_callback_, + credentials_, buffer_size_) { } }; -std::unique_ptr makeReadWriteBufferFromHTTP(const Poco::URI & uri, - const std::string & method, - std::function callback, - const ConnectionTimeouts & timeouts, - const SettingUInt64 max_redirects); - } + diff --git a/dbms/src/IO/WriteBufferFromHTTP.cpp b/dbms/src/IO/WriteBufferFromHTTP.cpp index c74c74a0bd..0a8095b960 100644 --- a/dbms/src/IO/WriteBufferFromHTTP.cpp +++ b/dbms/src/IO/WriteBufferFromHTTP.cpp @@ -22,7 +22,7 @@ WriteBufferFromHTTP::WriteBufferFromHTTP( void WriteBufferFromHTTP::finalize() { - receiveResponse(*session, request, response); + receiveResponse(*session, request, response, false); /// TODO: Response body is ignored. } diff --git a/dbms/src/Storages/StorageURL.cpp b/dbms/src/Storages/StorageURL.cpp index ebbffd2067..074e99c533 100644 --- a/dbms/src/Storages/StorageURL.cpp +++ b/dbms/src/Storages/StorageURL.cpp @@ -54,7 +54,7 @@ namespace const ConnectionTimeouts & timeouts) : name(name_) { - read_buf = makeReadWriteBufferFromHTTP(uri, method, callback, timeouts,context.getSettingsRef().max_http_get_redirects); + read_buf = std::make_unique(uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); } -- GitLab