提交 183eb82b 编写于 作者: Z zhang2014

ISSUES-5436 support custom http [part 2]

上级 159ba24f
#pragma once
#include <Core/Types.h>
#include <Common/HTMLForm.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class Context;
class CustomExecutor;
struct HTTPInputStreams;
struct HTTPOutputStreams;
using duration = std::chrono::steady_clock::duration;
using HTTPMatchExecutorPtr = std::shared_ptr<CustomExecutor>;
class CustomExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
virtual ~CustomExecutor() = default;
virtual bool isQueryParam(const String & param_name) const = 0;
virtual bool match(HTTPServerRequest & request, HTMLForm & params) const = 0;
virtual bool canBeParseRequestBody(HTTPServerRequest & request, HTMLForm & params) = 0;
using QueryExecutor = std::function<void(HTTPOutputStreams &, HTTPServerResponse &)>;
using QueryExecutors = std::vector<QueryExecutor>;
virtual QueryExecutors getQueryExecutor(Context & context, HTTPServerRequest & request, HTMLForm & params, const HTTPInputStreams & input_streams) const = 0;
};
}
#pragma once
#include <Interpreters/CustomHTTP/CustomExecutor.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
namespace DB
{
class CustomExecutorDefault : public CustomExecutor
{
public:
bool match(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
bool canBeParseRequestBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) override { return false; }
bool isQueryParam(const String & param_name) const override
{
return param_name == "query" || startsWith(param_name, "param_");
}
QueryExecutors getQueryExecutor(Context & context, HTTPServerRequest & request, HTMLForm & params, const HTTPInputStreams & input_streams) const override
{
ReadBufferPtr in = prepareAndGetQueryInput(context, request, params, input_streams);
return {[&, shared_in = in](HTTPOutputStreams & output, HTTPServerResponse & response)
{
executeQuery(
*shared_in, *output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}};
}
private:
ReadBufferPtr prepareAndGetQueryInput(Context & context, HTTPServerRequest & request, HTMLForm & params, const HTTPInputStreams & input_streams) const
{
for (const auto & [key, value] : params)
{
}
}
};
}
#pragma once
#include <Interpreters/ClientInfo.h>
#include <Poco/Net/HTTPServerRequest.h>
namespace DB
{
class ExtractorClientInfo
{
public:
ExtractorClientInfo(ClientInfo & info_) : client_info(info_) {}
void extract(Poco::Net::HTTPServerRequest & request)
{
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.interface = ClientInfo::Interface::HTTP;
/// Query sent through HTTP interface is initial.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
http_method = ClientInfo::HTTPMethod::GET;
else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST)
http_method = ClientInfo::HTTPMethod::POST;
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
}
private:
ClientInfo & client_info;
};
}
#pragma once
#include <Poco/Net/HTTPServerRequest.h>
#include <Core/ExternalTable.h>
#include <Common/SettingsChanges.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/CustomExecutor.h>
namespace DB
{
class ExtractorContextChange
{
public:
ExtractorContextChange(Context & context_, const HTTPMatchExecutorPtr & executor_) : context(context_), executor(executor_) {}
void extract(Poco::Net::HTTPServerRequest & request, HTMLForm & params)
{
Names reserved_param_suffixes;
static const NameSet reserved_param_names{
"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query",
"session_id", "session_timeout", "session_check"};
auto param_could_be_skipped = [&] (const String & name)
{
if (reserved_param_names.count(name))
return true;
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
return false;
};
/// Settings can be overridden in the query.
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
/// Only readonly queries are allowed for HTTP GET requests.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
{
Settings & settings = context.getSettingsRef();
if (settings.readonly == 0)
settings.readonly = 2;
}
bool has_multipart = startsWith(request.getContentType().data(), "multipart/form-data");
if (has_multipart || executor->canBeParseRequestBody(request, params))
{
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
if (has_multipart)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.reserve(3);
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
reserved_param_suffixes.emplace_back("_format");
reserved_param_suffixes.emplace_back("_types");
reserved_param_suffixes.emplace_back("_structure");
}
}
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (key == "database")
context.setCurrentDatabase(value);
else if (key == "default_format")
context.setDefaultFormat(value);
else if (!param_could_be_skipped(key) && !executor->isQueryParam(key))
settings_changes.push_back({key, value}); /// All other query parameters are treated as settings.
}
/// For external data we also want settings
context.checkSettingsConstraints(settings_changes);
context.applySettingsChanges(settings_changes);
}
private:
Context & context;
const HTTPMatchExecutorPtr & executor;
};
}
#include <Interpreters/CustomHTTP/HTTPStreamsWithInput.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include "HTTPStreamsWithInput.h"
namespace DB
......@@ -14,19 +15,26 @@ namespace ErrorCodes
extern const int UNKNOWN_COMPRESSION_METHOD;
}
HTTPStreamsWithInput::HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from)
HTTPInputStreams::HTTPInputStreams(Context & context, HTTPServerRequest & request, HTMLForm & from)
: in(createRawInBuffer(request))
, in_maybe_compressed(createCompressedBuffer(request, in))
, in_maybe_internal_compressed(createInternalCompressedBuffer(from, in_maybe_compressed))
{
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (context.getSettingsRef().http_native_compression_disable_checksumming_on_decompress)
{
if(CompressedReadBuffer * compressed_buffer = typeid_cast<CompressedReadBuffer *>(in_maybe_internal_compressed.get()))
compressed_buffer->disableChecksumming();
}
}
ReadBufferPtr HTTPStreamsWithInput::createRawInBuffer(HTTPServerRequest & request) const
ReadBufferPtr HTTPInputStreams::createRawInBuffer(HTTPServerRequest & request) const
{
return std::make_unique<ReadBufferFromIStream>(request.stream());
}
ReadBufferPtr HTTPStreamsWithInput::createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const
ReadBufferPtr HTTPInputStreams::createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const
{
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
String http_compressed_method = request.get("Content-Encoding", "");
......@@ -38,7 +46,7 @@ ReadBufferPtr HTTPStreamsWithInput::createCompressedBuffer(HTTPServerRequest & r
else if (http_compressed_method == "deflate")
return std::make_shared<ZlibInflatingReadBuffer>(*raw_buffer, CompressionMethod::Zlib);
#if USE_BROTLI
else if (http_compressed_method == "br")
else if (http_compressed_method == "br")
return std::make_shared<BrotliReadBuffer>(*raw_buffer);
#endif
else
......@@ -48,7 +56,7 @@ ReadBufferPtr HTTPStreamsWithInput::createCompressedBuffer(HTTPServerRequest & r
return raw_buffer;
}
ReadBufferPtr HTTPStreamsWithInput::createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const
ReadBufferPtr HTTPInputStreams::createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const
{
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
......@@ -59,15 +67,4 @@ ReadBufferPtr HTTPStreamsWithInput::createInternalCompressedBuffer(HTMLForm & pa
return http_maybe_encoding_buffer;
}
void HTTPStreamsWithInput::attachSettings(Context & /*context*/, Settings & settings, HTTPServerRequest & /*request*/)
{
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (settings.http_native_compression_disable_checksumming_on_decompress)
{
if(CompressedReadBuffer * compressed_buffer = typeid_cast<CompressedReadBuffer *>(in_maybe_internal_compressed.get()))
compressed_buffer->disableChecksumming();
}
}
}
......@@ -11,15 +11,13 @@ namespace DB
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
struct HTTPStreamsWithInput
struct HTTPInputStreams
{
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<ReadBuffer> in_maybe_compressed;
std::shared_ptr<ReadBuffer> in_maybe_internal_compressed;
HTTPStreamsWithInput(HTTPServerRequest & request, HTMLForm & from);
void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request);
HTTPInputStreams(Context & context, HTTPServerRequest & request, HTMLForm & from);
ReadBufferPtr createRawInBuffer(HTTPServerRequest & request) const;
ReadBufferPtr createCompressedBuffer(HTTPServerRequest & request, ReadBufferPtr & raw_buffer) const;
......
#include <Interpreters/CustomHTTP/HTTPMatchExecutor.h>
#include <ext/scope_guard.h>
#include <Common/Exception.h>
#include <Common/SettingsChanges.h>
#include <Core/ExternalTable.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Compression/CompressedReadBuffer.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithInput.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int LOGICAL_ERROR;
extern const int REQUIRED_PASSWORD;
extern const int INVALID_SESSION_TIMEOUT;
extern const int UNKNOWN_COMPRESSION_METHOD;
}
namespace
{
duration parseSessionTimeout(const HTMLForm & params, size_t default_session_timeout, size_t max_session_timeout)
{
size_t session_timeout = default_session_timeout;
if (params.has("session_timeout"))
{
std::string session_timeout_str = params.get("session_timeout");
ReadBufferFromString buf(session_timeout_str);
if (!tryReadIntText(session_timeout, buf) || !buf.eof())
throw Exception("Invalid session timeout: '" + session_timeout_str + "'", ErrorCodes::INVALID_SESSION_TIMEOUT);
if (session_timeout > max_session_timeout)
throw Exception(
"Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout) +
". Maximum session timeout could be modified in configuration file.", ErrorCodes::INVALID_SESSION_TIMEOUT);
}
return std::chrono::seconds(session_timeout);
}
}
void HTTPMatchExecutor::execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const
{
authentication(context, request, params);
std::shared_ptr<Context> session_context;
String session_id = params.get("session_id", "");
duration session_timeout = parseSessionTimeout(params, 1800, 3600);
SCOPE_EXIT({ detachSessionContext(session_context, session_id, session_timeout); });
session_context = attachSessionContext(context, params, session_id, session_timeout);
initClientInfo(context, request);
used_output.attachRequestAndResponse(context, request, response, params, /* TODO: keep_alive_time_out */ 0);
HTTPStreamsWithInput used_input(request, params);
collectParamsAndApplySettings(request, params, context);
Settings & query_settings = context.getSettingsRef();
used_input.attachSettings(context, query_settings, request);
used_output.attachSettings(context, query_settings, request);
String execute_query_string = getExecuteQuery(params);
ReadBufferPtr query_in_buffer = std::make_shared<ReadBufferFromString>(execute_query_string);
ReadBufferPtr in = query_in_buffer;
if (!needParsePostBody(request, params) || !context.getExternalTables().empty())
in = std::make_shared<ConcatReadBuffer>(*query_in_buffer, *used_input.in_maybe_internal_compressed);
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); });
used_output.finalize();
}
void HTTPMatchExecutor::initClientInfo(Context & context, HTTPServerRequest & request) const
{
ClientInfo & client_info = context.getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.interface = ClientInfo::Interface::HTTP;
/// Query sent through HTTP interface is initial.
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
http_method = ClientInfo::HTTPMethod::GET;
else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST)
http_method = ClientInfo::HTTPMethod::POST;
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
}
void HTTPMatchExecutor::authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const
{
auto user = request.get("X-ClickHouse-User", "");
auto password = request.get("X-ClickHouse-Key", "");
auto quota_key = request.get("X-ClickHouse-Quota", "");
if (user.empty() && password.empty() && quota_key.empty())
{
/// User name and password can be passed using query parameters
/// or using HTTP Basic auth (both methods are insecure).
if (request.hasCredentials())
{
Poco::Net::HTTPBasicCredentials credentials(request);
user = credentials.getUsername();
password = credentials.getPassword();
}
else
{
user = params.get("user", "default");
password = params.get("password", "");
}
quota_key = params.get("quota_key", "");
}
else
{
/// It is prohibited to mix different authorization schemes.
if (request.hasCredentials()
|| params.has("user")
|| params.has("password")
|| params.has("quota_key"))
{
throw Exception("Invalid authentication: it is not allowed to use X-ClickHouse HTTP headers and other authentication methods simultaneously", ErrorCodes::REQUIRED_PASSWORD);
}
}
std::string query_id = params.get("query_id", "");
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
}
std::shared_ptr<Context> HTTPMatchExecutor::attachSessionContext(
Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const
{
if (!session_id.empty())
{
std::string session_check = params.get("session_check", "");
auto session_context = context.acquireSession(session_id, session_timeout, session_check == "1");
context = *session_context;
context.setSessionContext(*session_context);
return session_context;
}
return {};
}
void HTTPMatchExecutor::detachSessionContext(std::shared_ptr<Context> & session_context, const String & session_id, const duration & session_timeout) const
{
if (session_context)
session_context->releaseSession(session_id, session_timeout);
}
void HTTPMatchExecutor::collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const
{
static const NameSet reserved_param_names{
"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query",
"session_id", "session_timeout", "session_check"};
Names reserved_param_suffixes;
auto param_could_be_skipped = [&] (const String & name)
{
if (reserved_param_names.count(name))
return true;
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
return false;
};
/// Settings can be overridden in the query.
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
/// Only readonly queries are allowed for HTTP GET requests.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
{
Settings & settings = context.getSettingsRef();
if (settings.readonly == 0)
settings.readonly = 2;
}
bool has_multipart = startsWith(request.getContentType().data(), "multipart/form-data");
if (has_multipart || needParsePostBody(request, params))
{
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
if (has_multipart)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.reserve(3);
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
reserved_param_suffixes.emplace_back("_format");
reserved_param_suffixes.emplace_back("_types");
reserved_param_suffixes.emplace_back("_structure");
}
}
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
{
if (key == "database")
context.setCurrentDatabase(value);
else if (key == "default_format")
context.setDefaultFormat(value);
else if (!param_could_be_skipped(key) && !acceptQueryParam(context, key, value))
settings_changes.push_back({key, value}); /// All other query parameters are treated as settings.
}
/// For external data we also want settings
context.checkSettingsConstraints(settings_changes);
context.applySettingsChanges(settings_changes);
}
}
#pragma once
#include <Core/Types.h>
#include <Common/HTMLForm.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class Context;
class HTTPMatchExecutor;
struct HTTPStreamsWithInput;
struct HTTPStreamsWithOutput;
using duration = std::chrono::steady_clock::duration;
using HTTPMatchExecutorPtr = std::shared_ptr<HTTPMatchExecutor>;
class HTTPMatchExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
bool match(HTTPServerRequest & request, HTMLForm & params) const { return matchImpl(request, params); };
void execute(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & params, HTTPStreamsWithOutput & used_output) const;
virtual ~HTTPMatchExecutor() = default;
protected:
virtual bool matchImpl(HTTPServerRequest & request, HTMLForm & params) const = 0;
virtual String getExecuteQuery(HTMLForm & params) const = 0;
virtual bool needParsePostBody(HTTPServerRequest & request, HTMLForm & params) const = 0;
virtual bool acceptQueryParam(Context & context, const String & key, const String & value) const = 0;
void initClientInfo(Context & context, HTTPServerRequest & request) const;
void authentication(Context & context, HTTPServerRequest & request, HTMLForm & params) const;
void detachSessionContext(std::shared_ptr<Context> & context, const String & session_id, const duration & session_timeout) const;
std::shared_ptr<Context> attachSessionContext(Context & context, HTMLForm & params, const String & session_id, const duration & session_timeout) const;
void collectParamsAndApplySettings(HTTPServerRequest & request, HTMLForm & params, Context & context) const;
};
}
#pragma once
#include <Interpreters/CustomHTTP/HTTPMatchExecutor.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/executeQuery.h>
namespace DB
{
class HTTPMatchExecutorDefault : public HTTPMatchExecutor
{
protected:
bool matchImpl(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
bool needParsePostBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return false; }
String getExecuteQuery(HTMLForm & params) const override
{
String execute_query;
for (const auto & [key, value] : params)
{
if (key == "query")
execute_query += value;
}
return execute_query;
}
bool acceptQueryParam(Context &context, const String &key, const String &value) const override
{
if (startsWith(key, "param_"))
{
/// Save name and values of substitution in dictionary.
context.setQueryParameter(key.substr(strlen("param_")), value);
return true;
}
return key == "query";
}
};
}
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/copyData.h>
#include <IO/CascadeWriteBuffer.h>
......@@ -8,6 +8,7 @@
#include <IO/WriteBufferFromTemporaryFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConcatReadBuffer.h>
#include "HTTPOutputStreams.h"
namespace DB
......@@ -51,8 +52,14 @@ namespace
}
}
void HTTPStreamsWithOutput::attachSettings(Context & context, Settings & settings, HTTPServerRequest & request)
HTTPOutputStreams::HTTPOutputStreams(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout)
: out(createResponseOut(request, response, keep_alive_timeout))
, out_maybe_compressed(createMaybeCompressionOut(form, out))
, out_maybe_delayed_and_compressed(createMaybeDelayedAndCompressionOut(context, form, out_maybe_compressed))
{
Settings & settings = context.getSettingsRef();
/// HTTP response compression is turned on only if the client signalled that they support it
/// (using Accept-Encoding header) and 'enable_http_compression' setting is turned on.
out->setCompression(out->getCompression() && settings.enable_http_compression);
......@@ -77,16 +84,8 @@ void HTTPStreamsWithOutput::attachSettings(Context & context, Settings & setting
}
}
void HTTPStreamsWithOutput::attachRequestAndResponse(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout)
{
out = createEndpoint(request, response, keep_alive_timeout);
out_maybe_compressed = createMaybeCompressionEndpoint(form, out);
out_maybe_delayed_and_compressed = createMaybeDelayedAndCompressionEndpoint(context, form, out_maybe_compressed);
}
std::shared_ptr<WriteBufferFromHTTPServerResponse> HTTPStreamsWithOutput::createEndpoint(
HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout)
std::shared_ptr<WriteBufferFromHTTPServerResponse> HTTPOutputStreams::createResponseOut(
HTTPServerRequest &request, HTTPServerResponse &response, size_t keep_alive_timeout)
{
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
......@@ -112,15 +111,15 @@ std::shared_ptr<WriteBufferFromHTTPServerResponse> HTTPStreamsWithOutput::create
request, response, keep_alive_timeout, false, CompressionMethod{}, DBMS_DEFAULT_BUFFER_SIZE);
}
WriteBufferPtr HTTPStreamsWithOutput::createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & endpoint)
WriteBufferPtr HTTPOutputStreams::createMaybeCompressionOut(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & out_)
{
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
bool internal_compression = form.getParsed<bool>("compress", false);
return internal_compression ? std::make_shared<CompressedWriteBuffer>(*endpoint) : WriteBufferPtr(endpoint);
return internal_compression ? std::make_shared<CompressedWriteBuffer>(*out_) : WriteBufferPtr(out_);
}
WriteBufferPtr HTTPStreamsWithOutput::createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint)
WriteBufferPtr HTTPOutputStreams::createMaybeDelayedAndCompressionOut(Context & context, HTMLForm & form, WriteBufferPtr & out_)
{
/// If it is specified, the whole result will be buffered.
/// First ~buffer_size bytes will be buffered in memory, the remaining bytes will be stored in temporary file.
......@@ -152,7 +151,7 @@ WriteBufferPtr HTTPStreamsWithOutput::createMaybeDelayedAndCompressionEndpoint(C
}
else
{
auto push_memory_buffer_and_continue = [next_buffer = endpoint] (const WriteBufferPtr & prev_buf)
auto push_memory_buffer_and_continue = [next_buffer = out_] (const WriteBufferPtr & prev_buf)
{
auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
if (!prev_memory_buffer)
......@@ -170,10 +169,10 @@ WriteBufferPtr HTTPStreamsWithOutput::createMaybeDelayedAndCompressionEndpoint(C
return std::make_shared<CascadeWriteBuffer>(std::move(cascade_buffer1), std::move(cascade_buffer2));
}
return endpoint;
return out_;
}
void HTTPStreamsWithOutput::finalize() const
void HTTPOutputStreams::finalize() const
{
if (out_maybe_delayed_and_compressed != out_maybe_compressed)
{
......
......@@ -22,7 +22,7 @@ using HTTPServerResponse = Poco::Net::HTTPServerResponse;
* ↓
* WriteBufferFromHTTPServerResponse out
*/
struct HTTPStreamsWithOutput
struct HTTPOutputStreams
{
using HTTPResponseBufferPtr = std::shared_ptr<WriteBufferFromHTTPServerResponse>;
......@@ -32,17 +32,17 @@ struct HTTPStreamsWithOutput
/// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer.
std::shared_ptr<WriteBuffer> out_maybe_delayed_and_compressed;
void finalize() const;
HTTPOutputStreams() = default;
WriteBufferPtr createMaybeDelayedAndCompressionEndpoint(Context & context, HTMLForm & form, WriteBufferPtr & endpoint);
HTTPOutputStreams(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout);
WriteBufferPtr createMaybeCompressionEndpoint(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & endpoint);
void finalize() const;
HTTPResponseBufferPtr createEndpoint(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout);
WriteBufferPtr createMaybeDelayedAndCompressionOut(Context &context, HTMLForm &form, WriteBufferPtr &out_);
void attachSettings(Context & context, Settings & settings, HTTPServerRequest & request);
WriteBufferPtr createMaybeCompressionOut(HTMLForm & form, std::shared_ptr<WriteBufferFromHTTPServerResponse> & out_);
void attachRequestAndResponse(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout);
HTTPResponseBufferPtr createResponseOut(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive_timeout);
};
}
此差异已折叠。
......@@ -7,7 +7,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/HTMLForm.h>
#include <Interpreters/CustomHTTP/HTTPStreamsWithOutput.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
namespace CurrentMetrics
......@@ -30,31 +30,22 @@ public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(DB::Context& /* context */) {}
private:
struct Output
using HTTPRequest = Poco::Net::HTTPServerRequest;
using HTTPResponse = Poco::Net::HTTPServerResponse;
struct SessionContextHolder
{
/* Raw data
* ↓
* CascadeWriteBuffer out_maybe_delayed_and_compressed (optional)
* ↓ (forwards data if an overflow is occur or explicitly via pushDelayedResults)
* CompressedWriteBuffer out_maybe_compressed (optional)
* ↓
* WriteBufferFromHTTPServerResponse out
*/
std::shared_ptr<WriteBufferFromHTTPServerResponse> out;
/// Points to 'out' or to CompressedWriteBuffer(*out), depending on settings.
std::shared_ptr<WriteBuffer> out_maybe_compressed;
/// Points to 'out' or to CompressedWriteBuffer(*out) or to CascadeWriteBuffer.
std::shared_ptr<WriteBuffer> out_maybe_delayed_and_compressed;
inline bool hasDelayed() const
{
return out_maybe_delayed_and_compressed != out_maybe_compressed;
}
~SessionContextHolder();
SessionContextHolder(IServer & accepted_server, HTMLForm & params);
void authentication(HTTPServerRequest & request, HTMLForm & params);
String session_id;
std::unique_ptr<Context> context = nullptr;
std::shared_ptr<Context> session_context = nullptr;
std::chrono::steady_clock::duration session_timeout;
};
IServer & server;
......@@ -66,20 +57,11 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
/// Also initializes 'used_output'.
void processQuery(
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,
Poco::Net::HTTPServerResponse & response,
HTTPStreamsWithOutput & used_output);
void processQuery(HTTPRequest & request, HTMLForm & params, HTTPResponse & response, SessionContextHolder & holder);
void trySendExceptionToClient(
const std::string & s,
int exception_code,
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response,
HTTPStreamsWithOutput & used_output);
const std::string & message, int exception_code, HTTPRequest & request, HTTPResponse & response, HTTPOutputStreams & used_output);
static void pushDelayedResults(Output & used_output);
};
}
......@@ -26,6 +26,12 @@ struct HTMLForm : public Poco::Net::HTMLForm
readUrl(istr);
}
template <typename T>
bool check(const std::string & key, T check_value)
{
const auto & value = getParsed<T>(key, T());
return value == check_value;
}
template <typename T>
T getParsed(const std::string & key, T default_value)
......
......@@ -44,7 +44,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/CustomHTTP/HTTPMatchExecutorDefault.h>
#include <Interpreters/CustomHTTP/CustomExecutorDefault.h>
#include <Common/DNSResolver.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
......@@ -1545,11 +1545,6 @@ void Context::setCluster(const String & cluster_name, const std::shared_ptr<Clus
shared->clusters->setCluster(cluster_name, cluster);
}
HTTPMatchExecutorPtr Context::getHTTPMatchExecutor()
{
return std::make_shared<HTTPMatchExecutorDefault>();
}
void Context::initializeSystemLogs()
{
auto lock = getLock();
......@@ -2049,6 +2044,11 @@ void Context::resetInputCallbacks()
input_blocks_reader = {};
}
std::pair<String, HTTPMatchExecutorPtr> Context::getCustomExecutor(Poco::Net::HTTPServerRequest & /*request*/)
{
return std::pair<String, HTTPMatchExecutorPtr>("Default", std::shared_ptr<CustomExecutorDefault>());
}
StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where) const
{
......
......@@ -490,7 +490,7 @@ public:
Compiler & getCompiler();
HTTPMatchExecutorPtr getHTTPMatchExecutor();
std::pair<String, HTTPMatchExecutorPtr> getCustomExecutor(Poco::Net::HTTPServerRequest &request/*, HTMLForm & params*/);
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册