diff --git a/base/daemon/BaseDaemon.h b/base/daemon/BaseDaemon.h index 42d94629ae9814814ec1279fc35640087733d96c..8b9d765cf2efd2c17b51298b3a62a22ef8f9ba1d 100644 --- a/base/daemon/BaseDaemon.h +++ b/base/daemon/BaseDaemon.h @@ -83,7 +83,7 @@ public: template void writeToGraphite(const std::string & key, const T & value, const std::string & config_name = DEFAULT_GRAPHITE_CONFIG_NAME, time_t timestamp = 0, const std::string & custom_root_path = "") { - auto writer = getGraphiteWriter(config_name); + auto *writer = getGraphiteWriter(config_name); if (writer) writer->write(key, value, timestamp, custom_root_path); } @@ -91,7 +91,7 @@ public: template void writeToGraphite(const GraphiteWriter::KeyValueVector & key_vals, const std::string & config_name = DEFAULT_GRAPHITE_CONFIG_NAME, time_t timestamp = 0, const std::string & custom_root_path = "") { - auto writer = getGraphiteWriter(config_name); + auto *writer = getGraphiteWriter(config_name); if (writer) writer->write(key_vals, timestamp, custom_root_path); } @@ -99,7 +99,7 @@ public: template void writeToGraphite(const GraphiteWriter::KeyValueVector & key_vals, const std::chrono::system_clock::time_point & current_time, const std::string & custom_root_path) { - auto writer = getGraphiteWriter(); + auto *writer = getGraphiteWriter(); if (writer) writer->write(key_vals, std::chrono::system_clock::to_time_t(current_time), custom_root_path); } diff --git a/programs/odbc-bridge/ColumnInfoHandler.cpp b/programs/odbc-bridge/ColumnInfoHandler.cpp index ee4daa3e16db2de068b7c08a78e34bae18063927..5aef7f1ac385672520dfc73cd5c31005ca3d4e05 100644 --- a/programs/odbc-bridge/ColumnInfoHandler.cpp +++ b/programs/odbc-bridge/ColumnInfoHandler.cpp @@ -4,14 +4,14 @@ # include # include -# include +# include # include # include # include # include # include # include -# include +# include # include # include # include @@ -59,16 +59,16 @@ namespace } } -void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { - Poco::Net::HTMLForm params(request, request.stream()); + HTMLForm params(request, request.getStream()); LOG_TRACE(log, "Request URI: {}", request.getURI()); auto process_error = [&response, this](const std::string & message) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) - response.send() << message << std::endl; + *response.send() << message << std::endl; LOG_WARNING(log, message); }; @@ -159,7 +159,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques columns.emplace_back(reinterpret_cast(column_name), std::move(column_type)); } - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); writeStringBinary(columns.toString(), out); } catch (...) diff --git a/programs/odbc-bridge/ColumnInfoHandler.h b/programs/odbc-bridge/ColumnInfoHandler.h index 04b4c06693bddb8d9e45103d66f60e54270ad866..9b5b470b31d00e88ad18c8e5ebb4dc5ed4232833 100644 --- a/programs/odbc-bridge/ColumnInfoHandler.h +++ b/programs/odbc-bridge/ColumnInfoHandler.h @@ -3,10 +3,11 @@ #if USE_ODBC # include -# include -# include +# include # include +# include + /** The structure of the table is taken from the query "SELECT * FROM table WHERE 1=0". * TODO: It would be much better to utilize ODBC methods dedicated for columns description. * If there is no such table, an exception is thrown. @@ -14,7 +15,7 @@ namespace DB { -class ODBCColumnsInfoHandler : public Poco::Net::HTTPRequestHandler +class ODBCColumnsInfoHandler : public HTTPRequestHandler { public: ODBCColumnsInfoHandler(size_t keep_alive_timeout_, Context & context_) @@ -22,7 +23,7 @@ public: { } - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: Poco::Logger * log; diff --git a/programs/odbc-bridge/HandlerFactory.cpp b/programs/odbc-bridge/HandlerFactory.cpp index 0cc40480b874724f0f2da9508608a97d620cdefc..9ac48af4ace9fb646f828fa997397508e341dc90 100644 --- a/programs/odbc-bridge/HandlerFactory.cpp +++ b/programs/odbc-bridge/HandlerFactory.cpp @@ -7,39 +7,40 @@ namespace DB { -Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) + +std::unique_ptr HandlerFactory::createRequestHandler(const HTTPServerRequest & request) { Poco::URI uri{request.getURI()}; LOG_TRACE(log, "Request URI: {}", uri.toString()); if (uri.getPath() == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) - return new PingHandler(keep_alive_timeout); + return std::make_unique(keep_alive_timeout); if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) { if (uri.getPath() == "/columns_info") #if USE_ODBC - return new ODBCColumnsInfoHandler(keep_alive_timeout, context); + return std::make_unique(keep_alive_timeout, context); #else return nullptr; #endif else if (uri.getPath() == "/identifier_quote") #if USE_ODBC - return new IdentifierQuoteHandler(keep_alive_timeout, context); + return std::make_unique(keep_alive_timeout, context); #else return nullptr; #endif else if (uri.getPath() == "/schema_allowed") #if USE_ODBC - return new SchemaAllowedHandler(keep_alive_timeout, context); + return std::make_unique(keep_alive_timeout, context); #else return nullptr; #endif else if (uri.getPath() == "/write") - return new ODBCHandler(pool_map, keep_alive_timeout, context, "write"); + return std::make_unique(pool_map, keep_alive_timeout, context, "write"); else - return new ODBCHandler(pool_map, keep_alive_timeout, context, "read"); + return std::make_unique(pool_map, keep_alive_timeout, context, "read"); } return nullptr; } diff --git a/programs/odbc-bridge/HandlerFactory.h b/programs/odbc-bridge/HandlerFactory.h index 1d4edfc9dd1fb9b074f3ae2ae5434f7a26bc0b15..5dce6f02ecd89f6bd3c28be324a0508882b97136 100644 --- a/programs/odbc-bridge/HandlerFactory.h +++ b/programs/odbc-bridge/HandlerFactory.h @@ -1,16 +1,17 @@ #pragma once + #include -#include -#include -#include -#include "MainHandler.h" +#include #include "ColumnInfoHandler.h" #include "IdentifierQuoteHandler.h" +#include "MainHandler.h" #include "SchemaAllowedHandler.h" +#include + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" - #include +#include #pragma GCC diagnostic pop @@ -19,7 +20,7 @@ namespace DB /** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers. * Also stores Session pools for ODBC connections */ -class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory +class HandlerFactory : public HTTPRequestHandlerFactory { public: HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_) @@ -28,7 +29,7 @@ public: pool_map = std::make_shared(); } - Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; + std::unique_ptr createRequestHandler(const HTTPServerRequest & request) override; private: Poco::Logger * log; diff --git a/programs/odbc-bridge/IdentifierQuoteHandler.cpp b/programs/odbc-bridge/IdentifierQuoteHandler.cpp index 2c3701cfff9ea7f66a3377ac955717b1082023d3..ec4e4493d61ea2f62f89721abbdd96495087a9e5 100644 --- a/programs/odbc-bridge/IdentifierQuoteHandler.cpp +++ b/programs/odbc-bridge/IdentifierQuoteHandler.cpp @@ -3,14 +3,14 @@ #if USE_ODBC # include -# include +# include +# include # include # include # include # include # include # include -# include # include # include # include @@ -22,16 +22,16 @@ namespace DB { -void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { - Poco::Net::HTMLForm params(request, request.stream()); + HTMLForm params(request, request.getStream()); LOG_TRACE(log, "Request URI: {}", request.getURI()); auto process_error = [&response, this](const std::string & message) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) - response.send() << message << std::endl; + *response.send() << message << std::endl; LOG_WARNING(log, message); }; @@ -49,7 +49,7 @@ void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & reques auto identifier = getIdentifierQuote(hdbc); - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); writeStringBinary(identifier, out); } catch (...) diff --git a/programs/odbc-bridge/IdentifierQuoteHandler.h b/programs/odbc-bridge/IdentifierQuoteHandler.h index fd357e32786261b4ec4e2ce31832d48020490bcd..dad88c72ad822746d4a152473f4985bc8d6bcf3d 100644 --- a/programs/odbc-bridge/IdentifierQuoteHandler.h +++ b/programs/odbc-bridge/IdentifierQuoteHandler.h @@ -1,8 +1,9 @@ #pragma once #include +#include + #include -#include #if USE_ODBC @@ -10,7 +11,7 @@ namespace DB { -class IdentifierQuoteHandler : public Poco::Net::HTTPRequestHandler +class IdentifierQuoteHandler : public HTTPRequestHandler { public: IdentifierQuoteHandler(size_t keep_alive_timeout_, Context &) @@ -18,7 +19,7 @@ public: { } - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: Poco::Logger * log; diff --git a/programs/odbc-bridge/MainHandler.cpp b/programs/odbc-bridge/MainHandler.cpp index 64cb7bc0b4674f6f4512680303e6f95827de2a10..b96703978782fc5277ba2d15d1d9679278b9487e 100644 --- a/programs/odbc-bridge/MainHandler.cpp +++ b/programs/odbc-bridge/MainHandler.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include #include @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -73,19 +74,19 @@ ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) return pool_map->at(connection_str); } -void ODBCHandler::processError(Poco::Net::HTTPServerResponse & response, const std::string & message) +void ODBCHandler::processError(HTTPServerResponse & response, const std::string & message) { - response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); + response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) - response.send() << message << std::endl; + *response.send() << message << std::endl; LOG_WARNING(log, message); } -void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { - Poco::Net::HTMLForm params(request); + HTMLForm params(request); if (mode == "read") - params.read(request.stream()); + params.read(request.getStream()); LOG_TRACE(log, "Request URI: {}", request.getURI()); if (mode == "read" && !params.has("query")) @@ -136,7 +137,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne std::string connection_string = params.get("connection_string"); LOG_TRACE(log, "Connection string: '{}'", connection_string); - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); try { @@ -163,9 +164,8 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne #endif auto pool = getPool(connection_string); - ReadBufferFromIStream read_buf(request.stream()); - auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, - context, max_block_size); + auto & read_buf = request.getStream(); + auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, context, max_block_size); auto input_stream = std::make_shared(input_format); ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block, quoting_style); copyData(*input_stream, output_stream); diff --git a/programs/odbc-bridge/MainHandler.h b/programs/odbc-bridge/MainHandler.h index ec5e6693a607b1d4826c314e6748ba81be6fe31d..e237ede5814b80c7fbb848fca16c7c414bfd27b6 100644 --- a/programs/odbc-bridge/MainHandler.h +++ b/programs/odbc-bridge/MainHandler.h @@ -1,12 +1,13 @@ #pragma once #include +#include + #include -#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" - #include +#include #pragma GCC diagnostic pop namespace DB @@ -16,7 +17,7 @@ namespace DB * and also query in request body * response in RowBinary format */ -class ODBCHandler : public Poco::Net::HTTPRequestHandler +class ODBCHandler : public HTTPRequestHandler { public: using PoolPtr = std::shared_ptr; @@ -34,7 +35,7 @@ public: { } - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: Poco::Logger * log; @@ -47,7 +48,7 @@ private: static inline std::mutex mutex; PoolPtr getPool(const std::string & connection_str); - void processError(Poco::Net::HTTPServerResponse & response, const std::string & message); + void processError(HTTPServerResponse & response, const std::string & message); }; } diff --git a/programs/odbc-bridge/ODBCBridge.cpp b/programs/odbc-bridge/ODBCBridge.cpp index 9deefaf78950d2f75153145872e20c8bbbebb058..8869a2639c1b8f780890daea076eb6c86801e35d 100644 --- a/programs/odbc-bridge/ODBCBridge.cpp +++ b/programs/odbc-bridge/ODBCBridge.cpp @@ -11,7 +11,6 @@ # include #endif -#include #include #include #include @@ -23,6 +22,7 @@ #include #include #include +#include namespace DB @@ -212,8 +212,12 @@ int ODBCBridge::main(const std::vector & /*args*/) SensitiveDataMasker::setInstance(std::make_unique(config(), "query_masking_rules")); } - auto server = Poco::Net::HTTPServer( - new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params); + auto server = HTTPServer( + context, + std::make_shared("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), + server_pool, + socket, + http_params); server.start(); LOG_INFO(log, "Listening http://{}", address.toString()); diff --git a/programs/odbc-bridge/PingHandler.cpp b/programs/odbc-bridge/PingHandler.cpp index b0313e46bf39dc85edd7e09c0cc004de37812998..e3ab5e5cd00fa90b21438858ebaaecaffe454db0 100644 --- a/programs/odbc-bridge/PingHandler.cpp +++ b/programs/odbc-bridge/PingHandler.cpp @@ -6,7 +6,7 @@ namespace DB { -void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) +void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response) { try { diff --git a/programs/odbc-bridge/PingHandler.h b/programs/odbc-bridge/PingHandler.h index d8109a50bb6c38701ad664c218a51483a596ac21..c969ec55af76a337fd71d031eb70e9b119c42fc9 100644 --- a/programs/odbc-bridge/PingHandler.h +++ b/programs/odbc-bridge/PingHandler.h @@ -1,17 +1,19 @@ #pragma once -#include + +#include namespace DB { -/** Simple ping handler, answers "Ok." to GET request - */ -class PingHandler : public Poco::Net::HTTPRequestHandler + +/// Simple ping handler, answers "Ok." to GET request +class PingHandler : public HTTPRequestHandler { public: - PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {} - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + explicit PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {} + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: size_t keep_alive_timeout; }; + } diff --git a/programs/odbc-bridge/SchemaAllowedHandler.cpp b/programs/odbc-bridge/SchemaAllowedHandler.cpp index fa08a27da5957da40fdc11c586ed4418a0b22734..48744b6d2ca227b5f5b8c790e15388f179a94028 100644 --- a/programs/odbc-bridge/SchemaAllowedHandler.cpp +++ b/programs/odbc-bridge/SchemaAllowedHandler.cpp @@ -2,12 +2,12 @@ #if USE_ODBC -# include +# include +# include # include # include # include # include -# include # include # include # include @@ -33,16 +33,16 @@ namespace } -void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { - Poco::Net::HTMLForm params(request, request.stream()); + HTMLForm params(request, request.getStream()); LOG_TRACE(log, "Request URI: {}", request.getURI()); auto process_error = [&response, this](const std::string & message) { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) - response.send() << message << std::endl; + *response.send() << message << std::endl; LOG_WARNING(log, message); }; @@ -60,7 +60,7 @@ void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, bool result = isSchemaAllowed(hdbc); - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); writeBoolText(result, out); } catch (...) diff --git a/programs/odbc-bridge/SchemaAllowedHandler.h b/programs/odbc-bridge/SchemaAllowedHandler.h index 76aa23b903c64564042aca08d042af5ad4bc6aa6..91eddf678033b04470f24c2a8ce372e3d2d945d8 100644 --- a/programs/odbc-bridge/SchemaAllowedHandler.h +++ b/programs/odbc-bridge/SchemaAllowedHandler.h @@ -1,17 +1,18 @@ #pragma once +#include + #include -#include #if USE_ODBC namespace DB { -class Context; +class Context; -/// This handler establishes connection to database, and retrieve whether schema is allowed. -class SchemaAllowedHandler : public Poco::Net::HTTPRequestHandler +/// This handler establishes connection to database, and retrieves whether schema is allowed. +class SchemaAllowedHandler : public HTTPRequestHandler { public: SchemaAllowedHandler(size_t keep_alive_timeout_, Context &) @@ -19,7 +20,7 @@ public: { } - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: Poco::Logger * log; diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index a96cb2b8973584ecc49ce07cb68e27047e2a87ee..4194bb4a06b3b1701f3e2e178bba35e1e50e950b 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -69,6 +69,7 @@ #include #include #include +#include #if !defined(ARCADIA_BUILD) @@ -1070,8 +1071,10 @@ int Server::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back(port_name, std::make_unique( - createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); + servers->emplace_back( + port_name, + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for http://{}", address.toString()); }); @@ -1085,8 +1088,10 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back(port_name, std::make_unique( - createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); + servers->emplace_back( + port_name, + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for https://{}", address.toString()); #else @@ -1160,8 +1165,14 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back(port_name, std::make_unique( - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params)); + servers->emplace_back( + port_name, + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), + server_pool, + socket, + http_params)); LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); }); @@ -1174,8 +1185,14 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back(port_name, std::make_unique( - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params)); + servers->emplace_back( + port_name, + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), + server_pool, + socket, + http_params)); LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); #else @@ -1235,8 +1252,14 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back(port_name, std::make_unique( - createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); + servers->emplace_back( + port_name, + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), + server_pool, + socket, + http_params)); LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); }); diff --git a/programs/server/Server.h b/programs/server/Server.h index c582e4753087fd2311eeb2b5b1a141a3792a6764..fbfc26f6ee582bba1d214fbe05cb648bfaa4f9dc 100644 --- a/programs/server/Server.h +++ b/programs/server/Server.h @@ -51,6 +51,7 @@ public: } void defineOptions(Poco::Util::OptionSet & _options) override; + protected: int run() override; @@ -65,8 +66,6 @@ protected: private: Context * global_context_ptr = nullptr; -private: - Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const; using CreateServerFunc = std::function; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d370016da00dc3603aa0a0128181c3208a3ec20d..215a13cce1a869c2649204088af1787d139e4b3e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -181,6 +181,7 @@ add_object_library(clickhouse_storages_mergetree Storages/MergeTree) add_object_library(clickhouse_storages_liveview Storages/LiveView) add_object_library(clickhouse_client Client) add_object_library(clickhouse_server Server) +add_object_library(clickhouse_server_http Server/HTTP) add_object_library(clickhouse_formats Formats) add_object_library(clickhouse_processors Processors) add_object_library(clickhouse_processors_executors Processors/Executors) diff --git a/src/Common/HTMLForm.h b/src/Common/HTMLForm.h deleted file mode 100644 index 2b62167dce7ca3983b8fe38bfe1947b2133b9ba7..0000000000000000000000000000000000000000 --- a/src/Common/HTMLForm.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include - - -/** Somehow, in case of POST, Poco::Net::HTMLForm doesn't read parameters from URL, only from body. - * This helper allows to read parameters just from URL. - */ -struct HTMLForm : public Poco::Net::HTMLForm -{ - HTMLForm(const Poco::Net::HTTPRequest & request) - { - Poco::URI uri(request.getURI()); - std::istringstream istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM - readUrl(istr); - } - - HTMLForm(const Poco::URI & uri) - { - std::istringstream istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM - readUrl(istr); - } - - - template - T getParsed(const std::string & key, T default_value) - { - auto it = find(key); - return (it != end()) ? DB::parse(it->second) : default_value; - } - - template - T getParsed(const std::string & key) - { - return DB::parse(get(key)); - } -}; diff --git a/src/Common/StringUtils/StringUtils.h b/src/Common/StringUtils/StringUtils.h index 904e3035dd8b9a4992c27402c1f49e342671df6e..cb2227f01a8729b4e62eb3908e0bb73b3dfc5d4d 100644 --- a/src/Common/StringUtils/StringUtils.h +++ b/src/Common/StringUtils/StringUtils.h @@ -120,6 +120,12 @@ inline bool isWhitespaceASCII(char c) return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v'; } +/// Since |isWhiteSpaceASCII()| is used inside algorithms it's easier to implement another function than add extra argument. +inline bool isWhitespaceASCIIOneLine(char c) +{ + return c == ' ' || c == '\t' || c == '\r' || c == '\f' || c == '\v'; +} + inline bool isControlASCII(char c) { return static_cast(c) <= 31; diff --git a/src/Common/formatIPv6.h b/src/Common/formatIPv6.h index 63c064b21f81c0fcc860c7d4297917265454fa6c..bd0c68d70f99f4534ea013f601d6405ac30e41aa 100644 --- a/src/Common/formatIPv6.h +++ b/src/Common/formatIPv6.h @@ -85,9 +85,9 @@ inline bool parseIPv6(const char * src, unsigned char * dst) return clear_dst(); unsigned char tmp[IPV6_BINARY_LENGTH]{}; - auto tp = tmp; - auto endp = tp + IPV6_BINARY_LENGTH; - auto curtok = src; + auto * tp = tmp; + auto * endp = tp + IPV6_BINARY_LENGTH; + const auto * curtok = src; auto saw_xdigit = false; UInt32 val{}; unsigned char * colonp = nullptr; @@ -97,14 +97,14 @@ inline bool parseIPv6(const char * src, unsigned char * dst) { const auto num = unhex(ch); - if (num != '\xff') + if (num != u8'\xff') { val <<= 4; val |= num; if (val > 0xffffu) return clear_dst(); - saw_xdigit = 1; + saw_xdigit = true; continue; } @@ -204,7 +204,7 @@ inline void formatIPv4(const unsigned char * src, char *& dst, uint8_t mask_tail for (size_t octet = 0; octet < limit; ++octet) { const uint8_t value = static_cast(src[IPV4_BINARY_LENGTH - octet - 1]); - auto rep = one_byte_to_string_lookup_table[value]; + const auto * rep = one_byte_to_string_lookup_table[value]; const uint8_t len = rep[0]; const char* str = rep + 1; diff --git a/src/Common/hex.h b/src/Common/hex.h index db094e1dfd1cbae3d45e7f01e84389931137b4dc..a1fa7b324658bd02d41538b45ba6358ef45a46e8 100644 --- a/src/Common/hex.h +++ b/src/Common/hex.h @@ -90,12 +90,12 @@ std::string getHexUIntLowercase(TUInt uint_) extern const char * const hex_char_to_digit_table; -inline char unhex(char c) +inline UInt8 unhex(char c) { return hex_char_to_digit_table[static_cast(c)]; } -inline char unhex2(const char * data) +inline UInt8 unhex2(const char * data) { return static_cast(unhex(data[0])) * 0x10 diff --git a/src/Core/ExternalTable.cpp b/src/Core/ExternalTable.cpp index 767ed9599509c67432b11741bda31721574dcc2b..afc9fe00ef541148d8110c98334fc2e7aaf2623e 100644 --- a/src/Core/ExternalTable.cpp +++ b/src/Core/ExternalTable.cpp @@ -125,19 +125,16 @@ ExternalTable::ExternalTable(const boost::program_options::variables_map & exter } -void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) +void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream) { const Settings & settings = context.getSettingsRef(); - /// The buffer is initialized here, not in the virtual function initReadBuffer - read_buffer_impl = std::make_unique(stream); - if (settings.http_max_multipart_form_data_size) read_buffer = std::make_unique( - *read_buffer_impl, settings.http_max_multipart_form_data_size, + stream, settings.http_max_multipart_form_data_size, true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting"); else - read_buffer = std::move(read_buffer_impl); + read_buffer = wrapReadBufferReference(stream); /// Retrieve a collection of parameters from MessageHeader Poco::Net::NameValueCollection content; diff --git a/src/Core/ExternalTable.h b/src/Core/ExternalTable.h index 0d8e0aaf8acd45ec0d478e70637c49ffa5aa6aee..aa15846d48afcacb6d79d2bf9d3c7b4ae3dda760 100644 --- a/src/Core/ExternalTable.h +++ b/src/Core/ExternalTable.h @@ -1,15 +1,14 @@ #pragma once -#include -#include -#include -#include - -#include - -#include #include +#include #include +#include + +#include +#include +#include +#include namespace Poco @@ -51,7 +50,7 @@ public: std::unique_ptr read_buffer; Block sample_block; - virtual ~BaseExternalTable() {} + virtual ~BaseExternalTable() = default; /// Initialize read_buffer, depending on the data source. By default, does nothing. virtual void initReadBuffer() {} @@ -82,24 +81,23 @@ public: void initReadBuffer() override; /// Extract parameters from variables_map, which is built on the client command line - ExternalTable(const boost::program_options::variables_map & external_options); + explicit ExternalTable(const boost::program_options::variables_map & external_options); }; /// Parsing of external table used when sending tables via http /// The `handlePart` function will be called for each table passed, /// so it's also necessary to call `clean` at the end of the `handlePart`. -class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable +class ExternalTablesHandler : public HTMLForm::PartHandler, BaseExternalTable { public: ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) {} - void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) override; + void handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream) override; private: Context & context; const Poco::Net::NameValueCollection & params; - std::unique_ptr read_buffer_impl; }; diff --git a/src/IO/EmptyReadBuffer.h b/src/IO/EmptyReadBuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..e2189b9943f1e0cc80f13ae6e4def8d4be8394b3 --- /dev/null +++ b/src/IO/EmptyReadBuffer.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace DB +{ + +/// Just a stub - reads nothing from nowhere. +class EmptyReadBuffer : public ReadBuffer +{ +public: + EmptyReadBuffer() : ReadBuffer(nullptr, 0) {} + +private: + bool nextImpl() override { return false; } +}; + +} diff --git a/src/IO/HTTPChunkedReadBuffer.cpp b/src/IO/HTTPChunkedReadBuffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bd9bbba4c6c449cd762a81211662bbd30d074ccd --- /dev/null +++ b/src/IO/HTTPChunkedReadBuffer.cpp @@ -0,0 +1,92 @@ +#include + +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ARGUMENT_OUT_OF_BOUND; + extern const int UNEXPECTED_END_OF_FILE; + extern const int CORRUPTED_DATA; + extern const int TOO_MANY_BYTES; +} + +size_t HTTPChunkedReadBuffer::readChunkHeader() +{ + if (in->eof()) + throw Exception("Unexpected end of file while reading chunk header of HTTP chunked data", ErrorCodes::UNEXPECTED_END_OF_FILE); + + if (!isHexDigit(*in->position())) + throw Exception("Unexpected data instead of HTTP chunk header", ErrorCodes::CORRUPTED_DATA); + + size_t res = 0; + do + { + if (common::mulOverflow(res, 16ul, res) || common::addOverflow(res, unhex(*in->position()), res)) + throw Exception("Chunk size is out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND); + ++in->position(); + } while (!in->eof() && isHexDigit(*in->position())); + + /// NOTE: If we want to read any chunk extensions, it should be done here. + + skipToCarriageReturnOrEOF(*in); + + if (in->eof()) + throw Exception("Unexpected end of file while reading chunk header of HTTP chunked data", ErrorCodes::UNEXPECTED_END_OF_FILE); + + if (res > max_size) + throw Exception("Chunk size is too large", ErrorCodes::TOO_MANY_BYTES); + + assertString("\n", *in); + return res; +} + +void HTTPChunkedReadBuffer::readChunkFooter() +{ + assertString("\r\n", *in); +} + +bool HTTPChunkedReadBuffer::nextImpl() +{ + if (!in) + return false; + + /// The footer of previous chunk. + if (count()) + readChunkFooter(); + + size_t chunk_size = readChunkHeader(); + if (0 == chunk_size) + { + readChunkFooter(); + in.reset(); // prevent double-eof situation. + return false; + } + + if (in->available() >= chunk_size) + { + /// Zero-copy read from input. + working_buffer = Buffer(in->position(), in->position() + chunk_size); + in->position() += chunk_size; + } + else + { + /// Chunk is not completely in buffer, copy it to scratch space. + memory.resize(chunk_size); + in->readStrict(memory.data(), chunk_size); + working_buffer = Buffer(memory.data(), memory.data() + chunk_size); + } + + /// NOTE: We postpone reading the footer to the next iteration, because it may not be completely in buffer, + /// but we need to keep the current data in buffer available. + + return true; +} + +} diff --git a/src/IO/HTTPChunkedReadBuffer.h b/src/IO/HTTPChunkedReadBuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..0ccebc69d08e3135956a5741b336aef1c672c3bc --- /dev/null +++ b/src/IO/HTTPChunkedReadBuffer.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/// Reads data with HTTP Chunked Transfer Encoding. +class HTTPChunkedReadBuffer : public BufferWithOwnMemory +{ +public: + HTTPChunkedReadBuffer(std::unique_ptr in_, size_t max_chunk_size) : in(std::move(in_)), max_size(max_chunk_size) {} + +private: + std::unique_ptr in; + const size_t max_size; + + size_t readChunkHeader(); + void readChunkFooter(); + + bool nextImpl() override; +}; + +} diff --git a/src/IO/HTTPCommon.cpp b/src/IO/HTTPCommon.cpp index d12aa10fe6a43ad08fe7f69dc8722abb7e2f3ecd..346bbf0427ef2504411d00800412cd4f3cc17604 100644 --- a/src/IO/HTTPCommon.cpp +++ b/src/IO/HTTPCommon.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -23,7 +24,6 @@ # include #endif -#include #include #include @@ -266,7 +266,7 @@ namespace }; } -void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout) +void setResponseDefaultHeaders(HTTPServerResponse & response, unsigned keep_alive_timeout) { if (!response.getKeepAlive()) return; diff --git a/src/IO/HTTPCommon.h b/src/IO/HTTPCommon.h index 4a81d23a8a36bf6ba52a6f50984d83450cad7f06..18e83abb83b5fb94865cdedde8c0df22dd4ea9c3 100644 --- a/src/IO/HTTPCommon.h +++ b/src/IO/HTTPCommon.h @@ -14,20 +14,13 @@ #include -namespace Poco -{ -namespace Net -{ - class HTTPServerResponse; -} -} - - namespace DB { constexpr int HTTP_TOO_MANY_REQUESTS = 429; +class HTTPServerResponse; + class SingleEndpointHTTPSessionPool : public PoolBase { private: @@ -45,7 +38,7 @@ public: using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry; using HTTPSessionPtr = std::shared_ptr; -void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout); +void setResponseDefaultHeaders(HTTPServerResponse & response, unsigned keep_alive_timeout); /// Create session object to perform requests and set required parameters. HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true); @@ -54,7 +47,7 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true); PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true); -bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status); +bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status); /** Used to receive response (response headers and possibly body) * after sending data (request headers and possibly body). @@ -65,5 +58,5 @@ std::istream * receiveResponse( Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, bool allow_redirects); void assertResponseIsOk( - const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects = false); + const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, bool allow_redirects = false); } diff --git a/src/IO/LimitReadBuffer.cpp b/src/IO/LimitReadBuffer.cpp index baa9e4876883d890ccc56c786bdd1ce0e6ba1775..9daffa3a1d3674f241dd63b00fa6bffc43d00912 100644 --- a/src/IO/LimitReadBuffer.cpp +++ b/src/IO/LimitReadBuffer.cpp @@ -14,10 +14,10 @@ namespace ErrorCodes bool LimitReadBuffer::nextImpl() { - assert(position() >= in.position()); + assert(position() >= in->position()); /// Let underlying buffer calculate read bytes in `next()` call. - in.position() = position(); + in->position() = position(); if (bytes >= limit) { @@ -27,13 +27,13 @@ bool LimitReadBuffer::nextImpl() return false; } - if (!in.next()) + if (!in->next()) { - working_buffer = in.buffer(); + working_buffer = in->buffer(); return false; } - working_buffer = in.buffer(); + working_buffer = in->buffer(); if (limit - bytes < working_buffer.size()) working_buffer.resize(limit - bytes); @@ -42,14 +42,33 @@ bool LimitReadBuffer::nextImpl() } -LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_) - : ReadBuffer(in_.position(), 0), in(in_), limit(limit_), throw_exception(throw_exception_), exception_message(std::move(exception_message_)) +LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_) + : ReadBuffer(in_ ? in_->position() : nullptr, 0) + , in(in_) + , owns_in(owns) + , limit(limit_) + , throw_exception(throw_exception_) + , exception_message(std::move(exception_message_)) { - size_t remaining_bytes_in_buffer = in.buffer().end() - in.position(); + assert(in); + + size_t remaining_bytes_in_buffer = in->buffer().end() - in->position(); if (remaining_bytes_in_buffer > limit) remaining_bytes_in_buffer = limit; - working_buffer = Buffer(in.position(), in.position() + remaining_bytes_in_buffer); + working_buffer = Buffer(in->position(), in->position() + remaining_bytes_in_buffer); +} + + +LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_) + : LimitReadBuffer(&in_, false, limit_, throw_exception_, exception_message_) +{ +} + + +LimitReadBuffer::LimitReadBuffer(std::unique_ptr in_, UInt64 limit_, bool throw_exception_, std::string exception_message_) + : LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exception_message_) +{ } @@ -57,7 +76,10 @@ LimitReadBuffer::~LimitReadBuffer() { /// Update underlying buffer's position in case when limit wasn't reached. if (!working_buffer.empty()) - in.position() = position(); + in->position() = position(); + + if (owns_in) + delete in; } } diff --git a/src/IO/LimitReadBuffer.h b/src/IO/LimitReadBuffer.h index db3d2684ef72791fff75919767ac93e526090492..a5fa0f0d5cc059822f4a4c259c497c2a0031a1da 100644 --- a/src/IO/LimitReadBuffer.h +++ b/src/IO/LimitReadBuffer.h @@ -12,17 +12,22 @@ namespace DB */ class LimitReadBuffer : public ReadBuffer { +public: + LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {}); + LimitReadBuffer(std::unique_ptr in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {}); + ~LimitReadBuffer() override; + private: - ReadBuffer & in; + ReadBuffer * in; + bool owns_in; + UInt64 limit; bool throw_exception; std::string exception_message; - bool nextImpl() override; + LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_); -public: - LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {}); - ~LimitReadBuffer() override; + bool nextImpl() override; }; } diff --git a/src/IO/PeekableReadBuffer.cpp b/src/IO/PeekableReadBuffer.cpp index e0e99afbfecc34d96706801b2667a7b9e59364bc..1d999d586b273c05e59c54c67b9e39b37db56bfb 100644 --- a/src/IO/PeekableReadBuffer.cpp +++ b/src/IO/PeekableReadBuffer.cpp @@ -1,7 +1,9 @@ #include + namespace DB { + namespace ErrorCodes { extern const int LOGICAL_ERROR; @@ -107,22 +109,29 @@ bool PeekableReadBuffer::peekNext() return sub_buf.next(); } -void PeekableReadBuffer::rollbackToCheckpoint() +void PeekableReadBuffer::rollbackToCheckpoint(bool drop) { checkStateCorrect(); + if (!checkpoint) throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR); else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory()) pos = *checkpoint; else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data()); + + if (drop) + dropCheckpoint(); + checkStateCorrect(); } bool PeekableReadBuffer::nextImpl() { - /// FIXME wrong bytes count because it can read the same data again after rollbackToCheckpoint() - /// However, changing bytes count on every call of next() (even after rollback) allows to determine if some pointers were invalidated. + /// FIXME: wrong bytes count because it can read the same data again after rollbackToCheckpoint() + /// however, changing bytes count on every call of next() (even after rollback) allows to determine + /// if some pointers were invalidated. + checkStateCorrect(); bool res; @@ -138,7 +147,7 @@ bool PeekableReadBuffer::nextImpl() if (useSubbufferOnly()) { /// Load next data to sub_buf - sub_buf.position() = pos; + sub_buf.position() = position(); res = sub_buf.next(); } else diff --git a/src/IO/PeekableReadBuffer.h b/src/IO/PeekableReadBuffer.h index e425f9bc9534ce2bf265e71d5931bd0a19cb031d..4f6e669b31d3bcf7647a82847815c10c5e085401 100644 --- a/src/IO/PeekableReadBuffer.h +++ b/src/IO/PeekableReadBuffer.h @@ -58,7 +58,7 @@ public: /// Sets position at checkpoint. /// All pointers (such as this->buffer().end()) may be invalidated - void rollbackToCheckpoint(); + void rollbackToCheckpoint(bool drop = false); /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory, /// so data between checkpoint and position will be in continuous memory. diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h index 5cbe04f8348e87e82afd06693a66e9b7e93c65a9..e3166ba81805d13884b4886fe484efc9f0d7b19e 100644 --- a/src/IO/ReadBuffer.h +++ b/src/IO/ReadBuffer.h @@ -134,15 +134,27 @@ public: tryIgnore(std::numeric_limits::max()); } - /** Reads a single byte. */ - bool ALWAYS_INLINE read(char & c) + /// Peeks a single byte. + bool ALWAYS_INLINE peek(char & c) { if (eof()) return false; - c = *pos++; + c = *pos; return true; } + /// Reads a single byte. + bool ALWAYS_INLINE read(char & c) + { + if (peek(c)) + { + ++pos; + return true; + } + + return false; + } + void ALWAYS_INLINE readStrict(char & c) { if (read(c)) @@ -207,5 +219,39 @@ private: using ReadBufferPtr = std::shared_ptr; +/// Due to inconsistencies in ReadBuffer-family interfaces: +/// - some require to fully wrap underlying buffer and own it, +/// - some just wrap the reference without ownership, +/// we need to be able to wrap reference-only buffers with movable transparent proxy-buffer. +/// The uniqueness of such wraps is responsibility of the code author. +inline std::unique_ptr wrapReadBufferReference(ReadBuffer & buf) +{ + class ReadBufferWrapper : public ReadBuffer + { + public: + explicit ReadBufferWrapper(ReadBuffer & buf_) : ReadBuffer(buf_.position(), 0), buf(buf_) + { + working_buffer = Buffer(buf.position(), buf.buffer().end()); + } + + private: + ReadBuffer & buf; + + bool nextImpl() override + { + buf.position() = position(); + + if (!buf.next()) + return false; + + working_buffer = buf.buffer(); + + return true; + } + }; + + return std::make_unique(buf); +} + } diff --git a/src/IO/ReadBufferFromPocoSocket.cpp b/src/IO/ReadBufferFromPocoSocket.cpp index 2c13446e693d8fa02209371e260dd38dee7c739e..59f0dc25667f488543729d5e169f71aa29ec3f17 100644 --- a/src/IO/ReadBufferFromPocoSocket.cpp +++ b/src/IO/ReadBufferFromPocoSocket.cpp @@ -78,7 +78,7 @@ ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, { } -bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) +bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) const { return available() || socket.poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR); } diff --git a/src/IO/ReadBufferFromPocoSocket.h b/src/IO/ReadBufferFromPocoSocket.h index 8064cd392468d9d3f70598b3112f85f76aca42de..d182d48d1f8a8ab6ccbe90d8a7d8b288cde33db5 100644 --- a/src/IO/ReadBufferFromPocoSocket.h +++ b/src/IO/ReadBufferFromPocoSocket.h @@ -1,15 +1,14 @@ #pragma once -#include - -#include #include +#include + +#include namespace DB { -/** Works with the ready Poco::Net::Socket. Blocking operations. - */ +/// Works with the ready Poco::Net::Socket. Blocking operations. class ReadBufferFromPocoSocket : public BufferWithOwnMemory { protected: @@ -24,9 +23,9 @@ protected: bool nextImpl() override; public: - ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); + explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - bool poll(size_t timeout_microseconds); + bool poll(size_t timeout_microseconds) const; void setAsyncCallback(std::function async_callback_) { async_callback = std::move(async_callback_); } diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index baa12297718a174f8cd3a14fc89823eac706703c..fe563021d2effd1556d0d4ba66321a96cce44007 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -1050,6 +1050,25 @@ void readAndThrowException(ReadBuffer & buf, const String & additional_message) } +void skipToCarriageReturnOrEOF(ReadBuffer & buf) +{ + while (!buf.eof()) + { + char * next_pos = find_first_symbols<'\r'>(buf.position(), buf.buffer().end()); + buf.position() = next_pos; + + if (!buf.hasPendingData()) + continue; + + if (*buf.position() == '\r') + { + ++buf.position(); + return; + } + } +} + + void skipToNextLineOrEOF(ReadBuffer & buf) { while (!buf.eof()) diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index 4482667f44775a7a8475594bade93bd0638eab55..d203bd7bbeef4d720a76d873a50656c01d68abba 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -536,7 +536,7 @@ void parseUUID(const UInt8 * src36, std::reverse_iterator dst16); void parseUUIDWithoutSeparator(const UInt8 * src36, std::reverse_iterator dst16); template -void formatHex(IteratorSrc src, IteratorDst dst, const size_t num_bytes); +void formatHex(IteratorSrc src, IteratorDst dst, size_t num_bytes); template @@ -1046,10 +1046,14 @@ void readText(std::vector & x, ReadBuffer & buf) /// Skip whitespace characters. -inline void skipWhitespaceIfAny(ReadBuffer & buf) +inline void skipWhitespaceIfAny(ReadBuffer & buf, bool one_line = false) { - while (!buf.eof() && isWhitespaceASCII(*buf.position())) - ++buf.position(); + if (!one_line) + while (!buf.eof() && isWhitespaceASCII(*buf.position())) + ++buf.position(); + else + while (!buf.eof() && isWhitespaceASCIIOneLine(*buf.position())) + ++buf.position(); } /// Skips json value. @@ -1212,6 +1216,9 @@ inline void skipBOMIfExists(ReadBuffer & buf) /// Skip to next character after next \n. If no \n in stream, skip to end. void skipToNextLineOrEOF(ReadBuffer & buf); +/// Skip to next character after next \r. If no \r in stream, skip to end. +void skipToCarriageReturnOrEOF(ReadBuffer & buf); + /// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences. void skipToUnescapedNextLineOrEOF(ReadBuffer & buf); diff --git a/src/IO/ya.make b/src/IO/ya.make index 2ef8bd0a98649ea1a60ecf70e0b1871e6d5217fa..980719aa74fff14ef53ceb48986225356adbf65e 100644 --- a/src/IO/ya.make +++ b/src/IO/ya.make @@ -26,6 +26,7 @@ SRCS( CascadeWriteBuffer.cpp CompressionMethod.cpp DoubleConverter.cpp + HTTPChunkedReadBuffer.cpp HTTPCommon.cpp HashingWriteBuffer.cpp HexWriteBuffer.cpp @@ -56,7 +57,6 @@ SRCS( WriteBufferFromFileDescriptor.cpp WriteBufferFromFileDescriptorDiscardOnFailure.cpp WriteBufferFromHTTP.cpp - WriteBufferFromHTTPServerResponse.cpp WriteBufferFromOStream.cpp WriteBufferFromPocoSocket.cpp WriteBufferFromTemporaryFile.cpp diff --git a/src/Interpreters/InterserverIOHandler.h b/src/Interpreters/InterserverIOHandler.h index 6d62c9651cac9c7839d50451afc3c93fa0f897a2..db95a00d0f7f604bb077b88d3069c4b2260653d6 100644 --- a/src/Interpreters/InterserverIOHandler.h +++ b/src/Interpreters/InterserverIOHandler.h @@ -8,13 +8,13 @@ #include #include #include -#include -#include -#include -#include + #include -namespace Poco { namespace Net { class HTTPServerResponse; } } +#include +#include +#include +#include namespace DB { @@ -25,13 +25,16 @@ namespace ErrorCodes extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT; } +class HTMLForm; +class HTTPServerResponse; + /** Query processor from other servers. */ class InterserverIOEndpoint { public: virtual std::string getId(const std::string & path) const = 0; - virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0; + virtual void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) = 0; virtual ~InterserverIOEndpoint() = default; /// You need to stop the data transfer if blocker is activated. diff --git a/src/Server/HTTP/HTMLForm.cpp b/src/Server/HTTP/HTMLForm.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca407858c338922670f8d3f40a0d4ae928a43a9d --- /dev/null +++ b/src/Server/HTTP/HTMLForm.cpp @@ -0,0 +1,381 @@ +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +namespace DB +{ + +namespace +{ + +class NullPartHandler : public HTMLForm::PartHandler +{ +public: + void handlePart(const Poco::Net::MessageHeader &, ReadBuffer &) override {} +}; + +} + +const std::string HTMLForm::ENCODING_URL = "application/x-www-form-urlencoded"; +const std::string HTMLForm::ENCODING_MULTIPART = "multipart/form-data"; +const int HTMLForm::UNKNOWN_CONTENT_LENGTH = -1; + + +HTMLForm::HTMLForm() : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH), encoding(ENCODING_URL) +{ +} + + +HTMLForm::HTMLForm(const std::string & encoding_) + : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH), encoding(encoding_) +{ +} + + +HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler) + : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH) +{ + load(request, requestBody, handler); +} + + +HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody) + : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH) +{ + load(request, requestBody); +} + + +HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request) : HTMLForm(Poco::URI(request.getURI())) +{ +} + +HTMLForm::HTMLForm(const Poco::URI & uri) : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH) +{ + ReadBufferFromString istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM + readQuery(istr); +} + + +void HTMLForm::setEncoding(const std::string & encoding_) +{ + encoding = encoding_; +} + + +void HTMLForm::addPart(const std::string & name, Poco::Net::PartSource * source) +{ + poco_check_ptr(source); + + Part part; + part.name = name; + part.source = std::unique_ptr(source); + parts.push_back(std::move(part)); +} + + +void HTMLForm::load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler) +{ + clear(); + + Poco::URI uri(request.getURI()); + const std::string & query = uri.getRawQuery(); + if (!query.empty()) + { + ReadBufferFromString istr(query); + readQuery(istr); + } + + if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST || request.getMethod() == Poco::Net::HTTPRequest::HTTP_PUT) + { + std::string media_type; + NameValueCollection params; + Poco::Net::MessageHeader::splitParameters(request.getContentType(), media_type, params); + encoding = media_type; + if (encoding == ENCODING_MULTIPART) + { + boundary = params["boundary"]; + readMultipart(requestBody, handler); + } + else + { + readQuery(requestBody); + } + } +} + + +void HTMLForm::load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody) +{ + NullPartHandler nah; + load(request, requestBody, nah); +} + + +void HTMLForm::load(const Poco::Net::HTTPRequest & request) +{ + NullPartHandler nah; + EmptyReadBuffer nis; + load(request, nis, nah); +} + + +void HTMLForm::read(ReadBuffer & in, PartHandler & handler) +{ + if (encoding == ENCODING_URL) + readQuery(in); + else + readMultipart(in, handler); +} + + +void HTMLForm::read(ReadBuffer & in) +{ + readQuery(in); +} + + +void HTMLForm::read(const std::string & queryString) +{ + ReadBufferFromString istr(queryString); + readQuery(istr); +} + + +void HTMLForm::readQuery(ReadBuffer & in) +{ + size_t fields = 0; + char ch = 0; // silence "uninitialized" warning from gcc-* + bool is_first = true; + + while (true) + { + if (field_limit > 0 && fields == field_limit) + throw Poco::Net::HTMLFormException("Too many form fields"); + + std::string name; + std::string value; + + while (in.read(ch) && ch != '=' && ch != '&') + { + if (ch == '+') + ch = ' '; + if (name.size() < MAX_NAME_LENGTH) + name += ch; + else + throw Poco::Net::HTMLFormException("Field name too long"); + } + + if (ch == '=') + { + while (in.read(ch) && ch != '&') + { + if (ch == '+') + ch = ' '; + if (value.size() < value_length_limit) + value += ch; + else + throw Poco::Net::HTMLFormException("Field value too long"); + } + } + + // Remove UTF-8 BOM from first name, if present + if (is_first) + Poco::UTF8::removeBOM(name); + + std::string decoded_name; + std::string decoded_value; + Poco::URI::decode(name, decoded_name); + Poco::URI::decode(value, decoded_value); + add(decoded_name, decoded_value); + ++fields; + + is_first = false; + + if (in.eof()) + break; + } +} + + +void HTMLForm::readMultipart(ReadBuffer & in_, PartHandler & handler) +{ + /// Assume there is always a boundary provided. + assert(!boundary.empty()); + + size_t fields = 0; + MultipartReadBuffer in(in_, boundary); + + /// Assume there is at least one part + in.skipToNextBoundary(); + + /// Read each part until next boundary (or last boundary) + while (!in.eof()) + { + if (field_limit && fields > field_limit) + throw Poco::Net::HTMLFormException("Too many form fields"); + + Poco::Net::MessageHeader header; + readHeaders(header, in); + skipToNextLineOrEOF(in); + + NameValueCollection params; + if (header.has("Content-Disposition")) + { + std::string unused; + Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), unused, params); + } + + if (params.has("filename")) + handler.handlePart(header, in); + else + { + std::string name = params["name"]; + std::string value; + char ch; + + while (in.read(ch)) + { + if (value.size() > value_length_limit) + throw Poco::Net::HTMLFormException("Field value too long"); + value += ch; + } + + add(name, value); + } + + ++fields; + + /// If we already encountered EOF for the buffer |in|, it's possible that the next symbol is a start of boundary line. + /// In this case reading the boundary line will reset the EOF state, potentially breaking invariant of EOF idempotency - + /// if there is such invariant in the first place. + if (!in.skipToNextBoundary()) + break; + } +} + + +void HTMLForm::setFieldLimit(int limit) +{ + poco_assert(limit >= 0); + + field_limit = limit; +} + + +void HTMLForm::setValueLengthLimit(int limit) +{ + poco_assert(limit >= 0); + + value_length_limit = limit; +} + + +HTMLForm::MultipartReadBuffer::MultipartReadBuffer(ReadBuffer & in_, const std::string & boundary_) + : ReadBuffer(nullptr, 0), in(in_), boundary("--" + boundary_) +{ + /// For consistency with |nextImpl()| + position() = in.position(); +} + +bool HTMLForm::MultipartReadBuffer::skipToNextBoundary() +{ + assert(working_buffer.empty() || eof()); + assert(boundary_hit); + + boundary_hit = false; + + while (!in.eof()) + { + auto line = readLine(); + if (startsWith(line, boundary)) + { + set(in.position(), 0); + next(); /// We need to restrict our buffer to size of next available line. + return !startsWith(line, boundary + "--"); + } + } + + throw Poco::Net::HTMLFormException("No boundary line found"); +} + +std::string HTMLForm::MultipartReadBuffer::readLine(bool strict) +{ + std::string line; + char ch = 0; // silence "uninitialized" warning from gcc-* + + while (in.read(ch) && ch != '\r' && ch != '\n') + line += ch; + + if (in.eof()) + { + if (strict) + throw Poco::Net::HTMLFormException("Unexpected end of message"); + return line; + } + + line += ch; + + if (ch == '\r') + { + if (!in.read(ch) || ch != '\n') + throw Poco::Net::HTMLFormException("No CRLF found"); + else + line += ch; + } + + return line; +} + +bool HTMLForm::MultipartReadBuffer::nextImpl() +{ + if (boundary_hit) + return false; + + assert(position() >= in.position()); + + in.position() = position(); + + /// We expect to start from the first symbol after EOL, so we can put checkpoint + /// and safely try to read til the next EOL and check for boundary. + in.setCheckpoint(); + + /// FIXME: there is an extra copy because we cannot traverse PeekableBuffer from checkpoint to position() + /// since it may store different data parts in different sub-buffers, + /// anyway calling makeContinuousMemoryFromCheckpointToPos() will also make an extra copy. + std::string line = readLine(false); + + /// According to RFC2046 the preceding CRLF is a part of boundary line. + if (line == "\r\n") + { + line = readLine(false); + boundary_hit = startsWith(line, boundary); + if (!boundary_hit) line = "\r\n"; + } + else + boundary_hit = startsWith(line, boundary); + + in.rollbackToCheckpoint(true); + + /// Rolling back to checkpoint may change underlying buffers. + /// Limit readable data to a single line. + BufferBase::set(in.position(), line.size(), 0); + + return !boundary_hit && !line.empty(); +} + +} diff --git a/src/Server/HTTP/HTMLForm.h b/src/Server/HTTP/HTMLForm.h new file mode 100644 index 0000000000000000000000000000000000000000..27be712e1d5c576a19deb41ea6dcd83afb15ba2a --- /dev/null +++ b/src/Server/HTTP/HTMLForm.h @@ -0,0 +1,175 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +namespace DB +{ + +class HTMLForm : public Poco::Net::NameValueCollection, private boost::noncopyable +{ +public: + class PartHandler; + + enum Options + { + OPT_USE_CONTENT_LENGTH = 0x01 // don't use Chunked Transfer-Encoding for multipart requests. + }; + + /// Creates an empty HTMLForm and sets the + /// encoding to "application/x-www-form-urlencoded". + HTMLForm(); + + /// Creates an empty HTMLForm that uses the given encoding. + /// Encoding must be either "application/x-www-form-urlencoded" (which is the default) or "multipart/form-data". + explicit HTMLForm(const std::string & encoding); + + /// Creates a HTMLForm from the given HTTP request. + /// Uploaded files are passed to the given PartHandler. + HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler); + + /// Creates a HTMLForm from the given HTTP request. + /// Uploaded files are silently discarded. + HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody); + + /// Creates a HTMLForm from the given HTTP request. + /// The request must be a GET request and the form data must be in the query string (URL encoded). + /// For POST requests, you must use one of the constructors taking an additional input stream for the request body. + explicit HTMLForm(const Poco::Net::HTTPRequest & request); + + explicit HTMLForm(const Poco::URI & uri); + + template + T getParsed(const std::string & key, T default_value) + { + auto it = find(key); + return (it != end()) ? DB::parse(it->second) : default_value; + } + + template + T getParsed(const std::string & key) + { + return DB::parse(get(key)); + } + + /// Sets the encoding used for posting the form. + /// Encoding must be either "application/x-www-form-urlencoded" (which is the default) or "multipart/form-data". + void setEncoding(const std::string & encoding); + + /// Returns the encoding used for posting the form. + const std::string & getEncoding() const { return encoding; } + + /// Adds an part/attachment (file upload) to the form. + /// The form takes ownership of the PartSource and deletes it when it is no longer needed. + /// The part will only be sent if the encoding set for the form is "multipart/form-data" + void addPart(const std::string & name, Poco::Net::PartSource * pSource); + + /// Reads the form data from the given HTTP request. + /// Uploaded files are passed to the given PartHandler. + void load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler); + + /// Reads the form data from the given HTTP request. + /// Uploaded files are silently discarded. + void load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody); + + /// Reads the form data from the given HTTP request. + /// The request must be a GET request and the form data must be in the query string (URL encoded). + /// For POST requests, you must use one of the overloads taking an additional input stream for the request body. + void load(const Poco::Net::HTTPRequest & request); + + /// Reads the form data from the given input stream. + /// The form data read from the stream must be in the encoding specified for the form. + /// Note that read() does not clear the form before reading the new values. + void read(ReadBuffer & in, PartHandler & handler); + + /// Reads the URL-encoded form data from the given input stream. + /// Note that read() does not clear the form before reading the new values. + void read(ReadBuffer & in); + + /// Reads the form data from the given HTTP query string. + /// Note that read() does not clear the form before reading the new values. + void read(const std::string & queryString); + + /// Returns the MIME boundary used for writing multipart form data. + const std::string & getBoundary() const { return boundary; } + + /// Returns the maximum number of header fields allowed. + /// See setFieldLimit() for more information. + int getFieldLimit() const { return field_limit; } + + /// Sets the maximum number of header fields allowed. This limit is used to defend certain kinds of denial-of-service attacks. + /// Specify 0 for unlimited (not recommended). The default limit is 100. + void setFieldLimit(int limit); + + /// Sets the maximum size for form field values stored as strings. + void setValueLengthLimit(int limit); + + /// Returns the maximum size for form field values stored as strings. + int getValueLengthLimit() const { return value_length_limit; } + + static const std::string ENCODING_URL; /// "application/x-www-form-urlencoded" + static const std::string ENCODING_MULTIPART; /// "multipart/form-data" + static const int UNKNOWN_CONTENT_LENGTH; + +protected: + void readQuery(ReadBuffer & in); + void readMultipart(ReadBuffer & in, PartHandler & handler); + +private: + /// This buffer provides data line by line to check for boundary line in a convenient way. + class MultipartReadBuffer; + + enum Limits + { + DFL_FIELD_LIMIT = 100, + MAX_NAME_LENGTH = 1024, + DFL_MAX_VALUE_LENGTH = 256 * 1024 + }; + + struct Part + { + std::string name; + std::unique_ptr source; + }; + + using PartVec = std::vector; + + size_t field_limit; + size_t value_length_limit; + std::string encoding; + std::string boundary; + PartVec parts; +}; + +class HTMLForm::PartHandler +{ +public: + virtual ~PartHandler() = default; + virtual void handlePart(const Poco::Net::MessageHeader &, ReadBuffer &) = 0; +}; + +class HTMLForm::MultipartReadBuffer : public ReadBuffer +{ +public: + MultipartReadBuffer(ReadBuffer & in, const std::string & boundary); + + /// Returns false if last boundary found. + bool skipToNextBoundary(); + +private: + PeekableReadBuffer in; + const std::string boundary; + bool boundary_hit = true; + + std::string readLine(bool strict = true); + + bool nextImpl() override; +}; + +} diff --git a/src/Server/HTTP/HTTPRequest.h b/src/Server/HTTP/HTTPRequest.h new file mode 100644 index 0000000000000000000000000000000000000000..40839cbcdd238f55877dfbd1c1eac14abb56ccb7 --- /dev/null +++ b/src/Server/HTTP/HTTPRequest.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +using HTTPRequest = Poco::Net::HTTPRequest; + +} diff --git a/src/Server/HTTP/HTTPRequestHandler.h b/src/Server/HTTP/HTTPRequestHandler.h new file mode 100644 index 0000000000000000000000000000000000000000..19340866bb7d545d3d7245687a3cb80bd4f9a413 --- /dev/null +++ b/src/Server/HTTP/HTTPRequestHandler.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + +#include + +namespace DB +{ + +class HTTPRequestHandler : private boost::noncopyable +{ +public: + virtual ~HTTPRequestHandler() = default; + + virtual void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) = 0; +}; + +} diff --git a/src/Server/HTTP/HTTPRequestHandlerFactory.h b/src/Server/HTTP/HTTPRequestHandlerFactory.h new file mode 100644 index 0000000000000000000000000000000000000000..3d50bf0a2ed2f59d2574ef9ce98307dc69b0a0fd --- /dev/null +++ b/src/Server/HTTP/HTTPRequestHandlerFactory.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +class HTTPRequestHandlerFactory : private boost::noncopyable +{ +public: + virtual ~HTTPRequestHandlerFactory() = default; + + virtual std::unique_ptr createRequestHandler(const HTTPServerRequest & request) = 0; +}; + +using HTTPRequestHandlerFactoryPtr = std::shared_ptr; + +} diff --git a/src/Server/HTTP/HTTPResponse.h b/src/Server/HTTP/HTTPResponse.h new file mode 100644 index 0000000000000000000000000000000000000000..c73bcec6c3907631be16004ee575a63f56b6ccbc --- /dev/null +++ b/src/Server/HTTP/HTTPResponse.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +using HTTPResponse = Poco::Net::HTTPResponse; + +} diff --git a/src/Server/HTTP/HTTPServer.cpp b/src/Server/HTTP/HTTPServer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3e050080bdd54ac3a255160c374c26496de8bfd5 --- /dev/null +++ b/src/Server/HTTP/HTTPServer.cpp @@ -0,0 +1,48 @@ +#include + +#include + + +namespace DB +{ +HTTPServer::HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory_, + UInt16 portNumber, + Poco::Net::HTTPServerParams::Ptr params) + : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), portNumber, params), factory(factory_) +{ +} + +HTTPServer::HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory_, + const Poco::Net::ServerSocket & socket, + Poco::Net::HTTPServerParams::Ptr params) + : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), socket, params), factory(factory_) +{ +} + +HTTPServer::HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory_, + Poco::ThreadPool & threadPool, + const Poco::Net::ServerSocket & socket, + Poco::Net::HTTPServerParams::Ptr params) + : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), threadPool, socket, params), factory(factory_) +{ +} + +HTTPServer::~HTTPServer() +{ + /// We should call stop and join thread here instead of destructor of parent TCPHandler, + /// because there's possible race on 'vptr' between this virtual destructor and 'run' method. + stop(); +} + +void HTTPServer::stopAll(bool /* abortCurrent */) +{ + stop(); +} + +} diff --git a/src/Server/HTTP/HTTPServer.h b/src/Server/HTTP/HTTPServer.h new file mode 100644 index 0000000000000000000000000000000000000000..1ce62c65ca278b5a9222a9081ba95a9f07ac7d25 --- /dev/null +++ b/src/Server/HTTP/HTTPServer.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include +#include + +#include + + +namespace DB +{ + +class Context; + +class HTTPServer : public Poco::Net::TCPServer +{ +public: + explicit HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory, + UInt16 portNumber = 80, + Poco::Net::HTTPServerParams::Ptr params = new Poco::Net::HTTPServerParams); + + HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory, + const Poco::Net::ServerSocket & socket, + Poco::Net::HTTPServerParams::Ptr params); + + HTTPServer( + const Context & context, + HTTPRequestHandlerFactoryPtr factory, + Poco::ThreadPool & threadPool, + const Poco::Net::ServerSocket & socket, + Poco::Net::HTTPServerParams::Ptr params); + + ~HTTPServer() override; + + void stopAll(bool abortCurrent = false); + +private: + HTTPRequestHandlerFactoryPtr factory; +}; + +} diff --git a/src/Server/HTTP/HTTPServerConnection.cpp b/src/Server/HTTP/HTTPServerConnection.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e2ee4c8882bd56f5d6297dcae1968aa6cf739dcc --- /dev/null +++ b/src/Server/HTTP/HTTPServerConnection.cpp @@ -0,0 +1,128 @@ +#include + +#include + +namespace DB +{ + +HTTPServerConnection::HTTPServerConnection( + const Context & context_, + const Poco::Net::StreamSocket & socket, + Poco::Net::HTTPServerParams::Ptr params_, + HTTPRequestHandlerFactoryPtr factory_) + : TCPServerConnection(socket), context(context_), params(params_), factory(factory_), stopped(false) +{ + poco_check_ptr(factory); +} + +void HTTPServerConnection::run() +{ + std::string server = params->getSoftwareVersion(); + Poco::Net::HTTPServerSession session(socket(), params); + + while (!stopped && session.hasMoreRequests()) + { + try + { + std::unique_lock lock(mutex); + if (!stopped) + { + HTTPServerResponse response(session); + HTTPServerRequest request(context, response, session); + + Poco::Timestamp now; + response.setDate(now); + response.setVersion(request.getVersion()); + response.setKeepAlive(params->getKeepAlive() && request.getKeepAlive() && session.canKeepAlive()); + if (!server.empty()) + response.set("Server", server); + try + { + std::unique_ptr handler(factory->createRequestHandler(request)); + + if (handler) + { + if (request.getExpectContinue() && response.getStatus() == Poco::Net::HTTPResponse::HTTP_OK) + response.sendContinue(); + + handler->handleRequest(request, response); + session.setKeepAlive(params->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive()); + } + else + sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_NOT_IMPLEMENTED); + } + catch (Poco::Exception &) + { + if (!response.sent()) + { + try + { + sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); + } + catch (...) + { + } + } + throw; + } + } + } + catch (Poco::Net::NoMessageException &) + { + break; + } + catch (Poco::Net::MessageException &) + { + sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_BAD_REQUEST); + } + catch (Poco::Exception &) + { + if (session.networkException()) + { + session.networkException()->rethrow(); + } + else + throw; + } + } +} + +// static +void HTTPServerConnection::sendErrorResponse(Poco::Net::HTTPServerSession & session, Poco::Net::HTTPResponse::HTTPStatus status) +{ + HTTPServerResponse response(session); + response.setVersion(Poco::Net::HTTPMessage::HTTP_1_1); + response.setStatusAndReason(status); + response.setKeepAlive(false); + response.send(); + session.setKeepAlive(false); +} + +void HTTPServerConnection::onServerStopped(const bool & abortCurrent) +{ + stopped = true; + if (abortCurrent) + { + try + { + socket().shutdown(); + } + catch (...) + { + } + } + else + { + std::unique_lock lock(mutex); + + try + { + socket().shutdown(); + } + catch (...) + { + } + } +} + +} diff --git a/src/Server/HTTP/HTTPServerConnection.h b/src/Server/HTTP/HTTPServerConnection.h new file mode 100644 index 0000000000000000000000000000000000000000..589c33025bf1edb2b4188b97e6a3a5d627456948 --- /dev/null +++ b/src/Server/HTTP/HTTPServerConnection.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace DB +{ + +class HTTPServerConnection : public Poco::Net::TCPServerConnection +{ +public: + HTTPServerConnection( + const Context & context, + const Poco::Net::StreamSocket & socket, + Poco::Net::HTTPServerParams::Ptr params, + HTTPRequestHandlerFactoryPtr factory); + + void run() override; + +protected: + static void sendErrorResponse(Poco::Net::HTTPServerSession & session, Poco::Net::HTTPResponse::HTTPStatus status); + void onServerStopped(const bool & abortCurrent); + +private: + Context context; + Poco::Net::HTTPServerParams::Ptr params; + HTTPRequestHandlerFactoryPtr factory; + bool stopped; + std::mutex mutex; // guards the |factory| with assumption that creating handlers is not thread-safe. +}; + +} diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.cpp b/src/Server/HTTP/HTTPServerConnectionFactory.cpp new file mode 100644 index 0000000000000000000000000000000000000000..876ccb9096b1b5293d88070142a98574e0eaccd4 --- /dev/null +++ b/src/Server/HTTP/HTTPServerConnectionFactory.cpp @@ -0,0 +1,19 @@ +#include + +#include + +namespace DB +{ +HTTPServerConnectionFactory::HTTPServerConnectionFactory( + const Context & context_, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_) + : context(context_), params(params_), factory(factory_) +{ + poco_check_ptr(factory); +} + +Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket) +{ + return new HTTPServerConnection(context, socket, params, factory); +} + +} diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.h b/src/Server/HTTP/HTTPServerConnectionFactory.h new file mode 100644 index 0000000000000000000000000000000000000000..4f8ca43cbfba560428509c48f99205c69c2dda95 --- /dev/null +++ b/src/Server/HTTP/HTTPServerConnectionFactory.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +#include +#include + +namespace DB +{ + +class HTTPServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory +{ +public: + HTTPServerConnectionFactory(const Context & context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory); + + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + +private: + Context context; + Poco::Net::HTTPServerParams::Ptr params; + HTTPRequestHandlerFactoryPtr factory; +}; + +} diff --git a/src/Server/HTTP/HTTPServerRequest.cpp b/src/Server/HTTP/HTTPServerRequest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bdba6a51d91a25a35f6877dc08a65cb5773a0235 --- /dev/null +++ b/src/Server/HTTP/HTTPServerRequest.cpp @@ -0,0 +1,123 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +HTTPServerRequest::HTTPServerRequest(const Context & context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session) +{ + response.attachRequest(this); + + /// Now that we know socket is still connected, obtain addresses + client_address = session.clientAddress(); + server_address = session.serverAddress(); + + auto receive_timeout = context.getSettingsRef().http_receive_timeout; + auto send_timeout = context.getSettingsRef().http_send_timeout; + auto max_query_size = context.getSettingsRef().max_query_size; + + session.socket().setReceiveTimeout(receive_timeout); + session.socket().setSendTimeout(send_timeout); + + auto in = std::make_unique(session.socket()); + socket = session.socket().impl(); + + readRequest(*in); /// Try parse according to RFC7230 + + if (getChunkedTransferEncoding()) + stream = std::make_unique(std::move(in), max_query_size); + else if (hasContentLength()) + stream = std::make_unique(std::move(in), getContentLength(), false); + else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE) + stream = std::move(in); + else + /// We have to distinguish empty buffer and nullptr. + stream = std::make_unique(); +} + +bool HTTPServerRequest::checkPeerConnected() const +{ + try + { + char b; + if (!socket->receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK)) + return false; + } + catch (Poco::TimeoutException &) + { + } + catch (...) + { + return false; + } + + return true; +} + +void HTTPServerRequest::readRequest(ReadBuffer & in) +{ + char ch; + std::string method; + std::string uri; + std::string version; + + method.reserve(16); + uri.reserve(64); + version.reserve(16); + + if (in.eof()) + throw Poco::Net::NoMessageException(); + + skipWhitespaceIfAny(in); + + if (in.eof()) + throw Poco::Net::MessageException("No HTTP request header"); + + while (in.read(ch) && !Poco::Ascii::isSpace(ch) && method.size() <= MAX_METHOD_LENGTH) + method += ch; + + if (method.size() > MAX_METHOD_LENGTH) + throw Poco::Net::MessageException("HTTP request method invalid or too long"); + + skipWhitespaceIfAny(in); + + while (in.read(ch) && !Poco::Ascii::isSpace(ch) && uri.size() <= MAX_URI_LENGTH) + uri += ch; + + if (uri.size() > MAX_URI_LENGTH) + throw Poco::Net::MessageException("HTTP request URI invalid or too long"); + + skipWhitespaceIfAny(in); + + while (in.read(ch) && !Poco::Ascii::isSpace(ch) && version.size() <= MAX_VERSION_LENGTH) + version += ch; + + if (version.size() > MAX_VERSION_LENGTH) + throw Poco::Net::MessageException("Invalid HTTP version string"); + + // since HTTP always use Windows-style EOL '\r\n' we always can safely skip to '\n' + + skipToNextLineOrEOF(in); + + readHeaders(*this, in); + + skipToNextLineOrEOF(in); + + setMethod(method); + setURI(uri); + setVersion(version); +} + +} diff --git a/src/Server/HTTP/HTTPServerRequest.h b/src/Server/HTTP/HTTPServerRequest.h new file mode 100644 index 0000000000000000000000000000000000000000..7fd548502121963b84038e5e4e71d0c7424fe92a --- /dev/null +++ b/src/Server/HTTP/HTTPServerRequest.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include + +#include + +namespace DB +{ + +class Context; +class HTTPServerResponse; +class ReadBufferFromPocoSocket; + +class HTTPServerRequest : public HTTPRequest +{ +public: + HTTPServerRequest(const Context & context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session); + + /// FIXME: it's a little bit inconvenient interface. The rationale is that all other ReadBuffer's wrap each other + /// via unique_ptr - but we can't inherit HTTPServerRequest from ReadBuffer and pass it around, + /// since we also need it in other places. + + /// Returns the input stream for reading the request body. + ReadBuffer & getStream() + { + poco_check_ptr(stream); + return *stream; + } + + bool checkPeerConnected() const; + + /// Returns the client's address. + const Poco::Net::SocketAddress & clientAddress() const { return client_address; } + + /// Returns the server's address. + const Poco::Net::SocketAddress & serverAddress() const { return server_address; } + +private: + /// Limits for basic sanity checks when reading a header + enum Limits + { + MAX_NAME_LENGTH = 256, + MAX_VALUE_LENGTH = 8192, + MAX_METHOD_LENGTH = 32, + MAX_URI_LENGTH = 16384, + MAX_VERSION_LENGTH = 8, + MAX_FIELDS_NUMBER = 100, + }; + + std::unique_ptr stream; + Poco::Net::SocketImpl * socket; + Poco::Net::SocketAddress client_address; + Poco::Net::SocketAddress server_address; + + void readRequest(ReadBuffer & in); +}; + +} diff --git a/src/Server/HTTP/HTTPServerResponse.cpp b/src/Server/HTTP/HTTPServerResponse.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e3d52fffa80f0e98269acb4d62a4070a629011f6 --- /dev/null +++ b/src/Server/HTTP/HTTPServerResponse.cpp @@ -0,0 +1,163 @@ +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +HTTPServerResponse::HTTPServerResponse(Poco::Net::HTTPServerSession & session_) : session(session_) +{ +} + +void HTTPServerResponse::sendContinue() +{ + Poco::Net::HTTPHeaderOutputStream hs(session); + hs << getVersion() << " 100 Continue\r\n\r\n"; +} + +std::shared_ptr HTTPServerResponse::send() +{ + poco_assert(!stream); + + if ((request && request->getMethod() == HTTPRequest::HTTP_HEAD) || getStatus() < 200 || getStatus() == HTTPResponse::HTTP_NO_CONTENT + || getStatus() == HTTPResponse::HTTP_NOT_MODIFIED) + { + Poco::CountingOutputStream cs; + write(cs); + stream = std::make_shared(session, cs.chars()); + write(*stream); + } + else if (getChunkedTransferEncoding()) + { + Poco::Net::HTTPHeaderOutputStream hs(session); + write(hs); + stream = std::make_shared(session); + } + else if (hasContentLength()) + { + Poco::CountingOutputStream cs; + write(cs); + stream = std::make_shared(session, getContentLength64() + cs.chars()); + write(*stream); + } + else + { + stream = std::make_shared(session); + setKeepAlive(false); + write(*stream); + } + + return stream; +} + +std::pair, std::shared_ptr> HTTPServerResponse::beginSend() +{ + poco_assert(!stream); + poco_assert(!header_stream); + + /// NOTE: Code is not exception safe. + + if ((request && request->getMethod() == HTTPRequest::HTTP_HEAD) || getStatus() < 200 || getStatus() == HTTPResponse::HTTP_NO_CONTENT + || getStatus() == HTTPResponse::HTTP_NOT_MODIFIED) + { + throw Poco::Exception("HTTPServerResponse::beginSend is invalid for HEAD request"); + } + else if (getChunkedTransferEncoding()) + { + header_stream = std::make_shared(session); + beginWrite(*header_stream); + stream = std::make_shared(session); + } + else if (hasContentLength()) + { + throw Poco::Exception("HTTPServerResponse::beginSend is invalid for response with Content-Length header"); + } + else + { + stream = std::make_shared(session); + header_stream = stream; + setKeepAlive(false); + beginWrite(*stream); + } + + return std::make_pair(header_stream, stream); +} + +void HTTPServerResponse::sendFile(const std::string & path, const std::string & mediaType) +{ + poco_assert(!stream); + + Poco::File f(path); + Poco::Timestamp date_time = f.getLastModified(); + Poco::File::FileSize length = f.getSize(); + set("Last-Modified", Poco::DateTimeFormatter::format(date_time, Poco::DateTimeFormat::HTTP_FORMAT)); + setContentLength64(length); + setContentType(mediaType); + setChunkedTransferEncoding(false); + + Poco::FileInputStream istr(path); + if (istr.good()) + { + stream = std::make_shared(session); + write(*stream); + if (request && request->getMethod() != HTTPRequest::HTTP_HEAD) + { + Poco::StreamCopier::copyStream(istr, *stream); + } + } + else + throw Poco::OpenFileException(path); +} + +void HTTPServerResponse::sendBuffer(const void * buffer, std::size_t length) +{ + poco_assert(!stream); + + setContentLength(static_cast(length)); + setChunkedTransferEncoding(false); + + stream = std::make_shared(session); + write(*stream); + if (request && request->getMethod() != HTTPRequest::HTTP_HEAD) + { + stream->write(static_cast(buffer), static_cast(length)); + } +} + +void HTTPServerResponse::redirect(const std::string & uri, HTTPStatus status) +{ + poco_assert(!stream); + + setContentLength(0); + setChunkedTransferEncoding(false); + + setStatusAndReason(status); + set("Location", uri); + + stream = std::make_shared(session); + write(*stream); +} + +void HTTPServerResponse::requireAuthentication(const std::string & realm) +{ + poco_assert(!stream); + + setStatusAndReason(HTTPResponse::HTTP_UNAUTHORIZED); + std::string auth("Basic realm=\""); + auth.append(realm); + auth.append("\""); + set("WWW-Authenticate", auth); +} + +} diff --git a/src/Server/HTTP/HTTPServerResponse.h b/src/Server/HTTP/HTTPServerResponse.h new file mode 100644 index 0000000000000000000000000000000000000000..82221ce3a83999198984aef2c14f90b8a0754f46 --- /dev/null +++ b/src/Server/HTTP/HTTPServerResponse.h @@ -0,0 +1,91 @@ +#pragma once + +#include + +#include +#include + +#include +#include + +namespace DB +{ + +class HTTPServerRequest; + +class HTTPServerResponse : public HTTPResponse +{ +public: + explicit HTTPServerResponse(Poco::Net::HTTPServerSession & session); + + void sendContinue(); /// Sends a 100 Continue response to the client. + + /// Sends the response header to the client and + /// returns an output stream for sending the + /// response body. + /// + /// Must not be called after beginSend(), sendFile(), sendBuffer() + /// or redirect() has been called. + std::shared_ptr send(); /// TODO: use some WriteBuffer implementation here. + + /// Sends the response headers to the client + /// but do not finish headers with \r\n, + /// allowing to continue sending additional header fields. + /// + /// Must not be called after send(), sendFile(), sendBuffer() + /// or redirect() has been called. + std::pair, std::shared_ptr> beginSend(); /// TODO: use some WriteBuffer implementation here. + + /// Sends the response header to the client, followed + /// by the content of the given file. + /// + /// Must not be called after send(), sendBuffer() + /// or redirect() has been called. + /// + /// Throws a FileNotFoundException if the file + /// cannot be found, or an OpenFileException if + /// the file cannot be opened. + void sendFile(const std::string & path, const std::string & mediaType); + + /// Sends the response header to the client, followed + /// by the contents of the given buffer. + /// + /// The Content-Length header of the response is set + /// to length and chunked transfer encoding is disabled. + /// + /// If both the HTTP message header and body (from the + /// given buffer) fit into one single network packet, the + /// complete response can be sent in one network packet. + /// + /// Must not be called after send(), sendFile() + /// or redirect() has been called. + void sendBuffer(const void * pBuffer, std::size_t length); /// FIXME: do we need this one? + + /// Sets the status code, which must be one of + /// HTTP_MOVED_PERMANENTLY (301), HTTP_FOUND (302), + /// or HTTP_SEE_OTHER (303), + /// and sets the "Location" header field + /// to the given URI, which according to + /// the HTTP specification, must be absolute. + /// + /// Must not be called after send() has been called. + void redirect(const std::string & uri, Poco::Net::HTTPResponse::HTTPStatus status = Poco::Net::HTTPResponse::HTTP_FOUND); + + void requireAuthentication(const std::string & realm); + /// Sets the status code to 401 (Unauthorized) + /// and sets the "WWW-Authenticate" header field + /// according to the given realm. + + /// Returns true if the response (header) has been sent. + bool sent() const { return !!stream; } + + void attachRequest(HTTPServerRequest * request_) { request = request_; } + +private: + Poco::Net::HTTPServerSession & session; + HTTPServerRequest * request; + std::shared_ptr stream; + std::shared_ptr header_stream; +}; + +} diff --git a/src/Server/HTTP/ReadHeaders.cpp b/src/Server/HTTP/ReadHeaders.cpp new file mode 100644 index 0000000000000000000000000000000000000000..77ec48c11b193e7b8198bd0a30da4523eea79b5a --- /dev/null +++ b/src/Server/HTTP/ReadHeaders.cpp @@ -0,0 +1,88 @@ +#include + +#include +#include + +#include + +namespace DB +{ + +void readHeaders( + Poco::Net::MessageHeader & headers, ReadBuffer & in, size_t max_fields_number, size_t max_name_length, size_t max_value_length) +{ + char ch = 0; // silence uninitialized warning from gcc-* + std::string name; + std::string value; + + name.reserve(32); + value.reserve(64); + + size_t fields = 0; + + while (true) + { + if (fields > max_fields_number) + throw Poco::Net::MessageException("Too many header fields"); + + name.clear(); + value.clear(); + + /// Field name + while (in.peek(ch) && ch != ':' && !Poco::Ascii::isSpace(ch) && name.size() <= max_name_length) + { + name += ch; + in.ignore(); + } + + if (in.eof()) + throw Poco::Net::MessageException("Field is invalid"); + + if (name.empty()) + { + if (ch == '\r') + /// Start of the empty-line delimiter + break; + if (ch == ':') + throw Poco::Net::MessageException("Field name is empty"); + } + else + { + if (name.size() > max_name_length) + throw Poco::Net::MessageException("Field name is too long"); + if (ch != ':') + throw Poco::Net::MessageException("Field name is invalid or no colon found"); + } + + in.ignore(); + + skipWhitespaceIfAny(in, true); + + if (in.eof()) + throw Poco::Net::MessageException("Field is invalid"); + + /// Field value - folded values not supported. + while (in.read(ch) && ch != '\r' && ch != '\n' && value.size() <= max_value_length) + value += ch; + + if (in.eof()) + throw Poco::Net::MessageException("Field is invalid"); + + if (value.empty()) + throw Poco::Net::MessageException("Field value is empty"); + + if (ch == '\n') + throw Poco::Net::MessageException("No CRLF found"); + + if (value.size() > max_value_length) + throw Poco::Net::MessageException("Field value is too long"); + + skipToNextLineOrEOF(in); + + Poco::trimRightInPlace(value); + headers.add(name, headers.decodeWord(value)); + ++fields; + } +} + +} diff --git a/src/Server/HTTP/ReadHeaders.h b/src/Server/HTTP/ReadHeaders.h new file mode 100644 index 0000000000000000000000000000000000000000..e94cddcf4896bd474b974d25eb9dfaa69646a696 --- /dev/null +++ b/src/Server/HTTP/ReadHeaders.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +class ReadBuffer; + +void readHeaders( + Poco::Net::MessageHeader & headers, + ReadBuffer & in, + size_t max_fields_number = 100, + size_t max_name_length = 256, + size_t max_value_length = 8192); + +} diff --git a/src/IO/WriteBufferFromHTTPServerResponse.cpp b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp similarity index 81% rename from src/IO/WriteBufferFromHTTPServerResponse.cpp rename to src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp index ac2eeac1652f4a03b80401fa376d55a17371081d..86133fc2ffe112222edab7a063e00ca211e5227b 100644 --- a/src/IO/WriteBufferFromHTTPServerResponse.cpp +++ b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.cpp @@ -1,9 +1,8 @@ -#include -#include -#include -#include +#include + #include #include +#include #include #include #include @@ -13,6 +12,8 @@ # include #endif +#include + namespace DB { @@ -33,16 +34,13 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders() setResponseDefaultHeaders(response, keep_alive_timeout); -#if defined(POCO_CLICKHOUSE_PATCH) - if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) + if (!is_http_method_head) std::tie(response_header_ostr, response_body_ostr) = response.beginSend(); -#endif } } void WriteBufferFromHTTPServerResponse::writeHeaderSummary() { -#if defined(POCO_CLICKHOUSE_PATCH) if (headers_finished_sending) return; @@ -51,12 +49,10 @@ void WriteBufferFromHTTPServerResponse::writeHeaderSummary() if (response_header_ostr) *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush; -#endif } void WriteBufferFromHTTPServerResponse::writeHeaderProgress() { -#if defined(POCO_CLICKHOUSE_PATCH) if (headers_finished_sending) return; @@ -65,7 +61,6 @@ void WriteBufferFromHTTPServerResponse::writeHeaderProgress() if (response_header_ostr) *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush; -#endif } void WriteBufferFromHTTPServerResponse::finishSendHeaders() @@ -75,23 +70,16 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders() writeHeaderSummary(); headers_finished_sending = true; - if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) + if (!is_http_method_head) { -#if defined(POCO_CLICKHOUSE_PATCH) /// Send end of headers delimiter. if (response_header_ostr) *response_header_ostr << "\r\n" << std::flush; -#else - /// Newline autosent by response.send() - /// if nothing to send in body: - if (!response_body_ostr) - response_body_ostr = &(response.send()); -#endif } else { if (!response_body_ostr) - response_body_ostr = &(response.send()); + response_body_ostr = response.send(); } } } @@ -104,23 +92,15 @@ void WriteBufferFromHTTPServerResponse::nextImpl() startSendHeaders(); - if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD) + if (!out && !is_http_method_head) { if (compress) { auto content_encoding_name = toContentEncodingName(compression_method); -#if defined(POCO_CLICKHOUSE_PATCH) *response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n"; -#else - response.set("Content-Encoding", content_encoding_name); -#endif } -#if !defined(POCO_CLICKHOUSE_PATCH) - response_body_ostr = &(response.send()); -#endif - /// We reuse our buffer in "out" to avoid extra allocations and copies. if (compress) @@ -150,14 +130,14 @@ void WriteBufferFromHTTPServerResponse::nextImpl() WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse( - Poco::Net::HTTPServerRequest & request_, - Poco::Net::HTTPServerResponse & response_, + HTTPServerResponse & response_, + bool is_http_method_head_, unsigned keep_alive_timeout_, bool compress_, CompressionMethod compression_method_) : BufferWithOwnMemory(DBMS_DEFAULT_BUFFER_SIZE) - , request(request_) , response(response_) + , is_http_method_head(is_http_method_head_) , keep_alive_timeout(keep_alive_timeout_) , compress(compress_) , compression_method(compression_method_) diff --git a/src/IO/WriteBufferFromHTTPServerResponse.h b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h similarity index 86% rename from src/IO/WriteBufferFromHTTPServerResponse.h rename to src/Server/HTTP/WriteBufferFromHTTPServerResponse.h index 85a81c3dda793d7a78163b35b37764d7a3099528..b4ff454195fb3a32f55ad9f0cc1a76e6b94f8d15 100644 --- a/src/IO/WriteBufferFromHTTPServerResponse.h +++ b/src/Server/HTTP/WriteBufferFromHTTPServerResponse.h @@ -1,31 +1,17 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include #include -#include +#include #include #include +#include +#include +#include #include #include -#if !defined(ARCADIA_BUILD) -# include -#endif - - -namespace Poco -{ - namespace Net - { - class HTTPServerResponse; - } -} +#include +#include namespace DB @@ -47,20 +33,17 @@ namespace DB class WriteBufferFromHTTPServerResponse final : public BufferWithOwnMemory { private: - Poco::Net::HTTPServerRequest & request; - Poco::Net::HTTPServerResponse & response; + HTTPServerResponse & response; + bool is_http_method_head; bool add_cors_header = false; unsigned keep_alive_timeout = 0; bool compress = false; CompressionMethod compression_method; int compression_level = 1; - std::ostream * response_body_ostr = nullptr; - -#if defined(POCO_CLICKHOUSE_PATCH) - std::ostream * response_header_ostr = nullptr; -#endif + std::shared_ptr response_body_ostr; + std::shared_ptr response_header_ostr; std::unique_ptr out; @@ -91,8 +74,8 @@ private: public: WriteBufferFromHTTPServerResponse( - Poco::Net::HTTPServerRequest & request_, - Poco::Net::HTTPServerResponse & response_, + HTTPServerResponse & response_, + bool is_http_method_head_, unsigned keep_alive_timeout_, bool compress_ = false, /// If true - set Content-Encoding header and compress the result. CompressionMethod compression_method_ = CompressionMethod::None); diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index e9a77c3b433ed5e46c8579f58918c710628979dd..d200ee7421ff8d96c78ffab1b6875cda68c9cc81 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -1,49 +1,47 @@ -#include "HTTPHandler.h" +#include -#include "HTTPHandlerFactory.h" -#include "HTTPHandlerRequestFilter.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include +#include +#include #include #include -#include -#include #include +#include +#include +#include #include #include -#include -#include -#include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include +#include +#include #if !defined(ARCADIA_BUILD) # include #endif +#include +#include +#include +#include + +#include +#include + namespace DB { @@ -237,16 +235,14 @@ HTTPHandler::HTTPHandler(IServer & server_, const std::string & name) void HTTPHandler::processQuery( Context & context, - Poco::Net::HTTPServerRequest & request, + HTTPServerRequest & request, HTMLForm & params, - Poco::Net::HTTPServerResponse & response, + HTTPServerResponse & response, Output & used_output, std::optional & query_scope) { LOG_TRACE(log, "Request URI: {}", request.getURI()); - std::istream & istr = request.stream(); - /// The user and password can be passed by headers (similar to X-Auth-*), /// which is used by load balancers to pass authentication information. std::string user = request.get("X-ClickHouse-User", ""); @@ -291,9 +287,9 @@ void HTTPHandler::processQuery( client_info.interface = ClientInfo::Interface::HTTP; ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; - if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) + if (request.getMethod() == HTTPServerRequest::HTTP_GET) http_method = ClientInfo::HTTPMethod::GET; - else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST) + else if (request.getMethod() == HTTPServerRequest::HTTP_POST) http_method = ClientInfo::HTTPMethod::POST; client_info.http_method = http_method; @@ -356,10 +352,8 @@ void HTTPHandler::processQuery( } #endif - // Set the query id supplied by the user, if any, and also update the - // OpenTelemetry fields. - context.setCurrentQueryId(params.get("query_id", - request.get("X-ClickHouse-Query-Id", ""))); + // Set the query id supplied by the user, if any, and also update the OpenTelemetry fields. + context.setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", ""))); client_info.initial_query_id = client_info.current_query_id; @@ -405,7 +399,11 @@ void HTTPHandler::processQuery( unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10); used_output.out = std::make_shared( - request, response, keep_alive_timeout, client_supports_http_compression, http_response_compression_method); + response, + request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, + keep_alive_timeout, + client_supports_http_compression, + http_response_compression_method); if (internal_compression) used_output.out_maybe_compressed = std::make_shared(*used_output.out); @@ -459,8 +457,8 @@ void HTTPHandler::processQuery( /// Request body can be compressed using algorithm specified in the Content-Encoding header. String http_request_compression_method_str = request.get("Content-Encoding", ""); - std::unique_ptr in_post = wrapReadBufferWithCompressionMethod( - std::make_unique(istr), chooseCompressionMethod({}, http_request_compression_method_str)); + auto in_post = wrapReadBufferWithCompressionMethod( + wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str)); /// The data can also be compressed using incompatible internal algorithm. This is indicated by /// 'decompress' query parameter. @@ -513,7 +511,7 @@ void HTTPHandler::processQuery( const auto & settings = context.getSettingsRef(); /// Only readonly queries are allowed for HTTP GET requests. - if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) + if (request.getMethod() == HTTPServerRequest::HTTP_GET) { if (settings.readonly == 0) context.setSetting("readonly", 2); @@ -608,26 +606,12 @@ void HTTPHandler::processQuery( if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close) { - Poco::Net::StreamSocket & socket = dynamic_cast(request).socket(); - - append_callback([&context, &socket](const Progress &) + append_callback([&context, &request](const Progress &) { - /// Assume that at the point this method is called no one is reading data from the socket any more. - /// True for read-only queries. - try - { - char b; - int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK); - if (status == 0) - context.killCurrentQuery(); - } - catch (Poco::TimeoutException &) - { - } - catch (...) - { + /// Assume that at the point this method is called no one is reading data from the socket any more: + /// should be true for read-only queries. + if (!request.checkPeerConnected()) context.killCurrentQuery(); - } }); } @@ -656,22 +640,23 @@ void HTTPHandler::processQuery( used_output.out->finalize(); } -void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code, - Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, - Output & used_output) +void HTTPHandler::trySendExceptionToClient( + const std::string & s, int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output) { try { response.set("X-ClickHouse-Exception-Code", toString(exception_code)); + /// FIXME: make sure that no one else is reading from the same stream at the moment. + /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body /// to avoid reading part of the current request body in the next request. if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && response.getKeepAlive() - && !request.stream().eof() - && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) + && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED + && !request.getStream().eof()) { - request.stream().ignore(std::numeric_limits::max()); + request.getStream().ignoreAll(); } bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER || @@ -690,7 +675,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_ if (!response.sent() && !used_output.out_maybe_compressed) { /// If nothing was sent yet and we don't even know if we must compress the response. - response.send() << s << std::endl; + *response.send() << s << std::endl; } else if (used_output.out_maybe_compressed) { @@ -717,6 +702,11 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_ used_output.out_maybe_compressed->next(); used_output.out->finalize(); } + else + { + assert(false); + __builtin_unreachable(); + } } catch (...) { @@ -725,7 +715,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_ } -void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { setThreadName("HTTPHandler"); ThreadStatus thread_status; @@ -746,17 +736,18 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne response.setContentType("text/plain; charset=UTF-8"); response.set("X-ClickHouse-Server-Display-Name", server_display_name); /// For keep-alive to work. - if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) + if (request.getVersion() == HTTPServerRequest::HTTP_1_1) response.setChunkedTransferEncoding(true); HTMLForm params(request); with_stacktrace = params.getParsed("stacktrace", false); /// Workaround. Poco does not detect 411 Length Required case. - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && - !request.hasContentLength()) + if (request.getMethod() == HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && !request.hasContentLength()) { - throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED); + throw Exception( + "The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", + ErrorCodes::HTTP_LENGTH_REQUIRED); } processQuery(context, request, params, response, used_output, query_scope); @@ -800,7 +791,7 @@ bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::stri return false; } -std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) +std::string DynamicQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) { if (likely(!startsWith(request.getContentType(), "multipart/form-data"))) { @@ -814,7 +805,7 @@ std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request /// Support for "external data for query processing". /// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope. ExternalTablesHandler handler(context, params); - params.load(request, request.stream(), handler); + params.load(request, request.getStream(), handler); std::string full_query; /// Params are of both form params POST and uri (GET params) @@ -844,7 +835,7 @@ bool PredefinedQueryHandler::customizeQueryParam(Context & context, const std::s return false; } -void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & request, DB::Context & context) +void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, DB::Context & context) { /// If in the configuration file, the handler's header is regex and contains named capture group /// We will extract regex named capture groups as query parameters @@ -880,22 +871,26 @@ void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & req } } -std::string PredefinedQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) +std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) { if (unlikely(startsWith(request.getContentType(), "multipart/form-data"))) { /// Support for "external data for query processing". ExternalTablesHandler handler(context, params); - params.load(request, request.stream(), handler); + params.load(request, request.getStream(), handler); } return predefined_query; } -Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix) +HTTPRequestHandlerFactoryPtr createDynamicHandlerFactory(IServer & server, const std::string & config_prefix) { - std::string query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query"); - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory(server, std::move(query_param_name)), server.config(), config_prefix); + const auto & query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query"); + auto factory = std::make_shared>(server, std::move(query_param_name)); + + factory->addFiltersFromConfig(server.config(), config_prefix); + + return factory; } static inline bool capturingNamedQueryParam(NameSet receive_params, const CompiledRegexPtr & compiled_regex) @@ -913,18 +908,20 @@ static inline CompiledRegexPtr getCompiledRegex(const std::string & expression) auto compiled_regex = std::make_shared(expression); if (!compiled_regex->ok()) - throw Exception("Cannot compile re2: " + expression + " for http handling rule, error: " + - compiled_regex->error() + ". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP); + throw Exception( + "Cannot compile re2: " + expression + " for http handling rule, error: " + compiled_regex->error() + + ". Look at https://github.com/google/re2/wiki/Syntax for reference.", + ErrorCodes::CANNOT_COMPILE_REGEXP); return compiled_regex; } -Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix) +HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix) { Poco::Util::AbstractConfiguration & configuration = server.config(); if (!configuration.has(config_prefix + ".handler.query")) - throw Exception("There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG); + throw Exception("There is no path '" + config_prefix + ".handler.query' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG); std::string predefined_query = configuration.getString(config_prefix + ".handler.query"); NameSet analyze_receive_params = analyzeReceiveQueryParams(predefined_query); @@ -946,6 +943,8 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & headers_name_with_regex.emplace(std::make_pair(header_name, regex)); } + std::shared_ptr> factory; + if (configuration.has(config_prefix + ".url")) { auto url_expression = configuration.getString(config_prefix + ".url"); @@ -955,14 +954,23 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & auto regex = getCompiledRegex(url_expression); if (capturingNamedQueryParam(analyze_receive_params, regex)) - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory( - server, std::move(analyze_receive_params), std::move(predefined_query), std::move(regex), - std::move(headers_name_with_regex)), configuration, config_prefix); + { + factory = std::make_shared>( + server, + std::move(analyze_receive_params), + std::move(predefined_query), + std::move(regex), + std::move(headers_name_with_regex)); + factory->addFiltersFromConfig(configuration, config_prefix); + return factory; + } } - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory( - server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{} ,std::move(headers_name_with_regex)), - configuration, config_prefix); + factory = std::make_shared>( + server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{}, std::move(headers_name_with_regex)); + factory->addFiltersFromConfig(configuration, config_prefix); + + return factory; } } diff --git a/src/Server/HTTPHandler.h b/src/Server/HTTPHandler.h index 96727df54043e163c93d96bb80c0effae73fa480..e903fbfbff74dec56385f3babc4a63aa1c887ed2 100644 --- a/src/Server/HTTPHandler.h +++ b/src/Server/HTTPHandler.h @@ -1,13 +1,10 @@ #pragma once -#include "IServer.h" - -#include - -#include -#include -#include #include +#include +#include +#include +#include #include @@ -21,23 +18,24 @@ namespace Poco { class Logger; } namespace DB { +class IServer; class WriteBufferFromHTTPServerResponse; using CompiledRegexPtr = std::shared_ptr; -class HTTPHandler : public Poco::Net::HTTPRequestHandler +class HTTPHandler : public HTTPRequestHandler { public: - explicit HTTPHandler(IServer & server_, const std::string & name); + HTTPHandler(IServer & server_, const std::string & name); - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; /// This method is called right before the query execution. - virtual void customizeContext(Poco::Net::HTTPServerRequest & /*request*/, Context & /* context */) {} + virtual void customizeContext(HTTPServerRequest & /* request */, Context & /* context */) {} virtual bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) = 0; - virtual std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) = 0; + virtual std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) = 0; private: struct Output @@ -74,17 +72,17 @@ private: /// Also initializes 'used_output'. void processQuery( Context & context, - Poco::Net::HTTPServerRequest & request, + HTTPServerRequest & request, HTMLForm & params, - Poco::Net::HTTPServerResponse & response, + HTTPServerResponse & response, Output & used_output, std::optional & query_scope); void trySendExceptionToClient( const std::string & s, int exception_code, - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response, + HTTPServerRequest & request, + HTTPServerResponse & response, Output & used_output); static void pushDelayedResults(Output & used_output); @@ -97,7 +95,7 @@ private: public: explicit DynamicQueryHandler(IServer & server_, const std::string & param_name_ = "query"); - std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override; + std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) override; bool customizeQueryParam(Context &context, const std::string &key, const std::string &value) override; }; @@ -114,9 +112,9 @@ public: IServer & server_, const NameSet & receive_params_, const std::string & predefined_query_ , const CompiledRegexPtr & url_regex_, const std::unordered_map & header_name_with_regex_); - virtual void customizeContext(Poco::Net::HTTPServerRequest & request, Context & context) override; + virtual void customizeContext(HTTPServerRequest & request, Context & context) override; - std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override; + std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) override; bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) override; }; diff --git a/src/Server/HTTPHandlerFactory.cpp b/src/Server/HTTPHandlerFactory.cpp index 9eac60355d230228ee866d8c56d960f874da14a1..db80750beb8c357ad5dc20abba6edc18659d1ebc 100644 --- a/src/Server/HTTPHandlerFactory.cpp +++ b/src/Server/HTTPHandlerFactory.cpp @@ -1,4 +1,7 @@ -#include "HTTPHandlerFactory.h" +#include + +#include +#include #include @@ -29,7 +32,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string & { } -Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request) +std::unique_ptr HTTPRequestHandlerFactoryMain::createRequestHandler(const HTTPServerRequest & request) { LOG_TRACE(log, "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}", name, request.getMethod(), request.clientAddress().toString(), request.get("User-Agent", "(none)"), @@ -38,8 +41,8 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand for (auto & handler_factory : child_factories) { - auto * handler = handler_factory->createRequestHandler(request); - if (handler != nullptr) + auto handler = handler_factory->createRequestHandler(request); + if (handler) return handler; } @@ -47,31 +50,16 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) { - return new NotFoundHandler; + return std::unique_ptr(new NotFoundHandler); } return nullptr; } -HTTPRequestHandlerFactoryMain::~HTTPRequestHandlerFactoryMain() -{ - while (!child_factories.empty()) - { - delete child_factories.back(); - child_factories.pop_back(); - } -} - -HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory) -{ - child_factories.emplace_back(child_factory); - return this; -} - static inline auto createHandlersFactoryFromConfig( IServer & server, const std::string & name, const String & prefix, AsynchronousMetrics & async_metrics) { - auto main_handler_factory = std::make_unique(name); + auto main_handler_factory = std::make_shared(name); Poco::Util::AbstractConfiguration::Keys keys; server.config().keys(prefix, keys); @@ -109,10 +97,11 @@ static inline auto createHandlersFactoryFromConfig( ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); } - return main_handler_factory.release(); + return main_handler_factory; } -static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics) +static inline HTTPRequestHandlerFactoryPtr +createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics) { if (server.config().has("http_handlers")) { @@ -120,25 +109,25 @@ static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IS } else { - auto factory = std::make_unique(name); + auto factory = std::make_shared(name); addDefaultHandlersFactory(*factory, server, async_metrics); - return factory.release(); + return factory; } } -static inline Poco::Net::HTTPRequestHandlerFactory * createInterserverHTTPHandlerFactory(IServer & server, const std::string & name) +static inline HTTPRequestHandlerFactoryPtr createInterserverHTTPHandlerFactory(IServer & server, const std::string & name) { - auto factory = std::make_unique(name); + auto factory = std::make_shared(name); addCommonDefaultHandlersFactory(*factory, server); - auto main_handler = std::make_unique>(server); + auto main_handler = std::make_shared>(server); main_handler->allowPostAndGetParamsRequest(); - factory->addHandler(main_handler.release()); + factory->addHandler(main_handler); - return factory.release(); + return factory; } -Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name) +HTTPRequestHandlerFactoryPtr createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name) { if (name == "HTTPHandler-factory" || name == "HTTPSHandler-factory") return createHTTPHandlerFactory(server, name, async_metrics); @@ -146,12 +135,13 @@ Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, As return createInterserverHTTPHandlerFactory(server, name); else if (name == "PrometheusHandler-factory") { - auto factory = std::make_unique(name); - auto handler = std::make_unique>( + auto factory = std::make_shared(name); + auto handler = std::make_shared>( server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)); - handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest(); - factory->addHandler(handler.release()); - return factory.release(); + handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics")); + handler->allowGetAndHeadRequest(); + factory->addHandler(handler); + return factory; } throw Exception("LOGICAL ERROR: Unknown HTTP handler factory name.", ErrorCodes::LOGICAL_ERROR); @@ -162,39 +152,44 @@ static const auto root_response_expression = "config://http_server_default_respo void addCommonDefaultHandlersFactory(HTTPRequestHandlerFactoryMain & factory, IServer & server) { - auto root_handler = std::make_unique>(server, root_response_expression); - root_handler->attachStrictPath("/")->allowGetAndHeadRequest(); - factory.addHandler(root_handler.release()); - - auto ping_handler = std::make_unique>(server, ping_response_expression); - ping_handler->attachStrictPath("/ping")->allowGetAndHeadRequest(); - factory.addHandler(ping_handler.release()); - - auto replicas_status_handler = std::make_unique>(server); - replicas_status_handler->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest(); - factory.addHandler(replicas_status_handler.release()); - - auto web_ui_handler = std::make_unique>(server, "play.html"); - web_ui_handler->attachNonStrictPath("/play")->allowGetAndHeadRequest(); - factory.addHandler(web_ui_handler.release()); + auto root_handler = std::make_shared>(server, root_response_expression); + root_handler->attachStrictPath("/"); + root_handler->allowGetAndHeadRequest(); + factory.addHandler(root_handler); + + auto ping_handler = std::make_shared>(server, ping_response_expression); + ping_handler->attachStrictPath("/ping"); + ping_handler->allowGetAndHeadRequest(); + factory.addHandler(ping_handler); + + auto replicas_status_handler = std::make_shared>(server); + replicas_status_handler->attachNonStrictPath("/replicas_status"); + replicas_status_handler->allowGetAndHeadRequest(); + factory.addHandler(replicas_status_handler); + + auto web_ui_handler = std::make_shared>(server, "play.html"); + web_ui_handler->attachNonStrictPath("/play"); + web_ui_handler->allowGetAndHeadRequest(); + factory.addHandler(web_ui_handler); } void addDefaultHandlersFactory(HTTPRequestHandlerFactoryMain & factory, IServer & server, AsynchronousMetrics & async_metrics) { addCommonDefaultHandlersFactory(factory, server); - auto query_handler = std::make_unique>(server, "query"); + auto query_handler = std::make_shared>(server, "query"); query_handler->allowPostAndGetParamsRequest(); - factory.addHandler(query_handler.release()); + factory.addHandler(query_handler); /// We check that prometheus handler will be served on current (default) port. /// Otherwise it will be created separately, see createHandlerFactory(...). if (server.config().has("prometheus") && server.config().getInt("prometheus.port", 0) == 0) { - auto prometheus_handler = std::make_unique>( + auto prometheus_handler = std::make_shared>( server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics)); - prometheus_handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest(); - factory.addHandler(prometheus_handler.release()); + prometheus_handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics")); + prometheus_handler->allowGetAndHeadRequest(); + factory.addHandler(prometheus_handler); } } diff --git a/src/Server/HTTPHandlerFactory.h b/src/Server/HTTPHandlerFactory.h index 3e8313172eb1a5e804260fc1639e680f504fe77c..6297f988eaa440bebbd1b3b94330d981b1c7a95c 100644 --- a/src/Server/HTTPHandlerFactory.h +++ b/src/Server/HTTPHandlerFactory.h @@ -1,82 +1,102 @@ #pragma once -#include "IServer.h" -#include -#include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include + +#include namespace DB { -/// Handle request using child handlers -class HTTPRequestHandlerFactoryMain : public Poco::Net::HTTPRequestHandlerFactory, boost::noncopyable +namespace ErrorCodes { -private: - using TThis = HTTPRequestHandlerFactoryMain; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; +} - Poco::Logger * log; - std::string name; +class IServer; - std::vector child_factories; +/// Handle request using child handlers +class HTTPRequestHandlerFactoryMain : public HTTPRequestHandlerFactory +{ public: + explicit HTTPRequestHandlerFactoryMain(const std::string & name_); - ~HTTPRequestHandlerFactoryMain() override; + void addHandler(HTTPRequestHandlerFactoryPtr child_factory) { child_factories.emplace_back(child_factory); } - HTTPRequestHandlerFactoryMain(const std::string & name_); + std::unique_ptr createRequestHandler(const HTTPServerRequest & request) override; - TThis * addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory); +private: + Poco::Logger * log; + std::string name; - Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; + std::vector child_factories; }; template -class HandlingRuleHTTPHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory +class HandlingRuleHTTPHandlerFactory : public HTTPRequestHandlerFactory { public: - using TThis = HandlingRuleHTTPHandlerFactory; - using Filter = std::function; + using Filter = std::function; template - HandlingRuleHTTPHandlerFactory(TArgs &&... args) + explicit HandlingRuleHTTPHandlerFactory(TArgs &&... args) { creator = [args = std::tuple(std::forward(args) ...)]() { return std::apply([&](auto && ... endpoint_args) { - return new TEndpoint(std::forward(endpoint_args)...); + return std::make_unique(std::forward(endpoint_args)...); }, std::move(args)); }; } - TThis * addFilter(Filter cur_filter) + void addFilter(Filter cur_filter) { Filter prev_filter = filter; filter = [prev_filter, cur_filter](const auto & request) { return prev_filter ? prev_filter(request) && cur_filter(request) : cur_filter(request); }; + } + + void addFiltersFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & prefix) + { + Poco::Util::AbstractConfiguration::Keys filters_type; + config.keys(prefix, filters_type); - return this; + for (const auto & filter_type : filters_type) + { + if (filter_type == "handler") + continue; + else if (filter_type == "url") + addFilter(urlFilter(config, prefix + ".url")); + else if (filter_type == "headers") + addFilter(headersFilter(config, prefix + ".headers")); + else if (filter_type == "methods") + addFilter(methodsFilter(config, prefix + ".methods")); + else + throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + } } - TThis * attachStrictPath(const String & strict_path) + void attachStrictPath(const String & strict_path) { - return addFilter([strict_path](const auto & request) { return request.getURI() == strict_path; }); + addFilter([strict_path](const auto & request) { return request.getURI() == strict_path; }); } - TThis * attachNonStrictPath(const String & non_strict_path) + void attachNonStrictPath(const String & non_strict_path) { - return addFilter([non_strict_path](const auto & request) { return startsWith(request.getURI(), non_strict_path); }); + addFilter([non_strict_path](const auto & request) { return startsWith(request.getURI(), non_strict_path); }); } /// Handle GET or HEAD endpoint on specified path - TThis * allowGetAndHeadRequest() + void allowGetAndHeadRequest() { - return addFilter([](const auto & request) + addFilter([](const auto & request) { return request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD; @@ -84,35 +104,35 @@ public: } /// Handle POST or GET with params - TThis * allowPostAndGetParamsRequest() + void allowPostAndGetParamsRequest() { - return addFilter([](const auto & request) + addFilter([](const auto & request) { return request.getURI().find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST; }); } - Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override + std::unique_ptr createRequestHandler(const HTTPServerRequest & request) override { return filter(request) ? creator() : nullptr; } private: Filter filter; - std::function creator; + std::function ()> creator; }; -Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix); - -Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix); +HTTPRequestHandlerFactoryPtr createStaticHandlerFactory(IServer & server, const std::string & config_prefix); -Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix); +HTTPRequestHandlerFactoryPtr createDynamicHandlerFactory(IServer & server, const std::string & config_prefix); -Poco::Net::HTTPRequestHandlerFactory * createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix); +HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix); -Poco::Net::HTTPRequestHandlerFactory * createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix); +HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix); -Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name); +HTTPRequestHandlerFactoryPtr +createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix); +HTTPRequestHandlerFactoryPtr createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name); } diff --git a/src/Server/HTTPHandlerRequestFilter.h b/src/Server/HTTPHandlerRequestFilter.h index f952efd7653ee549c0eaa6ee93aa26ee93003550..f0474e8b953c86a47d9ad4c868d30d564e0322dc 100644 --- a/src/Server/HTTPHandlerRequestFilter.h +++ b/src/Server/HTTPHandlerRequestFilter.h @@ -1,15 +1,17 @@ #pragma once -#include "HTTPHandlerFactory.h" +#include +#include +#include +#include +#include #include #include #include -#include #include -#include - +#include namespace DB { @@ -17,11 +19,9 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_COMPILE_REGEXP; - extern const int UNKNOWN_ELEMENT_IN_CONFIG; } - -typedef std::shared_ptr CompiledRegexPtr; +using CompiledRegexPtr = std::shared_ptr; static inline bool checkRegexExpression(const StringRef & match_str, const CompiledRegexPtr & compiled_regex) { @@ -45,10 +45,10 @@ static inline auto methodsFilter(Poco::Util::AbstractConfiguration & config, con std::vector methods; Poco::StringTokenizer tokenizer(config.getString(config_path), ","); - for (auto iterator = tokenizer.begin(); iterator != tokenizer.end(); ++iterator) - methods.emplace_back(Poco::toUpper(Poco::trim(*iterator))); + for (const auto & iterator : tokenizer) + methods.emplace_back(Poco::toUpper(Poco::trim(iterator))); - return [methods](const Poco::Net::HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); }; + return [methods](const HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); }; } static inline auto getExpression(const std::string & expression) @@ -66,7 +66,7 @@ static inline auto getExpression(const std::string & expression) static inline auto urlFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path) { - return [expression = getExpression(config.getString(config_path))](const Poco::Net::HTTPServerRequest & request) + return [expression = getExpression(config.getString(config_path))](const HTTPServerRequest & request) { const auto & uri = request.getURI(); const auto & end = find_first_symbols<'?'>(uri.data(), uri.data() + uri.size()); @@ -88,7 +88,7 @@ static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, con headers_expression.emplace(std::make_pair(header_name, expression)); } - return [headers_expression](const Poco::Net::HTTPServerRequest & request) + return [headers_expression](const HTTPServerRequest & request) { for (const auto & [header_name, header_expression] : headers_expression) { @@ -101,28 +101,4 @@ static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, con }; } -template -static inline Poco::Net::HTTPRequestHandlerFactory * addFiltersFromConfig( - HandlingRuleHTTPHandlerFactory * factory, Poco::Util::AbstractConfiguration & config, const std::string & prefix) -{ - Poco::Util::AbstractConfiguration::Keys filters_type; - config.keys(prefix, filters_type); - - for (const auto & filter_type : filters_type) - { - if (filter_type == "handler") - continue; - else if (filter_type == "url") - factory->addFilter(urlFilter(config, prefix + ".url")); - else if (filter_type == "headers") - factory->addFilter(headersFilter(config, prefix + ".headers")); - else if (filter_type == "methods") - factory->addFilter(methodsFilter(config, prefix + ".methods")); - else - throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); - } - - return factory; -} - } diff --git a/src/Server/InterserverIOHTTPHandler.cpp b/src/Server/InterserverIOHTTPHandler.cpp index 973759bedd169199c8abca4788fefeb2f0432fcf..3296da945787986974dcf01430f788f04b7f4461 100644 --- a/src/Server/InterserverIOHTTPHandler.cpp +++ b/src/Server/InterserverIOHTTPHandler.cpp @@ -1,18 +1,18 @@ -#include "InterserverIOHTTPHandler.h" +#include + +#include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include #include -#include "IServer.h" +#include +#include +#include +#include +#include + +#include +#include namespace DB { @@ -23,7 +23,7 @@ namespace ErrorCodes extern const int TOO_MANY_SIMULTANEOUS_QUERIES; } -std::pair InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const +std::pair InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const { const auto & config = server.config(); @@ -51,7 +51,7 @@ std::pair InterserverIOHTTPHandler::checkAuthentication(Poco::Net: return {"", true}; } -void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output) +void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output) { HTMLForm params(request); @@ -60,7 +60,7 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque String endpoint_name = params.get("endpoint"); bool compress = params.get("compress") == "true"; - ReadBufferFromIStream body(request.stream()); + auto & body = request.getStream(); auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name); /// Locked for read while query processing @@ -80,18 +80,19 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque } -void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { setThreadName("IntersrvHandler"); /// In order to work keep-alive. - if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) + if (request.getVersion() == HTTPServerRequest::HTTP_1_1) response.setChunkedTransferEncoding(true); Output used_output; const auto & config = server.config(); unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10); - used_output.out = std::make_shared(request, response, keep_alive_timeout); + used_output.out = std::make_shared( + response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); try { @@ -102,7 +103,7 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ } else { - response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED); + response.setStatusAndReason(HTTPServerResponse::HTTP_UNAUTHORIZED); if (!response.sent()) writeString(message, *used_output.out); LOG_WARNING(log, "Query processing failed request: '{}' authentication failed", request.getURI()); diff --git a/src/Server/InterserverIOHTTPHandler.h b/src/Server/InterserverIOHTTPHandler.h index 8dc1962664c08845f04809a94ea3d802405d51f2..47892aa678fb16c86543249eb22dab1dc16dcb9d 100644 --- a/src/Server/InterserverIOHTTPHandler.h +++ b/src/Server/InterserverIOHTTPHandler.h @@ -1,10 +1,12 @@ #pragma once -#include -#include -#include +#include #include +#include + +#include + namespace CurrentMetrics { @@ -17,7 +19,7 @@ namespace DB class IServer; class WriteBufferFromHTTPServerResponse; -class InterserverIOHTTPHandler : public Poco::Net::HTTPRequestHandler +class InterserverIOHTTPHandler : public HTTPRequestHandler { public: explicit InterserverIOHTTPHandler(IServer & server_) @@ -26,7 +28,7 @@ public: { } - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; private: struct Output @@ -39,9 +41,9 @@ private: CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection}; - void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output); + void processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output); - std::pair checkAuthentication(Poco::Net::HTTPServerRequest & request) const; + std::pair checkAuthentication(HTTPServerRequest & request) const; }; } diff --git a/src/Server/NotFoundHandler.cpp b/src/Server/NotFoundHandler.cpp index 766e8895784b55b7618cbcb1ba9b1b8e507dbf32..3181708b9b775bbba157956d903737deb690d8d1 100644 --- a/src/Server/NotFoundHandler.cpp +++ b/src/Server/NotFoundHandler.cpp @@ -1,32 +1,25 @@ -#include "NotFoundHandler.h" +#include #include - #include -#include -#include - namespace DB { - -void NotFoundHandler::handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) +void NotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { try { response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND); - response.send() << "There is no handle " << request.getURI() << "\n\n" - << "Use / or /ping for health checks.\n" - << "Or /replicas_status for more sophisticated health checks.\n\n" - << "Send queries from your program with POST method or GET /?query=...\n\n" - << "Use clickhouse-client:\n\n" - << "For interactive data analysis:\n" - << " clickhouse-client\n\n" - << "For batch query processing:\n" - << " clickhouse-client --query='SELECT 1' > result\n" - << " clickhouse-client < query > result\n"; + *response.send() << "There is no handle " << request.getURI() << "\n\n" + << "Use / or /ping for health checks.\n" + << "Or /replicas_status for more sophisticated health checks.\n\n" + << "Send queries from your program with POST method or GET /?query=...\n\n" + << "Use clickhouse-client:\n\n" + << "For interactive data analysis:\n" + << " clickhouse-client\n\n" + << "For batch query processing:\n" + << " clickhouse-client --query='SELECT 1' > result\n" + << " clickhouse-client < query > result\n"; } catch (...) { diff --git a/src/Server/NotFoundHandler.h b/src/Server/NotFoundHandler.h index 7f758e49d0dd2ff2108f1e786ee40412e98aac39..749ac388c4da1b25cec2debf7bdda60b05f2a9c4 100644 --- a/src/Server/NotFoundHandler.h +++ b/src/Server/NotFoundHandler.h @@ -1,18 +1,15 @@ #pragma once -#include - +#include namespace DB { /// Response with 404 and verbose description. -class NotFoundHandler : public Poco::Net::HTTPRequestHandler +class NotFoundHandler : public HTTPRequestHandler { public: - void handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; }; } diff --git a/src/Server/PrometheusRequestHandler.cpp b/src/Server/PrometheusRequestHandler.cpp index 60deec9b28953c09fc09f5cfa18ec6ff6a72ad34..83cb8e85a9eda1620b523eb36c259d732f4fbf02 100644 --- a/src/Server/PrometheusRequestHandler.cpp +++ b/src/Server/PrometheusRequestHandler.cpp @@ -1,26 +1,19 @@ -#include "PrometheusRequestHandler.h" +#include #include - +#include +#include +#include +#include #include - -#include -#include -#include - #include -#include -#include -#include +#include namespace DB { - -void PrometheusRequestHandler::handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) +void PrometheusRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { try { @@ -31,7 +24,7 @@ void PrometheusRequestHandler::handleRequest( response.setContentType("text/plain; version=0.0.4; charset=UTF-8"); - auto wb = WriteBufferFromHTTPServerResponse(request, response, keep_alive_timeout); + auto wb = WriteBufferFromHTTPServerResponse(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout); metrics_writer.write(wb); wb.finalize(); } @@ -41,10 +34,13 @@ void PrometheusRequestHandler::handleRequest( } } -Poco::Net::HTTPRequestHandlerFactory * createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix) +HTTPRequestHandlerFactoryPtr +createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix) { - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory( - server, PrometheusMetricsWriter(server.config(), config_prefix + ".handler", async_metrics)), server.config(), config_prefix); + auto factory = std::make_shared>( + server, PrometheusMetricsWriter(server.config(), config_prefix + ".handler", async_metrics)); + factory->addFiltersFromConfig(server.config(), config_prefix); + return factory; } } diff --git a/src/Server/PrometheusRequestHandler.h b/src/Server/PrometheusRequestHandler.h index 47c8adf4774d5be5aa302c08258371a54d7054ab..1fb3d9f0f5955c1a23d7d66e53921978da2da203 100644 --- a/src/Server/PrometheusRequestHandler.h +++ b/src/Server/PrometheusRequestHandler.h @@ -1,17 +1,15 @@ #pragma once -#include "IServer.h" -#include "PrometheusMetricsWriter.h" +#include -#include -#include -#include -#include +#include "PrometheusMetricsWriter.h" namespace DB { -class PrometheusRequestHandler : public Poco::Net::HTTPRequestHandler +class IServer; + +class PrometheusRequestHandler : public HTTPRequestHandler { private: IServer & server; @@ -24,9 +22,7 @@ public: { } - void handleRequest( - Poco::Net::HTTPServerRequest & request, - Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; }; } diff --git a/src/Server/ReplicasStatusHandler.cpp b/src/Server/ReplicasStatusHandler.cpp index fc79ad9d134013d4f106bb2de7fb802003939d68..778f982713195f16601bff2eea80ded87ef90b3e 100644 --- a/src/Server/ReplicasStatusHandler.cpp +++ b/src/Server/ReplicasStatusHandler.cpp @@ -1,17 +1,18 @@ -#include "ReplicasStatusHandler.h" +#include +#include +#include #include +#include +#include +#include +#include #include -#include #include -#include -#include #include #include #include -#include -#include namespace DB @@ -24,7 +25,7 @@ ReplicasStatusHandler::ReplicasStatusHandler(IServer & server) } -void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { try { @@ -82,7 +83,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request } if (verbose) - response.send() << message.str(); + *response.send() << message.str(); else { const char * data = "Ok.\n"; @@ -100,7 +101,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request if (!response.sent()) { /// We have not sent anything yet and we don't even know if we need to compress response. - response.send() << getCurrentExceptionMessage(false) << std::endl; + *response.send() << getCurrentExceptionMessage(false) << std::endl; } } catch (...) @@ -110,9 +111,11 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request } } -Poco::Net::HTTPRequestHandlerFactory * createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix) +HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix) { - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory(server), server.config(), config_prefix); + auto factory = std::make_shared>(server); + factory->addFiltersFromConfig(server.config(), config_prefix); + return factory; } } diff --git a/src/Server/ReplicasStatusHandler.h b/src/Server/ReplicasStatusHandler.h index a32f1ba905f1c50d6cfb218d8ac016287458145f..8a790b13ad6c0174a10a63d294a059eca7dd899e 100644 --- a/src/Server/ReplicasStatusHandler.h +++ b/src/Server/ReplicasStatusHandler.h @@ -1,17 +1,15 @@ #pragma once -#include "IServer.h" - -#include - +#include namespace DB { class Context; +class IServer; /// Replies "Ok.\n" if all replicas on this server don't lag too much. Otherwise output lag information. -class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler +class ReplicasStatusHandler : public HTTPRequestHandler { private: Context & context; @@ -19,7 +17,7 @@ private: public: explicit ReplicasStatusHandler(IServer & server_); - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; }; diff --git a/src/Server/StaticRequestHandler.cpp b/src/Server/StaticRequestHandler.cpp index ad2c07ab0aaac685b27d45a7ba6d41626df06bbf..f3f564c1cf8a2a4186f795d5000a25d5bd859086 100644 --- a/src/Server/StaticRequestHandler.cpp +++ b/src/Server/StaticRequestHandler.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include @@ -32,7 +32,8 @@ namespace ErrorCodes extern const int INVALID_CONFIG_PARAMETER; } -static inline WriteBufferPtr responseWriteBuffer(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, unsigned int keep_alive_timeout) +static inline WriteBufferPtr +responseWriteBuffer(HTTPServerRequest & request, HTTPServerResponse & response, unsigned int keep_alive_timeout) { /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). String http_response_compression_methods = request.get("Accept-Encoding", ""); @@ -55,12 +56,15 @@ static inline WriteBufferPtr responseWriteBuffer(Poco::Net::HTTPServerRequest & bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None; return std::make_shared( - request, response, keep_alive_timeout, client_supports_http_compression, http_response_compression_method); + response, + request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, + keep_alive_timeout, + client_supports_http_compression, + http_response_compression_method); } static inline void trySendExceptionToClient( - const std::string & s, int exception_code, - Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response , WriteBuffer & out) + const std::string & s, int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, WriteBuffer & out) { try { @@ -69,13 +73,13 @@ static inline void trySendExceptionToClient( /// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body /// to avoid reading part of the current request body in the next request. if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST - && response.getKeepAlive() && !request.stream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) - request.stream().ignore(std::numeric_limits::max()); + && response.getKeepAlive() && !request.getStream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED) + request.getStream().ignore(std::numeric_limits::max()); response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) - response.send() << s << std::endl; + *response.send() << s << std::endl; else { if (out.count() != out.offset()) @@ -94,7 +98,7 @@ static inline void trySendExceptionToClient( } } -void StaticRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void StaticRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { auto keep_alive_timeout = server.config().getUInt("keep_alive_timeout", 10); const auto & out = responseWriteBuffer(request, response, keep_alive_timeout); @@ -159,14 +163,17 @@ StaticRequestHandler::StaticRequestHandler(IServer & server_, const String & exp { } -Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix) +HTTPRequestHandlerFactoryPtr createStaticHandlerFactory(IServer & server, const std::string & config_prefix) { int status = server.config().getInt(config_prefix + ".handler.status", 200); std::string response_content = server.config().getRawString(config_prefix + ".handler.response_content", "Ok.\n"); std::string response_content_type = server.config().getString(config_prefix + ".handler.content_type", "text/plain; charset=UTF-8"); + auto factory = std::make_shared>( + server, std::move(response_content), std::move(status), std::move(response_content_type)); - return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory( - server, std::move(response_content), std::move(status), std::move(response_content_type)), server.config(), config_prefix); + factory->addFiltersFromConfig(server.config(), config_prefix); + + return factory; } } diff --git a/src/Server/StaticRequestHandler.h b/src/Server/StaticRequestHandler.h index 0a29384ad0eca2720d584335be0e7dd7b11b0ba6..56c7f5a6d44510f21b8ad32b9afbe9712d2ddbc9 100644 --- a/src/Server/StaticRequestHandler.h +++ b/src/Server/StaticRequestHandler.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include @@ -11,7 +11,7 @@ class IServer; class WriteBuffer; /// Response with custom string. Can be used for browser. -class StaticRequestHandler : public Poco::Net::HTTPRequestHandler +class StaticRequestHandler : public HTTPRequestHandler { private: IServer & server; @@ -29,7 +29,7 @@ public: void writeResponse(WriteBuffer & out); - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; }; } diff --git a/src/Server/WebUIRequestHandler.cpp b/src/Server/WebUIRequestHandler.cpp index 6159a27971fdcdc913ec5a79ffb31795c8a1acca..fb8ff71611e3916cd853267f86a8d6d54b7a9647 100644 --- a/src/Server/WebUIRequestHandler.cpp +++ b/src/Server/WebUIRequestHandler.cpp @@ -18,18 +18,18 @@ WebUIRequestHandler::WebUIRequestHandler(IServer & server_, std::string resource } -void WebUIRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) { auto keep_alive_timeout = server.config().getUInt("keep_alive_timeout", 10); response.setContentType("text/html; charset=UTF-8"); - if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1) + if (request.getVersion() == HTTPServerRequest::HTTP_1_1) response.setChunkedTransferEncoding(true); setResponseDefaultHeaders(response, keep_alive_timeout); response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK); - response.send() << getResource(resource_name); + *response.send() << getResource(resource_name); } } diff --git a/src/Server/WebUIRequestHandler.h b/src/Server/WebUIRequestHandler.h index 3066b86b36a21d1306772fc074e1bbbeec73712a..1c52b626091481c9ce90e339dfd1afe79a8e2451 100644 --- a/src/Server/WebUIRequestHandler.h +++ b/src/Server/WebUIRequestHandler.h @@ -1,6 +1,6 @@ #pragma once -#include +#include namespace DB @@ -9,14 +9,14 @@ namespace DB class IServer; /// Response with HTML page that allows to send queries and show results in browser. -class WebUIRequestHandler : public Poco::Net::HTTPRequestHandler +class WebUIRequestHandler : public HTTPRequestHandler { private: IServer & server; std::string resource_name; public: WebUIRequestHandler(IServer & server_, std::string resource_name_); - void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override; }; } diff --git a/src/Server/ya.make b/src/Server/ya.make index a0269e9ac84d35015ce4f08750348f49035266b8..ef5ef6d5f572b47b5bc71c11dd2b817b3d227937 100644 --- a/src/Server/ya.make +++ b/src/Server/ya.make @@ -11,6 +11,14 @@ PEERDIR( SRCS( GRPCServer.cpp + HTTP/HTMLForm.cpp + HTTP/HTTPServer.cpp + HTTP/HTTPServerConnection.cpp + HTTP/HTTPServerConnectionFactory.cpp + HTTP/HTTPServerRequest.cpp + HTTP/HTTPServerResponse.cpp + HTTP/ReadHeaders.cpp + HTTP/WriteBufferFromHTTPServerResponse.cpp HTTPHandler.cpp HTTPHandlerFactory.cpp InterserverIOHTTPHandler.cpp diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index e01e7793dd36e00423c04d1c4dc54df31cad1917..f80020991b02cb873b1a2978d70ea688978791d5 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -1,17 +1,20 @@ #include + +#include +#include +#include +#include +#include +#include #include #include -#include -#include +#include #include #include -#include -#include #include + #include -#include #include -#include namespace CurrentMetrics @@ -83,7 +86,7 @@ std::string Service::getId(const std::string & node_id) const return getEndpointId(node_id); } -void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) +void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, HTTPServerResponse & response) { int client_protocol_version = parse(params.get("client_protocol_version", "0")); diff --git a/src/Storages/MergeTree/DataPartsExchange.h b/src/Storages/MergeTree/DataPartsExchange.h index 0a359474d2da99ca6be625a659d8c834e48d132b..834fed1182fd026182eb1d92385d96bdc0c9aa22 100644 --- a/src/Storages/MergeTree/DataPartsExchange.h +++ b/src/Storages/MergeTree/DataPartsExchange.h @@ -20,21 +20,19 @@ namespace DataPartsExchange class Service final : public InterserverIOEndpoint { public: - Service(MergeTreeData & data_) - : data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {} + explicit Service(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {} Service(const Service &) = delete; Service & operator=(const Service &) = delete; std::string getId(const std::string & node_id) const override; - void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override; + void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) override; private: MergeTreeData::DataPartPtr findPart(const String & name); void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out); void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version); -private: /// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish, /// so Service will never access dangling reference to storage MergeTreeData & data; @@ -43,13 +41,10 @@ private: /** Client for getting the parts from the table *MergeTree. */ -class Fetcher final +class Fetcher final : private boost::noncopyable { public: - Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {} - - Fetcher(const Fetcher &) = delete; - Fetcher & operator=(const Fetcher &) = delete; + explicit Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {} /// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory. MergeTreeData::MutableDataPartPtr fetchPart( @@ -75,7 +70,7 @@ private: bool to_detached, const String & tmp_prefix_, bool sync, - const ReservationPtr reservation, + ReservationPtr reservation, PooledReadWriteBufferFromHTTP & in); MergeTreeData::MutableDataPartPtr downloadPartToMemory( diff --git a/tests/queries/query_test.py b/tests/queries/query_test.py index 3dea639187ef62778602bdd192cf5f2e2f49db82..417a51fe5237b4f1802c18cced8b4ca2897817ca 100644 --- a/tests/queries/query_test.py +++ b/tests/queries/query_test.py @@ -33,7 +33,7 @@ SKIP_LIST = [ "01057_http_compression_prefer_brotli", "01080_check_for_error_incorrect_size_of_nested_column", "01083_expressions_in_engine_arguments", - "01086_odbc_roundtrip", + # "01086_odbc_roundtrip", "01088_benchmark_query_id", "01098_temporary_and_external_tables", "01099_parallel_distributed_insert_select",