提交 6b716e51 编写于 作者: Z zhang2014

ISSUES-5436 support custom http [part 6]

上级 7aef95b0
......@@ -9,6 +9,8 @@ namespace DB
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int DUPLICATE_CUSTOM_EXECUTOR;
extern const int TOO_MANY_INPUT_CUSTOM_EXECUTOR;
}
bool CustomExecutor::match(Context & context, HTTPRequest & request, HTMLForm & params) const
......@@ -69,7 +71,7 @@ static CustomExecutorPtr createDefaultCustomExecutor()
return std::make_shared<CustomExecutor>(custom_matchers, custom_query_executors);
}
void CustomExecutors::updateCustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix)
void CustomExecutors::updateCustomExecutors(const Configuration & config, const Settings & /*settings*/, const String & config_prefix)
{
Configuration::Keys custom_executors_keys;
config.keys(config_prefix, custom_executors_keys);
......@@ -83,6 +85,11 @@ void CustomExecutors::updateCustomExecutors(const Configuration & config, const
else if (custom_executor_key.find('.') != String::npos)
throw Exception("CustomExecutor names with dots are not supported: '" + custom_executor_key + "'", ErrorCodes::SYNTAX_ERROR);
const auto & exists_executor = [&](auto & ele) { return ele.first == custom_executor_key; };
if (std::count_if(new_custom_executors.begin(), new_custom_executors.end(), exists_executor))
throw Exception("CustomExecutor name '" + custom_executor_key + "' already exists in system.",
ErrorCodes::DUPLICATE_CUSTOM_EXECUTOR);
new_custom_executors.push_back(
std::make_pair(custom_executor_key, createCustomExecutor(config, config_prefix + "." + custom_executor_key)));
}
......@@ -146,12 +153,23 @@ CustomExecutorPtr CustomExecutors::createCustomExecutor(const Configuration & co
custom_query_executors.push_back(query_executor_creator_it->second(config, config_prefix + "." + matcher_or_query_executor_type));
}
for (const auto & custom_executor_matcher : custom_executor_matchers)
custom_executor_matcher->checkQueryExecutor(custom_query_executors);
checkCustomMatchersAndQueryExecutors(custom_executor_matchers, custom_query_executors);
return std::make_shared<CustomExecutor>(custom_executor_matchers, custom_query_executors);
}
void CustomExecutors::checkCustomMatchersAndQueryExecutors(
std::vector<CustomExecutorMatcherPtr> & matchers, std::vector<CustomQueryExecutorPtr> & query_executors)
{
const auto & sum_func = [&](auto & ele) { return !ele->canBeParseRequestBody(); };
const auto & need_post_data_count = std::count_if(query_executors.begin(), query_executors.end(), sum_func);
if (need_post_data_count > 1)
throw Exception("The CustomExecutor can only contain one insert query.", ErrorCodes::TOO_MANY_INPUT_CUSTOM_EXECUTOR);
for (const auto & matcher : matchers)
matcher->checkQueryExecutors(query_executors);
}
std::pair<String, CustomExecutorPtr> CustomExecutors::getCustomExecutor(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & params) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
......@@ -165,8 +183,14 @@ std::pair<String, CustomExecutorPtr> CustomExecutors::getCustomExecutor(Context
CustomExecutors::CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix)
{
registerCustomMatcher("URL", [&](const auto & config, const auto & prefix)
{ return std::make_shared<HTTPURLCustomExecutorMatcher>(config, prefix); });
registerCustomMatcher("URL", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<HTTPURLCustomExecutorMatcher>(matcher_config, prefix); });
registerCustomMatcher("method", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<HTTPMethodCustomExecutorMatcher>(matcher_config, prefix); });
registerQueryExecutor("query", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<ConstQueryCustomQueryExecutor>(matcher_config, prefix); });
updateCustomExecutors(config, settings, config_prefix);
}
......
......@@ -28,7 +28,7 @@ class CustomExecutors
{
public:
using Configuration = Poco::Util::AbstractConfiguration;
CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix = "CustomHTTP");
CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix = "custom_http");
CustomExecutors(const CustomExecutors &) = delete;
CustomExecutors & operator=(const CustomExecutors &) = delete;
......@@ -36,7 +36,7 @@ public:
using QueryExecutorCreator = std::function<CustomQueryExecutorPtr(const Configuration &, const String &)>;
void registerQueryExecutor(const String & query_executor_name, const QueryExecutorCreator & creator);
using CustomMatcherCreator = const std::function<CustomExecutorMatcherPtr(const Configuration &, const String &)>;
using CustomMatcherCreator = std::function<CustomExecutorMatcherPtr(const Configuration &, const String &)>;
void registerCustomMatcher(const String & matcher_name, const CustomMatcherCreator & creator);
void updateCustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix);
......@@ -49,6 +49,9 @@ private:
std::unordered_map<String, CustomMatcherCreator> custom_matcher_creators;
CustomExecutorPtr createCustomExecutor(const Configuration & config, const String & config_prefix);
void checkCustomMatchersAndQueryExecutors(std::vector<CustomExecutorMatcherPtr> & matchers, std::vector<CustomQueryExecutorPtr> & query_executors);
};
class CustomExecutor
......
......@@ -22,7 +22,7 @@ class CustomExecutorMatcher
public:
virtual ~CustomExecutorMatcher() = default;
virtual bool checkQueryExecutor(const std::vector<CustomQueryExecutorPtr> & check_executors) const = 0;
virtual bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> &check_executors) const = 0;
virtual bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & params) const = 0;
};
......@@ -33,63 +33,88 @@ using CustomExecutorMatcherPtr = std::shared_ptr<CustomExecutorMatcher>;
class AlwaysMatchedCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
bool checkQueryExecutor(const std::vector<CustomQueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
};
class HTTPURLCustomExecutorMatcher : public CustomExecutorMatcher
class HTTPMethodCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
HTTPURLCustomExecutorMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & url_config_key)
: url_match_searcher(analyzeURLPatten(configuration.getString(url_config_key, ""), params_name_extract_from_url))
HTTPMethodCustomExecutorMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & method_config_key)
{
match_method = Poco::toLower(configuration.getString(method_config_key));
}
bool checkQueryExecutor(const std::vector<CustomQueryExecutorPtr> & custom_query_executors) const override
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
{
for (const auto & param_name_from_url : params_name_extract_from_url)
{
bool found_param_name = false;
for (const auto & custom_query_executor : custom_query_executors)
{
if (custom_query_executor->isQueryParam(param_name_from_url))
{
found_param_name = true;
break;
}
}
return Poco::toLower(request.getMethod()) == match_method;
}
if (!found_param_name)
throw Exception("The param name '" + param_name_from_url + "' is uselessed.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
private:
String match_method;
};
}
class HTTPURLCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
HTTPURLCustomExecutorMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & url_config_key)
{
regex_matcher = std::make_unique<re2_st::RE2>(configuration.getString(url_config_key));
}
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & custom_query_executors) const override
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
if (!checkQueryExecutors(named_capturing_group.first, custom_query_executors))
throw Exception("The param name '" + named_capturing_group.first + "' is uselessed.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return true;
}
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
{
const String request_uri = request.getURI();
re2_st::StringPiece query_params_matches[params_name_extract_from_url.size()];
int num_captures = regex_matcher->NumberOfCapturingGroups() + 1;
re2_st::StringPiece matches[num_captures];
re2_st::StringPiece input(request_uri.data(), request_uri.size());
if (regex_matcher->Match(input, 0, request_uri.size(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
const auto & full_match = matches[0];
const char * url_end = request_uri.data() + request_uri.size();
const char * not_matched_begin = request_uri.data() + full_match.size();
if (not_matched_begin != url_end && *not_matched_begin == '/')
++not_matched_begin;
// re2_st::StringPiece input;
// if (url_match_searcher.Match(input, start_pos, input.length(), re2_st::RE2::Anchor::UNANCHORED, query_params_matches, num_captures))
// {
//
// }
if (not_matched_begin == url_end || *not_matched_begin == '?')
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
{
const auto & capturing_value = matches[named_capturing_group.second];
context.setQueryParameter(named_capturing_group.first, String(capturing_value.data(), capturing_value.size()));
}
return true;
}
}
return false;
}
private:
re2_st::RE2 url_match_searcher;
std::vector<String> params_name_extract_from_url;
std::unique_ptr<re2_st::RE2> regex_matcher;
String analyzeURLPatten(const String & /*url_patten*/, std::vector<String> & /*matches*/)
bool checkQueryExecutors(const String & param_name, const std::vector<CustomQueryExecutorPtr> & custom_query_executors) const
{
return ".+";
/// TODO: first we replace all capture group
/// TODO: second we replace all ${identifier}
for (const auto & custom_query_executor : custom_query_executors)
if (custom_query_executor->isQueryParam(param_name))
return true;
return false;
}
};
......
......@@ -6,10 +6,14 @@
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTInsertQuery.h>
namespace DB
{
......@@ -29,6 +33,54 @@ public:
using CustomQueryExecutorPtr = std::shared_ptr<CustomQueryExecutor>;
class ConstQueryCustomQueryExecutor : public CustomQueryExecutor
{
public:
ConstQueryCustomQueryExecutor(const Poco::Util::AbstractConfiguration & configuration, const String & config_key)
{
execute_query = configuration.getString(config_key, "");
const char * query_begin = execute_query.data();
const char * query_end = execute_query.data() + execute_query.size();
ParserQuery parser(query_end, false);
ASTPtr extract_query_ast = parseQuery(parser, query_begin, query_end, "", 0);
QueryParameterVisitor{query_params_name}.visit(extract_query_ast);
can_be_parse_request_body = !extract_query_ast->as<ASTInsertQuery>();
}
bool isQueryParam(const String & param_name) const override { return query_params_name.count(param_name); }
bool canBeParseRequestBody(Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*form*/) const override { return can_be_parse_request_body; }
void executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & /*input_streams*/, const HTTPOutputStreams & output_streams) const override
{
prepareQueryParams(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
private:
String execute_query;
NameSet query_params_name;
bool can_be_parse_request_body{false};
void prepareQueryParams(Context & context, HTMLForm & params) const
{
for (const auto & param : params)
if (isQueryParam(param.first))
context.setQueryParameter(param.first, param.second);
}
};
class ExtractQueryParamCustomQueryExecutor : public CustomQueryExecutor
{
public:
......
#pragma once
#include <Core/Names.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryParameter.h>
namespace DB
{
class QueryParameterVisitor
{
public:
QueryParameterVisitor(NameSet & parameters_name) : query_parameters(parameters_name) {}
void visit(const ASTPtr & ast)
{
for (const auto & child : ast->children)
{
if (const auto & query_parameter = child->as<ASTQueryParameter>())
visitQueryParameter(*query_parameter);
else
visit(child);
}
}
private:
NameSet & query_parameters;
void visitQueryParameter(const ASTQueryParameter & query_parameter)
{
query_parameters.insert(query_parameter.name);
}
};
}
......@@ -267,10 +267,14 @@ void HTTPHandler::trySendExceptionToClient(
&& !request.stream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
if (exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD ||
exception_code == ErrorCodes::REQUIRED_PASSWORD || exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
if (exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD
|| exception_code == ErrorCodes::REQUIRED_PASSWORD || exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED)
{
response.requireAuthentication("ClickHouse server HTTP API");
if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED)
response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
else
response.requireAuthentication("ClickHouse server HTTP API");
response.send() << message << std::endl;
}
else
......
......@@ -512,7 +512,6 @@
-->
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
<!-- Uncomment to use query masking rules.
name - name for the rule (optional)
regexp - RE2 compatible regular expression (mandatory)
......@@ -526,6 +525,15 @@
</query_masking_rules>
-->
<!-- Uncomment to use custom http.
<custom_http>
<test_select_single>
<url><![cdata[/test/(?p<test_url_value>\w+)]]></url>
<query>select {test_url_value:uint64}, {test_query_param_value:string}, {test_query_post_value:string}</query>
</test_select_single>
</custom_http>
-->
<!-- Uncomment to disable ClickHouse internal DNS caching. -->
<!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
</yandex>
......@@ -31,11 +31,6 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
response.set("Access-Control-Allow-Origin", "*");
setResponseDefaultHeaders(response, keep_alive_timeout);
#if defined(POCO_CLICKHOUSE_PATCH)
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
#endif
}
}
......@@ -96,46 +91,79 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders()
}
void WriteBufferFromHTTPServerResponse::nextImpl()
void WriteBufferFromHTTPServerResponse::choiceSendEncode()
{
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
{
std::lock_guard lock(mutex);
startSendHeaders();
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
if (!compress)
{
if (compress)
{
auto content_encoding_name = toContentEncodingName(compression_method);
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
*response_header_ostr << "Content-Encoding: gzip\r\n";
#else
response.set("Content-Encoding", content_encoding_name);
response.set("Content-Encoding", "gzip");
response_body_ostr = &(response.send());
#endif
}
#if !defined(POCO_CLICKHOUSE_PATCH)
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
else if (compression_method == CompressionMethod::Zlib)
{
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: deflate\r\n";
#else
/// Newline autosent by response.send()
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
#if USE_BROTLI
else if (compression_method == CompressionMethod::Brotli)
{
#if defined(POCO_CLICKHOUSE_PATCH)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
if (headers_started_sending && !headers_finished_sending)
*response_header_ostr << "Content-Encoding: " << encoding_type << "\r\n";
#else
response.set("Content-Encoding", content_encoding_name);
#endif
/// We reuse our buffer in "out" to avoid extra allocations and copies.
/// Newline autosent by response.send()
/// This may result in an extra empty line in the response body
response_body_ostr = &(response.send());
#endif
};
if (compress)
out = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
compress ? compression_method : CompressionMethod::None,
compression_level,
working_buffer.size(),
working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
}
#if USE_BROTLI
else if (compression_method == CompressionMethod::Brotli)
{
set_encoding_type("br");
out_raw.emplace(*response_body_ostr);
brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
out = &*brotli_buf;
}
#endif
else
out = std::make_unique<WriteBufferFromOStream>(
*response_body_ostr,
working_buffer.size(),
working_buffer.begin());
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
}
}
}
void WriteBufferFromHTTPServerResponse::nextImpl()
{
{
std::lock_guard lock(mutex);
startSendHeaders();
choiceSendEncode();
finishSendHeaders();
}
......@@ -207,7 +235,6 @@ void WriteBufferFromHTTPServerResponse::finalize()
}
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
try
......
......@@ -87,6 +87,8 @@ private:
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders();
void choiceSendEncode();
void nextImpl() override;
public:
......
......@@ -491,7 +491,7 @@ public:
Compiler & getCompiler();
void setCustomExecutorConfig(const ConfigurationPtr & config, const String & config_prefix = "CustomHTTP");
void setCustomExecutorConfig(const ConfigurationPtr & config, const String & config_prefix = "custom_http");
std::pair<String, CustomExecutorPtr> getCustomExecutor(Poco::Net::HTTPServerRequest &request, HTMLForm & params);
/// Call after initialization before using system logs. Call for global context.
......
......@@ -6,6 +6,7 @@
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTLiteral.h>
......@@ -55,7 +56,10 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
IColumn & temp_column = *temp_column_ptr;
ReadBufferFromString read_buffer{value};
FormatSettings format_settings;
data_type->deserializeAsTextEscaped(temp_column, read_buffer, format_settings);
skipWhitespaceIfAny(read_buffer); /// Skip white space on both sides
data_type->deserializeAsWholeText(temp_column, read_buffer, format_settings);
skipWhitespaceIfAny(read_buffer); /// Skip white space on both sides
if (!read_buffer.eof())
throw Exception("Value " + value + " cannot be parsed as " + type_name + " for query parameter '" + ast_param.name + "'"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册