diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.cpp b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..88890755955ea188276646f6cc72e90767d176e7 --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.cpp @@ -0,0 +1,259 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int SYNTAX_ERROR; + extern const int LOGICAL_ERROR; + extern const int REQUIRED_PASSWORD; + extern const int INVALID_SESSION_TIMEOUT; + extern const int UNKNOWN_COMPRESSION_METHOD; +} + + +namespace +{ + duration parseSessionTimeout(const HTMLForm & params, size_t default_session_timeout, size_t max_session_timeout) + { + size_t session_timeout = default_session_timeout; + + if (params.has("session_timeout")) + { + std::string session_timeout_str = params.get("session_timeout"); + + ReadBufferFromString buf(session_timeout_str); + if (!tryReadIntText(session_timeout, buf) || !buf.eof()) + throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT); + + if (session_timeout > max_session_timeout) + throw Exception( + "Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) + + ". Maximum session timeout could be modified in configuration file.", ErrorCodes::INVALID_SESSION_TIMEOUT); + } + + return std::chrono::seconds(session_timeout); + } +} + + +void HTTPMatchExecutor::execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const +{ + authentication(context, request, params); + + std::shared_ptr session_context; + String session_id = params.get("session_id", ""); + duration session_timeout = parseSessionTimeout(params, 1800, 3600); + + SCOPE_EXIT({ detachSessionContext(session_context, session_id, session_timeout); }); + session_context = attachSessionContext(context, params, session_id, session_timeout); + + initClientInfo(context, request); + used_output.attachRequestAndResponse(context, request, response, params, /* TODO: keep_alive_time_out */ 0); + + HTTPStreamsWithInput used_input(request, params); + collectParamsAndApplySettings(request, params, context); + + Settings & query_settings = context.getSettingsRef(); + used_input.attachSettings(context, query_settings, request); + used_output.attachSettings(context, query_settings, request); + + String execute_query_string = getExecuteQuery(params); + ReadBufferPtr query_in_buffer = std::make_shared(execute_query_string); + + ReadBufferPtr in = query_in_buffer; + if (!needParsePostBody(request, params) || !context.getExternalTables().empty()) + in = std::make_shared(*query_in_buffer, *used_input.in_maybe_internal_compressed); + + executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context, + [&response] (const String & content_type) { response.setContentType(content_type); }, + [&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }); + + used_output.finalize(); +} + +void HTTPMatchExecutor::initClientInfo(Context & context, HTTPServerRequest & request) const +{ + + ClientInfo & client_info = context.getClientInfo(); + client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY; + client_info.interface = ClientInfo::Interface::HTTP; + + /// Query sent through HTTP interface is initial. + client_info.initial_user = client_info.current_user; + client_info.initial_query_id = client_info.current_query_id; + client_info.initial_address = client_info.current_address; + + ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN; + if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET) + http_method = ClientInfo::HTTPMethod::GET; + else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST) + http_method = ClientInfo::HTTPMethod::POST; + + client_info.http_method = http_method; + client_info.http_user_agent = request.get("User-Agent", ""); +} + +void HTTPMatchExecutor::authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const +{ + auto user = request.get("X-ClickHouse-User", ""); + auto password = request.get("X-ClickHouse-Key", ""); + auto quota_key = request.get("X-ClickHouse-Quota", ""); + + if (user.empty() && password.empty() && quota_key.empty()) + { + /// User name and password can be passed using query parameters + /// or using HTTP Basic auth (both methods are insecure). + if (request.hasCredentials()) + { + Poco::Net::HTTPBasicCredentials credentials(request); + + user = credentials.getUsername(); + password = credentials.getPassword(); + } + else + { + user = params.get("user", "default"); + password = params.get("password", ""); + } + + quota_key = params.get("quota_key", ""); + } + else + { + /// It is prohibited to mix different authorization schemes. + if (request.hasCredentials() + || params.has("user") + || params.has("password") + || params.has("quota_key")) + { + throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD); + } + } + + std::string query_id = params.get("query_id", ""); + context.setUser(user, password, request.clientAddress(), quota_key); + context.setCurrentQueryId(query_id); +} + +std::shared_ptr HTTPMatchExecutor::attachSessionContext( + Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const +{ + if (!session_id.empty()) + { + std::string session_check = params.get("session_check", ""); + auto session_context = context.acquireSession(session_id, session_timeout, session_check == "1"); + + context = *session_context; + context.setSessionContext(*session_context); + return session_context; + } + return {}; +} + +void HTTPMatchExecutor::detachSessionContext(std::shared_ptr & session_context, const String & session_id, const duration & session_timeout) const +{ + if (session_context) + session_context->releaseSession(session_id, session_timeout); +} + +void HTTPMatchExecutor::collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const +{ + static const NameSet reserved_param_names{ + "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query", + "session_id", "session_timeout", "session_check"}; + + Names reserved_param_suffixes; + + auto param_could_be_skipped = [&] (const String & name) + { + if (reserved_param_names.count(name)) + return true; + + for (const String & suffix : reserved_param_suffixes) + { + if (endsWith(name, suffix)) + return true; + } + + return false; + }; + + /// Settings can be overridden in the query. + /// Some parameters (database, default_format, everything used in the code above) do not + /// belong to the Settings class. + + /// 'readonly' setting values mean: + /// readonly = 0 - any query is allowed, client can change any setting. + /// readonly = 1 - only readonly queries are allowed, client can't change settings. + /// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'. + + /// In theory if initially readonly = 0, the client can change any setting and then set readonly + /// to some other value. + /// Only readonly queries are allowed for HTTP GET requests. + if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) + { + Settings & settings = context.getSettingsRef(); + + if (settings.readonly == 0) + settings.readonly = 2; + } + + bool has_multipart = startsWith(request.getContentType().data(), "multipart/form-data"); + + if (has_multipart || needParsePostBody(request, params)) + { + ExternalTablesHandler handler(context, params); + params.load(request, request.stream(), handler); + + if (has_multipart) + { + /// Skip unneeded parameters to avoid confusing them later with context settings or query parameters. + reserved_param_suffixes.reserve(3); + /// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings. + reserved_param_suffixes.emplace_back("_format"); + reserved_param_suffixes.emplace_back("_types"); + reserved_param_suffixes.emplace_back("_structure"); + } + } + + SettingsChanges settings_changes; + for (const auto & [key, value] : params) + { + if (key == "database") + context.setCurrentDatabase(value); + else if (key == "default_format") + context.setDefaultFormat(value); + else if (!param_could_be_skipped(key) && !acceptQueryParam(context, key, value)) + settings_changes.push_back({key, value}); /// All other query parameters are treated as settings. + } + + /// For external data we also want settings + context.checkSettingsConstraints(settings_changes); + context.applySettingsChanges(settings_changes); +} + +} diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.h b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.h new file mode 100644 index 0000000000000000000000000000000000000000..425605aec07ba837dbd8b6c4c3ee6cae27e73975 --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutor.h @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; +class HTTPMatchExecutor; +struct HTTPStreamsWithInput; +struct HTTPStreamsWithOutput; + +using duration = std::chrono::steady_clock::duration; +using HTTPMatchExecutorPtr = std::shared_ptr; + +class HTTPMatchExecutor +{ +public: + using HTTPServerRequest = Poco::Net::HTTPServerRequest; + using HTTPServerResponse = Poco::Net::HTTPServerResponse; + + bool match(HTTPServerRequest & request, HTMLForm & params) const { return matchImpl(request, params); }; + + void execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const; + + virtual ~HTTPMatchExecutor() = default; +protected: + + virtual bool matchImpl(HTTPServerRequest & request, HTMLForm & params) const = 0; + + virtual String getExecuteQuery(HTMLForm & params) const = 0; + + virtual bool needParsePostBody(HTTPServerRequest & request, HTMLForm & params) const = 0; + + virtual bool acceptQueryParam(Context & context, const String & key, const String & value) const = 0; + + void initClientInfo(Context & context, HTTPServerRequest & request) const; + + void authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const; + + void detachSessionContext(std::shared_ptr & context, const String & session_id, const duration & session_timeout) const; + + std::shared_ptr attachSessionContext(Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const; + + void collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const; + +}; + +} diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h new file mode 100644 index 0000000000000000000000000000000000000000..54ea20120f5172e05c2e42330f7e9e5447ad48af --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +class HTTPMatchExecutorDefault : public HTTPMatchExecutor +{ +protected: + + bool matchImpl(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; } + + bool needParsePostBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return false; } + + String getExecuteQuery(HTMLForm & params) const override + { + String execute_query; + for (const auto & [key, value] : params) + { + if (key == "query") + execute_query += value; + } + + return execute_query; + } + + bool acceptQueryParam(Context &context, const String &key, const String &value) const override + { + if (startsWith(key, "param_")) + { + /// Save name and values of substitution in dictionary. + context.setQueryParameter(key.substr(strlen("param_")), value); + return true; + } + + return key == "query"; + } + +}; + +} diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.cpp b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6ff17217f241139e84bcd923e40b889ef1acade6 --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.cpp @@ -0,0 +1,73 @@ +#include + +#include +#include +#include +#include "HTTPStreamsWithInput.h" + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_COMPRESSION_METHOD; +} + +HTTPStreamsWithInput::HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from) + : in(createRawInBuffer(request)) + , in_maybe_compressed(createCompressedBuffer(request, in)) + , in_maybe_internal_compressed(createInternalCompressedBuffer(from, in_maybe_compressed)) +{ +} + +ReadBufferPtr HTTPStreamsWithInput::createRawInBuffer(HTTPServerRequest & request) const +{ + return std::make_unique(request.stream()); +} + +ReadBufferPtr HTTPStreamsWithInput::createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const +{ + /// Request body can be compressed using algorithm specified in the Content-Encoding header. + String http_compressed_method = request.get("Content-Encoding", ""); + + if (!http_compressed_method.empty()) + { + if (http_compressed_method == "gzip") + return std::make_shared(*raw_buffer, CompressionMethod::Gzip); + else if (http_compressed_method == "deflate") + return std::make_shared(*raw_buffer, CompressionMethod::Zlib); +#if USE_BROTLI + else if (http_compressed_method == "br") + return std::make_shared(*raw_buffer); +#endif + else + throw Exception("Unknown Content-Encoding of HTTP request: " + http_compressed_method, ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + } + + return raw_buffer; +} + +ReadBufferPtr HTTPStreamsWithInput::createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const +{ + /// The data can also be compressed using incompatible internal algorithm. This is indicated by + /// 'decompress' query parameter. + std::unique_ptr in_post_maybe_compressed; + if (params.getParsed("decompress", false)) + return std::make_unique(*http_maybe_encoding_buffer); + + return http_maybe_encoding_buffer; +} + +void HTTPStreamsWithInput::attachSettings(Context & /*context*/, Settings & settings, HTTPServerRequest & /*request*/) +{ + /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on, + /// checksums of client data compressed with internal algorithm are not checked. + if (settings.http_native_compression_disable_checksumming_on_decompress) + { + if(CompressedReadBuffer * compressed_buffer = typeid_cast(in_maybe_internal_compressed.get())) + compressed_buffer->disableChecksumming(); + } +} + +} diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.h b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.h new file mode 100644 index 0000000000000000000000000000000000000000..2c3bd1c2a35ecf1a23c46a988288dbc59144d920 --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithInput.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DB +{ + +using HTTPServerRequest = Poco::Net::HTTPServerRequest; + +struct HTTPStreamsWithInput +{ + std::shared_ptr in; + std::shared_ptr in_maybe_compressed; + std::shared_ptr in_maybe_internal_compressed; + + HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from); + + void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request); + + ReadBufferPtr createRawInBuffer(HTTPServerRequest & request) const; + ReadBufferPtr createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const; + ReadBufferPtr createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const; +}; + +} \ No newline at end of file diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.cpp b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.cpp new file mode 100644 index 0000000000000000000000000000000000000000..35426315eb31941031332196eac22e19eddac7cf --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.cpp @@ -0,0 +1,217 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace +{ + inline void listeningProgress(Context & context, ProgressCallback listener) + { + auto prev = context.getProgressCallback(); + context.setProgressCallback([prev, listener] (const Progress & progress) + { + if (prev) + prev(progress); + + listener(progress); + }); + } + + inline ProgressCallback cancelListener(Context & context, Poco::Net::StreamSocket & socket) + { + /// Assume that at the point this method is called no one is reading data from the socket any more. + /// True for read-only queries. + return [&context, &socket](const Progress &) + { + try + { + char b; + int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK); + if (status == 0) + context.killCurrentQuery(); + } + catch (Poco::TimeoutException &) + { + } + catch (...) + { + context.killCurrentQuery(); + } + }; + } +} + +void HTTPStreamsWithOutput::attachSettings(Context & context, Settings & settings, HTTPServerRequest & request) +{ + /// HTTP response compression is turned on only if the client signalled that they support it + /// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on. + out->setCompression(out->getCompression() && settings.enable_http_compression); + if (out->getCompression()) + out->setCompressionLevel(settings.http_zlib_compression_level); + + out->setSendProgressInterval(settings.http_headers_progress_interval_ms); + + /// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed + /// Origin header. + out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin", "").empty()); + + /// While still no data has been sent, we will report about query execution progress by sending HTTP headers. + if (settings.send_progress_in_http_headers) + listeningProgress(context, [this] (const Progress & progress) { out->onProgress(progress); }); + + if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close) + { + Poco::Net::StreamSocket & socket = dynamic_cast(request).socket(); + + listeningProgress(context, cancelListener(context, socket)); + } +} + +void HTTPStreamsWithOutput::attachRequestAndResponse( + Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout) +{ + out = createEndpoint(request, response, keep_alive_timeout); + out_maybe_compressed = createMaybeCompressionEndpoint(form, out); + out_maybe_delayed_and_compressed = createMaybeDelayedAndCompressionEndpoint(context, form, out_maybe_compressed); +} + +std::shared_ptr HTTPStreamsWithOutput::createEndpoint( + HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout) +{ + /// The client can pass a HTTP header indicating supported compression method (gzip or deflate). + String http_response_compression_methods = request.get("Accept-Encoding", ""); + + if (!http_response_compression_methods.empty()) + { + /// Both gzip and deflate are supported. If the client supports both, gzip is preferred. + /// NOTE parsing of the list of methods is slightly incorrect. + if (std::string::npos != http_response_compression_methods.find("gzip")) + return std::make_shared( + request, response, keep_alive_timeout, true, CompressionMethod::Gzip, DBMS_DEFAULT_BUFFER_SIZE); + else if (std::string::npos != http_response_compression_methods.find("deflate")) + return std::make_shared( + request, response, keep_alive_timeout, true, CompressionMethod::Zlib, DBMS_DEFAULT_BUFFER_SIZE); +#if USE_BROTLI + else if (http_response_compression_methods == "br") + return std::make_shared( + request, response, keep_alive_timeout, true, CompressionMethod::Brotli, DBMS_DEFAULT_BUFFER_SIZE); +#endif + } + + return std::make_shared( + request, response, keep_alive_timeout, false, CompressionMethod{}, DBMS_DEFAULT_BUFFER_SIZE); +} + +WriteBufferPtr HTTPStreamsWithOutput::createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr & endpoint) +{ + /// Client can pass a 'compress' flag in the query string. In this case the query result is + /// compressed using internal algorithm. This is not reflected in HTTP headers. + bool internal_compression = form.getParsed("compress", false); + return internal_compression ? std::make_shared(*endpoint) : WriteBufferPtr(endpoint); +} + +WriteBufferPtr HTTPStreamsWithOutput::createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint) +{ + /// If it is specified, the whole result will be buffered. + /// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file. + bool buffer_until_eof = form.getParsed("wait_end_of_query", false); + + /// At least, we should postpone sending of first buffer_size result bytes + size_t buffer_size_total = std::max(form.getParsed("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast(DBMS_DEFAULT_BUFFER_SIZE)); + + size_t buffer_size_memory = (buffer_size_total > DBMS_DEFAULT_BUFFER_SIZE) ? buffer_size_total : 0; + + if (buffer_size_memory > 0 || buffer_until_eof) + { + CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1; + CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2; + + if (buffer_size_memory > 0) + cascade_buffer1.emplace_back(std::make_shared(buffer_size_memory)); + + if (buffer_until_eof) + { + std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/"; + + auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &) + { + return WriteBufferFromTemporaryFile::create(tmp_path_template); + }; + + cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer)); + } + else + { + auto push_memory_buffer_and_continue = [next_buffer = endpoint] (const WriteBufferPtr & prev_buf) + { + auto prev_memory_buffer = typeid_cast(prev_buf.get()); + if (!prev_memory_buffer) + throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR); + + auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); + copyData(*rdbuf , *next_buffer); + + return next_buffer; + }; + + cascade_buffer2.emplace_back(push_memory_buffer_and_continue); + } + + return std::make_shared(std::move(cascade_buffer1), std::move(cascade_buffer2)); + } + + return endpoint; +} + +void HTTPStreamsWithOutput::finalize() const +{ + if (out_maybe_delayed_and_compressed != out_maybe_compressed) + { + /// TODO: set Content-Length if possible + std::vector write_buffers; + std::vector read_buffers; + std::vector read_buffers_raw_ptr; + + auto cascade_buffer = typeid_cast(out_maybe_delayed_and_compressed.get()); + if (!cascade_buffer) + throw Exception("Expected CascadeWriteBuffer", ErrorCodes::LOGICAL_ERROR); + + cascade_buffer->getResultBuffers(write_buffers); + + if (write_buffers.empty()) + throw Exception("At least one buffer is expected to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR); + + for (auto & write_buf : write_buffers) + { + IReadableWriteBuffer * write_buf_concrete; + ReadBufferPtr reread_buf; + + if (write_buf + && (write_buf_concrete = dynamic_cast(write_buf.get())) + && (reread_buf = write_buf_concrete->tryGetReadBuffer())) + { + read_buffers.emplace_back(reread_buf); + read_buffers_raw_ptr.emplace_back(reread_buf.get()); + } + } + + ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr); + copyData(concat_read_buffer, *out_maybe_compressed); + } + + /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to + /// the client. + out->finalize(); +} + +} diff --git a/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.h b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.h new file mode 100644 index 0000000000000000000000000000000000000000..65d87b6744ad7659fc05675b16ce5b42d275f590 --- /dev/null +++ b/dbms/src/Interpreters/CustomHTTP/HTTPStreamsWithOutput.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +using HTTPServerRequest = Poco::Net::HTTPServerRequest; +using HTTPServerResponse = Poco::Net::HTTPServerResponse; + + +/* Raw data + * ↓ + * CascadeWriteBuffer out_maybe_delayed_and_compressed (optional) + * ↓ (forwards data if an overflow is occur or explicitly via pushDelayedResults) + * CompressedWriteBuffer out_maybe_compressed (optional) + * ↓ + * WriteBufferFromHTTPServerResponse out + */ +struct HTTPStreamsWithOutput +{ + using HTTPResponseBufferPtr = std::shared_ptr; + + HTTPResponseBufferPtr out; + /// Points to 'out' or to CompressedWriteBuffer(*out), depending on settings. + std::shared_ptr out_maybe_compressed; + /// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer. + std::shared_ptr out_maybe_delayed_and_compressed; + + void finalize() const; + + WriteBufferPtr createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint); + + WriteBufferPtr createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr & endpoint); + + HTTPResponseBufferPtr createEndpoint(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout); + + void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request); + + void attachRequestAndResponse(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout); +}; + +} diff --git a/programs/server/HTTPHandler.cpp b/programs/server/HTTPHandler.cpp index 772362107415454c3a3b597345dc189044a92301..99afb319ed58d317c9059027b95016e65fca20c0 100644 --- a/programs/server/HTTPHandler.cpp +++ b/programs/server/HTTPHandler.cpp @@ -216,7 +216,7 @@ void HTTPHandler::processQuery( Poco::Net::HTTPServerRequest & request, HTMLForm & params, Poco::Net::HTTPServerResponse & response, - Output & used_output) + HTTPStreamsWithOutput & used_output) { Context context = server.context(); @@ -224,6 +224,12 @@ void HTTPHandler::processQuery( LOG_TRACE(log, "Request URI: " << request.getURI()); + if (context.getSettingsRef().allow_experimental_custom_http) + { + context.getHTTPMatchExecutor()->execute(context, request, response, params, used_output); + return; + } + std::istream & istr = request.stream(); /// Part of the query can be passed in the 'query' parameter and the rest in the request body @@ -605,20 +611,14 @@ void HTTPHandler::processQuery( } ); - if (used_output.hasDelayed()) - { - /// TODO: set Content-Length if possible - pushDelayedResults(used_output); - } - /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to /// the client. - used_output.out->finalize(); + used_output.finalize(); } void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, - Output & used_output) + HTTPStreamsWithOutput & used_output) { try { @@ -655,7 +655,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_ else if (used_output.out_maybe_compressed) { /// Destroy CascadeBuffer to actualize buffers' positions and reset extra references - if (used_output.hasDelayed()) + if (used_output.out_maybe_delayed_and_compressed != used_output.out_maybe_compressed) used_output.out_maybe_delayed_and_compressed.reset(); /// Send the error message into already used (and possibly compressed) stream. @@ -691,7 +691,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne setThreadName("HTTPHandler"); ThreadStatus thread_status; - Output used_output; + HTTPStreamsWithOutput used_output; /// In case of exception, send stack trace to client. bool with_stacktrace = false; diff --git a/programs/server/HTTPHandler.h b/programs/server/HTTPHandler.h index 681a3cce932b66b915ce02b99b932d4fb14eb0dc..603278e11aa1990164b5ec22b38bc1fac00282a1 100644 --- a/programs/server/HTTPHandler.h +++ b/programs/server/HTTPHandler.h @@ -7,6 +7,8 @@ #include #include +#include + namespace CurrentMetrics { @@ -68,14 +70,14 @@ private: Poco::Net::HTTPServerRequest & request, HTMLForm & params, Poco::Net::HTTPServerResponse & response, - Output & used_output); + HTTPStreamsWithOutput & used_output); void trySendExceptionToClient( const std::string & s, int exception_code, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, - Output & used_output); + HTTPStreamsWithOutput & used_output); static void pushDelayedResults(Output & used_output); }; diff --git a/src/IO/WriteBufferFromHTTPServerResponse.h b/src/IO/WriteBufferFromHTTPServerResponse.h index ffa36c11c5b2e8ea72e27bc7304655b87bde69c4..c10288fff9f314d552a56ff66679548db895d81e 100644 --- a/src/IO/WriteBufferFromHTTPServerResponse.h +++ b/src/IO/WriteBufferFromHTTPServerResponse.h @@ -113,6 +113,11 @@ public: compress = enable_compression; } + bool getCompression() + { + return compress; + } + /// Set compression level if the compression is turned on. /// The setting has any effect only if HTTP headers haven't been sent yet. void setCompressionLevel(int level) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 647c3fb8020d6b25de0df7bd6a87c3184b7d5f5e..03dd311f31b4226365b20d0668d62bc5740e95f7 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -348,6 +349,7 @@ struct ContextShared std::unique_ptr clusters; ConfigurationPtr clusters_config; /// Stores updated configs mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config + mutable std::mutex match_executor_mutex; /// Guards match executor and their config #if USE_EMBEDDED_COMPILER std::shared_ptr compiled_expression_cache; @@ -1543,6 +1545,10 @@ void Context::setCluster(const String & cluster_name, const std::shared_ptrclusters->setCluster(cluster_name, cluster); } +HTTPMatchExecutorPtr Context::getHTTPMatchExecutor() +{ + return std::make_shared(); +} void Context::initializeSystemLogs() { diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 1f81cdbc58b4e66db1c666fdb5f098f506acad55..03859b0334863f8723d4b07818831e55c4e9f8c6 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -490,6 +490,8 @@ public: Compiler & getCompiler(); + HTTPMatchExecutorPtr getHTTPMatchExecutor(); + /// Call after initialization before using system logs. Call for global context. void initializeSystemLogs();