提交 159ba24f 编写于 作者: Z zhang2014

ISSUES-5436 support custom http

上级 9dc70545
#include <Interpreters/CustomHTTP/HTTPMatchExecutor.h>
#include <ext/scope_guard.h>
#include <Common/Exception.h>
#include <Common/SettingsChanges.h>
#include <Core/ExternalTable.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Compression/CompressedReadBuffer.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithInput.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int LOGICAL_ERROR;
extern const int REQUIRED_PASSWORD;
extern const int INVALID_SESSION_TIMEOUT;
extern const int UNKNOWN_COMPRESSION_METHOD;
}
namespace
{
duration parseSessionTimeout(const HTMLForm & params, size_t default_session_timeout, size_t max_session_timeout)
{
size_t session_timeout = default_session_timeout;
if (params.has("session_timeout"))
{
std::string session_timeout_str = params.get("session_timeout");
ReadBufferFromString buf(session_timeout_str);
if (!tryReadIntText(session_timeout, buf) || !buf.eof())
throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT);
if (session_timeout > max_session_timeout)
throw Exception(
"Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) +
". Maximum session timeout could be modified in configuration file.", ErrorCodes::INVALID_SESSION_TIMEOUT);
}
return std::chrono::seconds(session_timeout);
}
}
void HTTPMatchExecutor::execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const
{
authentication(context, request, params);
std::shared_ptr<Context> session_context;
String session_id = params.get("session_id", "");
duration session_timeout = parseSessionTimeout(params, 1800, 3600);
SCOPE_EXIT({ detachSessionContext(session_context, session_id, session_timeout); });
session_context = attachSessionContext(context, params, session_id, session_timeout);
initClientInfo(context, request);
used_output.attachRequestAndResponse(context, request, response, params, /* TODO: keep_alive_time_out */ 0);
HTTPStreamsWithInput used_input(request, params);
collectParamsAndApplySettings(request, params, context);
Settings & query_settings = context.getSettingsRef();
used_input.attachSettings(context, query_settings, request);
used_output.attachSettings(context, query_settings, request);
String execute_query_string = getExecuteQuery(params);
ReadBufferPtr query_in_buffer = std::make_shared<ReadBufferFromString>(execute_query_string);
ReadBufferPtr in = query_in_buffer;
if (!needParsePostBody(request, params) || !context.getExternalTables().empty())
in = std::make_shared<ConcatReadBuffer>(*query_in_buffer, *used_input.in_maybe_internal_compressed);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
used_output.finalize();
}
void HTTPMatchExecutor::initClientInfo(Context & context, HTTPServerRequest & request) const
{
ClientInfo & client_info = context.getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.interface = ClientInfo::Interface::HTTP;
/// Query sent through HTTP interface is initial.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
http_method = ClientInfo::HTTPMethod::GET;
else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST)
http_method = ClientInfo::HTTPMethod::POST;
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
}
void HTTPMatchExecutor::authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const
{
auto user = request.get("X-ClickHouse-User", "");
auto password = request.get("X-ClickHouse-Key", "");
auto quota_key = request.get("X-ClickHouse-Quota", "");
if (user.empty() && password.empty() && quota_key.empty())
{
/// User name and password can be passed using query parameters
/// or using HTTP Basic auth (both methods are insecure).
if (request.hasCredentials())
{
Poco::Net::HTTPBasicCredentials credentials(request);
user = credentials.getUsername();
password = credentials.getPassword();
}
else
{
user = params.get("user", "default");
password = params.get("password", "");
}
quota_key = params.get("quota_key", "");
}
else
{
/// It is prohibited to mix different authorization schemes.
if (request.hasCredentials()
|| params.has("user")
|| params.has("password")
|| params.has("quota_key"))
{
throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD);
}
}
std::string query_id = params.get("query_id", "");
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
}
std::shared_ptr<Context> HTTPMatchExecutor::attachSessionContext(
Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const
{
if (!session_id.empty())
{
std::string session_check = params.get("session_check", "");
auto session_context = context.acquireSession(session_id, session_timeout, session_check == "1");
context = *session_context;
context.setSessionContext(*session_context);
return session_context;
}
return {};
}
void HTTPMatchExecutor::detachSessionContext(std::shared_ptr<Context> & session_context, const String & session_id, const duration & session_timeout) const
{
if (session_context)
session_context->releaseSession(session_id, session_timeout);
}
void HTTPMatchExecutor::collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const
{
static const NameSet reserved_param_names{
"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query",
"session_id", "session_timeout", "session_check"};
Names reserved_param_suffixes;
auto param_could_be_skipped = [&] (const String & name)
{
if (reserved_param_names.count(name))
return true;
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
return false;
};
/// Settings can be overridden in the query.
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
/// Only readonly queries are allowed for HTTP GET requests.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
{
Settings & settings = context.getSettingsRef();
if (settings.readonly == 0)
settings.readonly = 2;
}
bool has_multipart = startsWith(request.getContentType().data(), "multipart/form-data");
if (has_multipart || needParsePostBody(request, params))
{
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
if (has_multipart)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.reserve(3);
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
reserved_param_suffixes.emplace_back("_format");
reserved_param_suffixes.emplace_back("_types");
reserved_param_suffixes.emplace_back("_structure");
}
}
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (key == "database")
context.setCurrentDatabase(value);
else if (key == "default_format")
context.setDefaultFormat(value);
else if (!param_could_be_skipped(key) && !acceptQueryParam(context, key, value))
settings_changes.push_back({key, value}); /// All other query parameters are treated as settings.
}
/// For external data we also want settings
context.checkSettingsConstraints(settings_changes);
context.applySettingsChanges(settings_changes);
}
}
#pragma once
#include <Core/Types.h>
#include <Common/HTMLForm.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class Context;
class HTTPMatchExecutor;
struct HTTPStreamsWithInput;
struct HTTPStreamsWithOutput;
using duration = std::chrono::steady_clock::duration;
using HTTPMatchExecutorPtr = std::shared_ptr<HTTPMatchExecutor>;
class HTTPMatchExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
bool match(HTTPServerRequest & request, HTMLForm & params) const { return matchImpl(request, params); };
void execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const;
virtual ~HTTPMatchExecutor() = default;
protected:
virtual bool matchImpl(HTTPServerRequest & request, HTMLForm & params) const = 0;
virtual String getExecuteQuery(HTMLForm & params) const = 0;
virtual bool needParsePostBody(HTTPServerRequest & request, HTMLForm & params) const = 0;
virtual bool acceptQueryParam(Context & context, const String & key, const String & value) const = 0;
void initClientInfo(Context & context, HTTPServerRequest & request) const;
void authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const;
void detachSessionContext(std::shared_ptr<Context> & context, const String & session_id, const duration & session_timeout) const;
std::shared_ptr<Context> attachSessionContext(Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const;
void collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const;
};
}
#pragma once
#include <Interpreters/CustomHTTP/HTTPMatchExecutor.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/executeQuery.h>
namespace DB
{
class HTTPMatchExecutorDefault : public HTTPMatchExecutor
{
protected:
bool matchImpl(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
bool needParsePostBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return false; }
String getExecuteQuery(HTMLForm & params) const override
{
String execute_query;
for (const auto & [key, value] : params)
{
if (key == "query")
execute_query += value;
}
return execute_query;
}
bool acceptQueryParam(Context &context, const String &key, const String &value) const override
{
if (startsWith(key, "param_"))
{
/// Save name and values of substitution in dictionary.
context.setQueryParameter(key.substr(strlen("param_")), value);
return true;
}
return key == "query";
}
};
}
#include <Interpreters/CustomHTTP/HTTPStreamsWithInput.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include "HTTPStreamsWithInput.h"
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_COMPRESSION_METHOD;
}
HTTPStreamsWithInput::HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from)
: in(createRawInBuffer(request))
, in_maybe_compressed(createCompressedBuffer(request, in))
, in_maybe_internal_compressed(createInternalCompressedBuffer(from, in_maybe_compressed))
{
}
ReadBufferPtr HTTPStreamsWithInput::createRawInBuffer(HTTPServerRequest & request) const
{
return std::make_unique<ReadBufferFromIStream>(request.stream());
}
ReadBufferPtr HTTPStreamsWithInput::createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const
{
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
String http_compressed_method = request.get("Content-Encoding", "");
if (!http_compressed_method.empty())
{
if (http_compressed_method == "gzip")
return std::make_shared<ZlibInflatingReadBuffer>(*raw_buffer, CompressionMethod::Gzip);
else if (http_compressed_method == "deflate")
return std::make_shared<ZlibInflatingReadBuffer>(*raw_buffer, CompressionMethod::Zlib);
#if USE_BROTLI
else if (http_compressed_method == "br")
return std::make_shared<BrotliReadBuffer>(*raw_buffer);
#endif
else
throw Exception("Unknown Content-Encoding of HTTP request: " + http_compressed_method, ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
}
return raw_buffer;
}
ReadBufferPtr HTTPStreamsWithInput::createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const
{
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
if (params.getParsed<bool>("decompress", false))
return std::make_unique<CompressedReadBuffer>(*http_maybe_encoding_buffer);
return http_maybe_encoding_buffer;
}
void HTTPStreamsWithInput::attachSettings(Context & /*context*/, Settings & settings, HTTPServerRequest & /*request*/)
{
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (settings.http_native_compression_disable_checksumming_on_decompress)
{
if(CompressedReadBuffer * compressed_buffer = typeid_cast<CompressedReadBuffer *>(in_maybe_internal_compressed.get()))
compressed_buffer->disableChecksumming();
}
}
}
#pragma once
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <Common/HTMLForm.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPServerRequest.h>
namespace DB
{
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
struct HTTPStreamsWithInput
{
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<ReadBuffer> in_maybe_compressed;
std::shared_ptr<ReadBuffer> in_maybe_internal_compressed;
HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from);
void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request);
ReadBufferPtr createRawInBuffer(HTTPServerRequest & request) const;
ReadBufferPtr createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const;
ReadBufferPtr createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const;
};
}
\ No newline at end of file
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/copyData.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConcatReadBuffer.h>
namespace DB
{
namespace
{
inline void listeningProgress(Context & context, ProgressCallback listener)
{
auto prev = context.getProgressCallback();
context.setProgressCallback([prev, listener] (const Progress & progress)
{
if (prev)
prev(progress);
listener(progress);
});
}
inline ProgressCallback cancelListener(Context & context, Poco::Net::StreamSocket & socket)
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
return [&context, &socket](const Progress &)
{
try
{
char b;
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
}
catch (Poco::TimeoutException &)
{
}
catch (...)
{
context.killCurrentQuery();
}
};
}
}
void HTTPStreamsWithOutput::attachSettings(Context & context, Settings & settings, HTTPServerRequest & request)
{
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
out->setCompression(out->getCompression() && settings.enable_http_compression);
if (out->getCompression())
out->setCompressionLevel(settings.http_zlib_compression_level);
out->setSendProgressInterval(settings.http_headers_progress_interval_ms);
/// Add CORS header if 'add_http_cors_header' setting is turned on and the client passed
/// Origin header.
out->addHeaderCORS(settings.add_http_cors_header && !request.get("Origin", "").empty());
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if (settings.send_progress_in_http_headers)
listeningProgress(context, [this] (const Progress & progress) { out->onProgress(progress); });
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
listeningProgress(context, cancelListener(context, socket));
}
}
void HTTPStreamsWithOutput::attachRequestAndResponse(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout)
{
out = createEndpoint(request, response, keep_alive_timeout);
out_maybe_compressed = createMaybeCompressionEndpoint(form, out);
out_maybe_delayed_and_compressed = createMaybeDelayedAndCompressionEndpoint(context, form, out_maybe_compressed);
}
std::shared_ptr<WriteBufferFromHTTPServerResponse> HTTPStreamsWithOutput::createEndpoint(
HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout)
{
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
if (!http_response_compression_methods.empty())
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("gzip"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, true, CompressionMethod::Gzip, DBMS_DEFAULT_BUFFER_SIZE);
else if (std::string::npos != http_response_compression_methods.find("deflate"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, true, CompressionMethod::Zlib, DBMS_DEFAULT_BUFFER_SIZE);
#if USE_BROTLI
else if (http_response_compression_methods == "br")
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, true, CompressionMethod::Brotli, DBMS_DEFAULT_BUFFER_SIZE);
#endif
}
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, false, CompressionMethod{}, DBMS_DEFAULT_BUFFER_SIZE);
}
WriteBufferPtr HTTPStreamsWithOutput::createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & endpoint)
{
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
bool internal_compression = form.getParsed<bool>("compress", false);
return internal_compression ? std::make_shared<CompressedWriteBuffer>(*endpoint) : WriteBufferPtr(endpoint);
}
WriteBufferPtr HTTPStreamsWithOutput::createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint)
{
/// If it is specified, the whole result will be buffered.
/// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
bool buffer_until_eof = form.getParsed<bool>("wait_end_of_query", false);
/// At least, we should postpone sending of first buffer_size result bytes
size_t buffer_size_total = std::max(form.getParsed<size_t>("buffer_size", DBMS_DEFAULT_BUFFER_SIZE), static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE));
size_t buffer_size_memory = (buffer_size_total > DBMS_DEFAULT_BUFFER_SIZE) ? buffer_size_total : 0;
if (buffer_size_memory > 0 || buffer_until_eof)
{
CascadeWriteBuffer::WriteBufferPtrs cascade_buffer1;
CascadeWriteBuffer::WriteBufferConstructors cascade_buffer2;
if (buffer_size_memory > 0)
cascade_buffer1.emplace_back(std::make_shared<MemoryWriteBuffer>(buffer_size_memory));
if (buffer_until_eof)
{
std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/";
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
{
return WriteBufferFromTemporaryFile::create(tmp_path_template);
};
cascade_buffer2.emplace_back(std::move(create_tmp_disk_buffer));
}
else
{
auto push_memory_buffer_and_continue = [next_buffer = endpoint] (const WriteBufferPtr & prev_buf)
{
auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!prev_memory_buffer)
throw Exception("Expected MemoryWriteBuffer", ErrorCodes::LOGICAL_ERROR);
auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
copyData(*rdbuf , *next_buffer);
return next_buffer;
};
cascade_buffer2.emplace_back(push_memory_buffer_and_continue);
}
return std::make_shared<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
}
return endpoint;
}
void HTTPStreamsWithOutput::finalize() const
{
if (out_maybe_delayed_and_compressed != out_maybe_compressed)
{
/// TODO: set Content-Length if possible
std::vector<WriteBufferPtr> write_buffers;
std::vector<ReadBufferPtr> read_buffers;
std::vector<ReadBuffer *> read_buffers_raw_ptr;
auto cascade_buffer = typeid_cast<CascadeWriteBuffer *>(out_maybe_delayed_and_compressed.get());
if (!cascade_buffer)
throw Exception("Expected CascadeWriteBuffer", ErrorCodes::LOGICAL_ERROR);
cascade_buffer->getResultBuffers(write_buffers);
if (write_buffers.empty())
throw Exception("At least one buffer is expected to overwrite result into HTTP response", ErrorCodes::LOGICAL_ERROR);
for (auto & write_buf : write_buffers)
{
IReadableWriteBuffer * write_buf_concrete;
ReadBufferPtr reread_buf;
if (write_buf
&& (write_buf_concrete = dynamic_cast<IReadableWriteBuffer *>(write_buf.get()))
&& (reread_buf = write_buf_concrete->tryGetReadBuffer()))
{
read_buffers.emplace_back(reread_buf);
read_buffers_raw_ptr.emplace_back(reread_buf.get());
}
}
ConcatReadBuffer concat_read_buffer(read_buffers_raw_ptr);
copyData(concat_read_buffer, *out_maybe_compressed);
}
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
/// the client.
out->finalize();
}
}
#pragma once
#include <Core/Types.h>
#include <IO/WriteBuffer.h>
#include <Common/HTMLForm.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
namespace DB
{
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
/* Raw data
* ↓
* CascadeWriteBuffer out_maybe_delayed_and_compressed (optional)
* ↓ (forwards data if an overflow is occur or explicitly via pushDelayedResults)
* CompressedWriteBuffer out_maybe_compressed (optional)
* ↓
* WriteBufferFromHTTPServerResponse out
*/
struct HTTPStreamsWithOutput
{
using HTTPResponseBufferPtr = std::shared_ptr<WriteBufferFromHTTPServerResponse>;
HTTPResponseBufferPtr out;
/// Points to 'out' or to CompressedWriteBuffer(*out), depending on settings.
std::shared_ptr<WriteBuffer> out_maybe_compressed;
/// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer.
std::shared_ptr<WriteBuffer> out_maybe_delayed_and_compressed;
void finalize() const;
WriteBufferPtr createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint);
WriteBufferPtr createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & endpoint);
HTTPResponseBufferPtr createEndpoint(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout);
void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request);
void attachRequestAndResponse(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout);
};
}
......@@ -216,7 +216,7 @@ void HTTPHandler::processQuery(
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,
Poco::Net::HTTPServerResponse & response,
Output & used_output)
HTTPStreamsWithOutput & used_output)
{
Context context = server.context();
......@@ -224,6 +224,12 @@ void HTTPHandler::processQuery(
LOG_TRACE(log, "Request URI: " << request.getURI());
if (context.getSettingsRef().allow_experimental_custom_http)
{
context.getHTTPMatchExecutor()->execute(context, request, response, params, used_output);
return;
}
std::istream & istr = request.stream();
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
......@@ -605,20 +611,14 @@ void HTTPHandler::processQuery(
}
);
if (used_output.hasDelayed())
{
/// TODO: set Content-Length if possible
pushDelayedResults(used_output);
}
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to
/// the client.
used_output.out->finalize();
used_output.finalize();
}
void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
Output & used_output)
HTTPStreamsWithOutput & used_output)
{
try
{
......@@ -655,7 +655,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
else if (used_output.out_maybe_compressed)
{
/// Destroy CascadeBuffer to actualize buffers' positions and reset extra references
if (used_output.hasDelayed())
if (used_output.out_maybe_delayed_and_compressed != used_output.out_maybe_compressed)
used_output.out_maybe_delayed_and_compressed.reset();
/// Send the error message into already used (and possibly compressed) stream.
......@@ -691,7 +691,7 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
setThreadName("HTTPHandler");
ThreadStatus thread_status;
Output used_output;
HTTPStreamsWithOutput used_output;
/// In case of exception, send stack trace to client.
bool with_stacktrace = false;
......
......@@ -7,6 +7,8 @@
#include <Common/CurrentMetrics.h>
#include <Common/HTMLForm.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
namespace CurrentMetrics
{
......@@ -68,14 +70,14 @@ private:
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,
Poco::Net::HTTPServerResponse & response,
Output & used_output);
HTTPStreamsWithOutput & used_output);
void trySendExceptionToClient(
const std::string & s,
int exception_code,
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response,
Output & used_output);
HTTPStreamsWithOutput & used_output);
static void pushDelayedResults(Output & used_output);
};
......
......@@ -113,6 +113,11 @@ public:
compress = enable_compression;
}
bool getCompression()
{
return compress;
}
/// Set compression level if the compression is turned on.
/// The setting has any effect only if HTTP headers haven't been sent yet.
void setCompressionLevel(int level)
......
......@@ -44,6 +44,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h>
#include <Common/DNSResolver.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
......@@ -348,6 +349,7 @@ struct ContextShared
std::unique_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
mutable std::mutex match_executor_mutex; /// Guards match executor and their config
#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledExpressionCache> compiled_expression_cache;
......@@ -1543,6 +1545,10 @@ void Context::setCluster(const String & cluster_name, const std::shared_ptr<Clus
shared->clusters->setCluster(cluster_name, cluster);
}
HTTPMatchExecutorPtr Context::getHTTPMatchExecutor()
{
return std::make_shared<HTTPMatchExecutorDefault>();
}
void Context::initializeSystemLogs()
{
......
......@@ -490,6 +490,8 @@ public:
Compiler & getCompiler();
HTTPMatchExecutorPtr getHTTPMatchExecutor();
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册