提交 847f7ab4 编写于 作者: Z zhang2014

ISSUES-5436 fix build failure & fix test failure

上级 07ed4ba4
#pragma once
#include <Interpreters/CustomHTTP/QueryExecutorAndMatcher.h>
namespace DB
{
class AlwaysQueryMatcher : public QueryMatcher
{
public:
bool checkQueryExecutors(const std::vector<QueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
};
}
#include <Interpreters/CustomHTTP/ConstQueryExecutor.h>
#include <Parsers/IAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/CustomHTTP/ExtractorContextChange.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_CUSTOM_EXECUTOR_PARAM;
}
void prepareQueryParams(Context & context, HTMLForm & params, const NameSet & query_params_name)
{
for (const auto & param : params)
if (query_params_name.count(param.first))
context.setQueryParameter(param.first, param.second);
}
QueryExecutorConst::QueryExecutorConst(const Poco::Util::AbstractConfiguration & configuration, const String & config_key)
{
execute_query = configuration.getString(config_key);
const char * query_begin = execute_query.data();
const char * query_end = execute_query.data() + execute_query.size();
ParserQuery parser(query_end, false);
ASTPtr extract_query_ast = parseQuery(parser, query_begin, query_end, "", 0);
QueryParameterVisitor{query_params_name}.visit(extract_query_ast);
can_be_parse_request_body = !extract_query_ast->as<ASTInsertQuery>();
const auto & reserved_params_name = ExtractorContextChange::getReservedParamNames();
for (const auto & prepared_param_name : query_params_name)
{
if (Settings::findIndex(prepared_param_name) != Settings::npos || reserved_params_name.count(prepared_param_name))
throw Exception(
"Illegal custom executor query param name '" + prepared_param_name + "', Because it's a reserved name or Settings name",
ErrorCodes::ILLEGAL_CUSTOM_EXECUTOR_PARAM);
}
}
void QueryExecutorConst::executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const
{
ReadBufferPtr temp_query_buf;
prepareQueryParams(context, params, query_params_name);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
if (!canBeParseRequestBody() && !startsWith(request.getContentType().data(), "multipart/form-data"))
{
temp_query_buf = execute_query_buf; /// we create a temporary reference for not to be destroyed
execute_query_buf = std::make_unique<ConcatReadBuffer>(*temp_query_buf, *input_streams.in_maybe_internal_compressed);
}
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
}
#pragma once
#include <Interpreters/CustomHTTP/QueryExecutorAndMatcher.h>
namespace DB
{
class QueryExecutorConst : public QueryExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
bool canBeParseRequestBody() const override { return can_be_parse_request_body; }
bool isQueryParam(const String & param_name) const override { return query_params_name.count(param_name); }
QueryExecutorConst(const Poco::Util::AbstractConfiguration & configuration, const String & config_key);
void executeQueryImpl(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const override;
private:
String execute_query;
NameSet query_params_name;
bool can_be_parse_request_body{false};
};
}
#include <Interpreters/CustomHTTP/CustomExecutor.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include "CustomExecutor.h"
#include <Interpreters/CustomHTTP/ConstQueryExecutor.h>
#include <Interpreters/CustomHTTP/DynamicQueryExecutor.h>
#include <Interpreters/CustomHTTP/URLQueryMatcher.h>
#include <Interpreters/CustomHTTP/MethodQueryMatcher.h>
#include <Interpreters/CustomHTTP/AlwaysQueryMatcher.h>
namespace DB
......@@ -9,10 +13,17 @@ namespace DB
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int DUPLICATE_CUSTOM_EXECUTOR;
extern const int UNKNOW_QUERY_EXECUTOR;
extern const int TOO_MANY_INPUT_CUSTOM_EXECUTOR;
}
CustomExecutor::CustomExecutor(
const std::vector<QueryMatcherPtr> & matchers_,
const std::vector<QueryExecutorPtr> & query_executors_)
: matchers(matchers_), query_executors(query_executors_)
{
}
bool CustomExecutor::match(Context & context, HTTPRequest & request, HTMLForm & params) const
{
for (const auto & matcher : matchers)
......@@ -35,11 +46,11 @@ bool CustomExecutor::isQueryParam(const String & param_name) const
return true;
}
bool CustomExecutor::canBeParseRequestBody(HTTPRequest & request, HTMLForm & params) const
bool CustomExecutor::canBeParseRequestBody() const
{
for (const auto & query_executor : query_executors)
{
if (!query_executor->canBeParseRequestBody(request, params))
if (!query_executor->canBeParseRequestBody())
return false;
}
......@@ -57,20 +68,6 @@ void CustomExecutor::executeQuery(
output_streams.finalize();
}
CustomExecutor::CustomExecutor(
const std::vector<CustomExecutorMatcherPtr> & matchers_, const std::vector<CustomQueryExecutorPtr> & query_executors_)
: matchers(matchers_), query_executors(query_executors_)
{
}
static CustomExecutorPtr createDefaultCustomExecutor()
{
std::vector<CustomExecutorMatcherPtr> custom_matchers{std::make_shared<AlwaysMatchedCustomExecutorMatcher>()};
std::vector<CustomQueryExecutorPtr> custom_query_executors{std::make_shared<ExtractQueryParamCustomQueryExecutor>()};
return std::make_shared<CustomExecutor>(custom_matchers, custom_query_executors);
}
void CustomExecutors::updateCustomExecutors(const Configuration & config, const Settings & /*settings*/, const String & config_prefix)
{
Configuration::Keys custom_executors_keys;
......@@ -80,91 +77,91 @@ void CustomExecutors::updateCustomExecutors(const Configuration & config, const
for (const auto & custom_executor_key : custom_executors_keys)
{
if (custom_executor_key == "Default")
throw Exception("CustomExecutor cannot be 'Default'.", ErrorCodes::SYNTAX_ERROR);
else if (custom_executor_key.find('.') != String::npos)
if (custom_executor_key.find('.') != String::npos)
throw Exception("CustomExecutor names with dots are not supported: '" + custom_executor_key + "'", ErrorCodes::SYNTAX_ERROR);
const auto & exists_executor = [&](auto & ele) { return ele.first == custom_executor_key; };
if (std::count_if(new_custom_executors.begin(), new_custom_executors.end(), exists_executor))
throw Exception("CustomExecutor name '" + custom_executor_key + "' already exists in system.",
ErrorCodes::DUPLICATE_CUSTOM_EXECUTOR);
new_custom_executors.push_back(
std::make_pair(custom_executor_key, createCustomExecutor(config, config_prefix + "." + custom_executor_key)));
new_custom_executors.push_back({custom_executor_key, createCustomExecutor(config, config_prefix, custom_executor_key)});
}
new_custom_executors.push_back(std::make_pair("Default", createDefaultCustomExecutor()));
std::unique_lock<std::shared_mutex> lock(rwlock);
custom_executors = new_custom_executors;
}
void CustomExecutors::registerCustomMatcher(const String & matcher_name, const CustomExecutors::CustomMatcherCreator & creator)
{
const auto & matcher_creator_it = custom_matcher_creators.find(matcher_name);
const auto & query_executor_creator_it = query_executor_creators.find(matcher_name);
if (matcher_creator_it != custom_matcher_creators.end() && query_executor_creator_it != query_executor_creators.end())
throw Exception("LOGICAL_ERROR QueryMatcher name must be unique between the QueryExecutor and QueryMatcher.",
ErrorCodes::LOGICAL_ERROR);
custom_matcher_creators[matcher_name] = creator;
}
void CustomExecutors::registerQueryExecutor(const String & query_executor_name, const CustomExecutors::QueryExecutorCreator & creator)
{
const auto & matcher_creator_it = custom_matcher_creators.find(query_executor_name);
const auto & query_executor_creator_it = query_executor_creators.find(query_executor_name);
if (matcher_creator_it != custom_matcher_creators.end() && query_executor_creator_it != query_executor_creators.end())
throw Exception("LOGICAL_ERROR CustomQueryExecutor name must be unique between the CustomQueryExecutor and CustomExecutorMatcher.",
ErrorCodes::LOGICAL_ERROR);
throw Exception("LOGICAL_ERROR QueryExecutor name must be unique between the QueryExecutor and QueryMatcher.",
ErrorCodes::LOGICAL_ERROR);
query_executor_creators[query_executor_name] = creator;
}
void CustomExecutors::registerCustomMatcher(const String & matcher_name, const CustomExecutors::CustomMatcherCreator & creator)
String fixMatcherOrExecutorTypeName(const String & matcher_or_executor_type_name)
{
const auto & matcher_creator_it = custom_matcher_creators.find(matcher_name);
const auto & query_executor_creator_it = query_executor_creators.find(matcher_name);
if (matcher_creator_it != custom_matcher_creators.end() && query_executor_creator_it != query_executor_creators.end())
throw Exception("LOGICAL_ERROR CustomExecutorMatcher name must be unique between the CustomQueryExecutor and CustomExecutorMatcher.",
ErrorCodes::LOGICAL_ERROR);
custom_matcher_creators[matcher_name] = creator;
auto type_name_end_pos = matcher_or_executor_type_name.find('[');
return type_name_end_pos == String::npos ? matcher_or_executor_type_name : matcher_or_executor_type_name.substr(0, type_name_end_pos);
}
CustomExecutorPtr CustomExecutors::createCustomExecutor(const Configuration & config, const String & config_prefix)
CustomExecutorPtr CustomExecutors::createCustomExecutor(const Configuration & config, const String & config_prefix, const String & name)
{
Configuration::Keys matchers_or_query_executors_type;
config.keys(config_prefix, matchers_or_query_executors_type);
Configuration::Keys matchers_key;
config.keys(config_prefix + "." + name, matchers_key);
std::vector<CustomQueryExecutorPtr> custom_query_executors;
std::vector<CustomExecutorMatcherPtr> custom_executor_matchers;
std::vector<QueryMatcherPtr> query_matchers;
std::vector<QueryExecutorPtr> query_executors;
for (const auto & matcher_or_query_executor_type : matchers_or_query_executors_type)
for (const auto & matcher_key : matchers_key)
{
String matcher_or_query_executor_type = fixMatcherOrExecutorTypeName(matcher_key);
if (matcher_or_query_executor_type.find('.') != String::npos)
throw Exception(
"CustomMatcher or CustomQueryExecutor names with dots are not supported: '" + matcher_or_query_executor_type + "'",
throw Exception("CustomMatcher or QueryExecutor names with dots are not supported: '" + matcher_or_query_executor_type + "'",
ErrorCodes::SYNTAX_ERROR);
const auto & matcher_creator_it = custom_matcher_creators.find(matcher_or_query_executor_type);
const auto & query_executor_creator_it = query_executor_creators.find(matcher_or_query_executor_type);
if (matcher_creator_it == custom_matcher_creators.end() && query_executor_creator_it == query_executor_creators.end())
throw Exception("CustomMatcher or CustomQueryExecutor '" + matcher_or_query_executor_type + "' is not implemented.",
ErrorCodes::NOT_IMPLEMENTED);
throw Exception("CustomMatcher or QueryExecutor '" + matcher_or_query_executor_type + "' is not implemented.",
ErrorCodes::NOT_IMPLEMENTED);
if (matcher_creator_it != custom_matcher_creators.end())
custom_executor_matchers.push_back(matcher_creator_it->second(config, config_prefix + "." + matcher_or_query_executor_type));
query_matchers.push_back(matcher_creator_it->second(config, config_prefix + "." + name + "." + matcher_key));
if (query_executor_creator_it != query_executor_creators.end())
custom_query_executors.push_back(query_executor_creator_it->second(config, config_prefix + "." + matcher_or_query_executor_type));
query_executors.push_back(query_executor_creator_it->second(config, config_prefix + "." + name + "." + matcher_key));
}
checkCustomMatchersAndQueryExecutors(custom_executor_matchers, custom_query_executors);
return std::make_shared<CustomExecutor>(custom_executor_matchers, custom_query_executors);
checkQueryMatchersAndExecutors(name, query_matchers, query_executors);
return std::make_shared<CustomExecutor>(query_matchers, query_executors);
}
void CustomExecutors::checkCustomMatchersAndQueryExecutors(
std::vector<CustomExecutorMatcherPtr> & matchers, std::vector<CustomQueryExecutorPtr> & query_executors)
void CustomExecutors::checkQueryMatchersAndExecutors(
const String & name, std::vector<QueryMatcherPtr> & matchers, std::vector<QueryExecutorPtr> & query_executors)
{
const auto & sum_func = [&](auto & ele) { return !ele->canBeParseRequestBody(); };
if (matchers.empty() || query_executors.empty())
throw Exception("The CustomExecutor '" + name + "' must contain a Matcher and a QueryExecutor.", ErrorCodes::SYNTAX_ERROR);
const auto & sum_func = [&](auto & ele) -> bool { return !ele->canBeParseRequestBody(); };
const auto & need_post_data_count = std::count_if(query_executors.begin(), query_executors.end(), sum_func);
if (need_post_data_count > 1)
throw Exception("The CustomExecutor can only contain one insert query.", ErrorCodes::TOO_MANY_INPUT_CUSTOM_EXECUTOR);
throw Exception("The CustomExecutor '" + name + "' can only contain one insert query." + toString(need_post_data_count), ErrorCodes::TOO_MANY_INPUT_CUSTOM_EXECUTOR);
for (const auto & matcher : matchers)
matcher->checkQueryExecutors(query_executors);
......@@ -178,19 +175,25 @@ std::pair<String, CustomExecutorPtr> CustomExecutors::getCustomExecutor(Context
if (custom_executor.second->match(context, request, params))
return custom_executor;
throw Exception("LOGICAL_ERROR not found custom executor.", ErrorCodes::LOGICAL_ERROR);
throw Exception("No query executors matched", ErrorCodes::UNKNOW_QUERY_EXECUTOR);
}
CustomExecutors::CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix)
{
registerCustomMatcher("URL", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<HTTPURLCustomExecutorMatcher>(matcher_config, prefix); });
{ return std::make_shared<URLQueryMatcher>(matcher_config, prefix); });
registerCustomMatcher("method", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<HTTPMethodCustomExecutorMatcher>(matcher_config, prefix); });
{ return std::make_shared<MethodQueryMatcher>(matcher_config, prefix); });
registerCustomMatcher("always_matched", [&](const auto & /*matcher_config*/, const auto & /*prefix*/)
{ return std::make_shared<AlwaysQueryMatcher>(); });
registerQueryExecutor("query", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<ConstQueryCustomQueryExecutor>(matcher_config, prefix); });
{ return std::make_shared<QueryExecutorConst>(matcher_config, prefix); });
registerQueryExecutor("dynamic_query", [&](const auto & matcher_config, const auto & prefix)
{ return std::make_shared<QueryExecutorDynamic>(matcher_config, prefix); });
updateCustomExecutors(config, settings, config_prefix);
}
......
......@@ -7,14 +7,13 @@
#include <Core/Settings.h>
#include <Common/HTMLForm.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Interpreters/CustomHTTP/CustomQueryExecutors.h>
#include <Interpreters/CustomHTTP/CustomExecutorMatchers.h>
#include <Interpreters/CustomHTTP/QueryExecutorAndMatcher.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
......@@ -33,10 +32,10 @@ public:
CustomExecutors(const CustomExecutors &) = delete;
CustomExecutors & operator=(const CustomExecutors &) = delete;
using QueryExecutorCreator = std::function<CustomQueryExecutorPtr(const Configuration &, const String &)>;
using QueryExecutorCreator = std::function<QueryExecutorPtr(const Configuration &, const String &)>;
void registerQueryExecutor(const String & query_executor_name, const QueryExecutorCreator & creator);
using CustomMatcherCreator = std::function<CustomExecutorMatcherPtr(const Configuration &, const String &)>;
using CustomMatcherCreator = std::function<QueryMatcherPtr(const Configuration &, const String &)>;
void registerCustomMatcher(const String & matcher_name, const CustomMatcherCreator & creator);
void updateCustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix);
......@@ -48,18 +47,18 @@ private:
std::unordered_map<String, QueryExecutorCreator> query_executor_creators;
std::unordered_map<String, CustomMatcherCreator> custom_matcher_creators;
CustomExecutorPtr createCustomExecutor(const Configuration & config, const String & config_prefix);
CustomExecutorPtr createCustomExecutor(const Configuration & config, const String & config_prefix, const String & name);
void checkCustomMatchersAndQueryExecutors(std::vector<CustomExecutorMatcherPtr> & matchers, std::vector<CustomQueryExecutorPtr> & query_executors);
void checkQueryMatchersAndExecutors(const String & name, std::vector<QueryMatcherPtr> & matchers, std::vector<QueryExecutorPtr> & query_executors);
};
class CustomExecutor
{
public:
bool isQueryParam(const String & param_name) const;
bool canBeParseRequestBody() const;
bool canBeParseRequestBody(HTTPRequest & request, HTMLForm & params) const;
bool isQueryParam(const String & param_name) const;
bool match(Context & context, HTTPRequest & request, HTMLForm & params) const;
......@@ -68,11 +67,11 @@ public:
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams
);
CustomExecutor(const std::vector<CustomExecutorMatcherPtr> & matchers_, const std::vector<CustomQueryExecutorPtr> & query_executors_);
CustomExecutor(const std::vector<QueryMatcherPtr> & matchers_, const std::vector<QueryExecutorPtr> & query_executors_);
private:
std::vector<CustomExecutorMatcherPtr> matchers;
std::vector<CustomQueryExecutorPtr> query_executors;
std::vector<QueryMatcherPtr> matchers;
std::vector<QueryExecutorPtr> query_executors;
};
}
#pragma once
#include <Core/Types.h>
#include <Common/config.h>
#include <Common/HTMLForm.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/CustomQueryExecutors.h>
#include <Poco/Net/HTTPServerRequest.h>
#if USE_RE2_ST
# include <re2_st/re2.h>
#else
# define re2_st re2
#endif
namespace DB
{
class CustomExecutorMatcher
{
public:
virtual ~CustomExecutorMatcher() = default;
virtual bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> &check_executors) const = 0;
virtual bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & params) const = 0;
};
using CustomExecutorMatcherPtr = std::shared_ptr<CustomExecutorMatcher>;
class AlwaysMatchedCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
};
class HTTPMethodCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
HTTPMethodCustomExecutorMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & method_config_key)
{
match_method = Poco::toLower(configuration.getString(method_config_key));
}
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & /*check_executors*/) const override { return true; }
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
{
return Poco::toLower(request.getMethod()) == match_method;
}
private:
String match_method;
};
class HTTPURLCustomExecutorMatcher : public CustomExecutorMatcher
{
public:
HTTPURLCustomExecutorMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & url_config_key)
{
regex_matcher = std::make_unique<re2_st::RE2>(configuration.getString(url_config_key));
}
bool checkQueryExecutors(const std::vector<CustomQueryExecutorPtr> & custom_query_executors) const override
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
if (!checkQueryExecutors(named_capturing_group.first, custom_query_executors))
throw Exception("The param name '" + named_capturing_group.first + "' is uselessed.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return true;
}
bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
{
const String request_uri = request.getURI();
int num_captures = regex_matcher->NumberOfCapturingGroups() + 1;
re2_st::StringPiece matches[num_captures];
re2_st::StringPiece input(request_uri.data(), request_uri.size());
if (regex_matcher->Match(input, 0, request_uri.size(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
const auto & full_match = matches[0];
const char * url_end = request_uri.data() + request_uri.size();
const char * not_matched_begin = request_uri.data() + full_match.size();
if (not_matched_begin != url_end && *not_matched_begin == '/')
++not_matched_begin;
if (not_matched_begin == url_end || *not_matched_begin == '?')
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
{
const auto & capturing_value = matches[named_capturing_group.second];
context.setQueryParameter(named_capturing_group.first, String(capturing_value.data(), capturing_value.size()));
}
return true;
}
}
return false;
}
private:
std::unique_ptr<re2_st::RE2> regex_matcher;
bool checkQueryExecutors(const String & param_name, const std::vector<CustomQueryExecutorPtr> & custom_query_executors) const
{
for (const auto & custom_query_executor : custom_query_executors)
if (custom_query_executor->isQueryParam(param_name))
return true;
return false;
}
};
}
#pragma once
#include <Core/Types.h>
#include <Common/HTMLForm.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTInsertQuery.h>
namespace DB
{
class CustomQueryExecutor
{
public:
virtual ~CustomQueryExecutor() = default;
virtual bool isQueryParam(const String &) const = 0;
virtual bool canBeParseRequestBody(Poco::Net::HTTPServerRequest &, HTMLForm &) const = 0;
virtual void executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const = 0;
};
using CustomQueryExecutorPtr = std::shared_ptr<CustomQueryExecutor>;
class ConstQueryCustomQueryExecutor : public CustomQueryExecutor
{
public:
ConstQueryCustomQueryExecutor(const Poco::Util::AbstractConfiguration & configuration, const String & config_key)
{
execute_query = configuration.getString(config_key, "");
const char * query_begin = execute_query.data();
const char * query_end = execute_query.data() + execute_query.size();
ParserQuery parser(query_end, false);
ASTPtr extract_query_ast = parseQuery(parser, query_begin, query_end, "", 0);
QueryParameterVisitor{query_params_name}.visit(extract_query_ast);
can_be_parse_request_body = !extract_query_ast->as<ASTInsertQuery>();
}
bool isQueryParam(const String & param_name) const override { return query_params_name.count(param_name); }
bool canBeParseRequestBody(Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*form*/) const override { return can_be_parse_request_body; }
void executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & /*input_streams*/, const HTTPOutputStreams & output_streams) const override
{
prepareQueryParams(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
private:
String execute_query;
NameSet query_params_name;
bool can_be_parse_request_body{false};
void prepareQueryParams(Context & context, HTMLForm & params) const
{
for (const auto & param : params)
if (isQueryParam(param.first))
context.setQueryParameter(param.first, param.second);
}
};
class ExtractQueryParamCustomQueryExecutor : public CustomQueryExecutor
{
public:
bool isQueryParam(const String & param_name) const override { return param_name == "query" || startsWith(param_name, "param_"); }
bool canBeParseRequestBody(Poco::Net::HTTPServerRequest & /*request*/, HTMLForm & /*form*/) const override { return false; }
void executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const override
{
const auto & execute_query = prepareQuery(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
ReadBufferPtr temp_query_buf;
if (!startsWith(request.getContentType().data(), "multipart/form-data"))
{
temp_query_buf = execute_query_buf; /// we create a temporary reference for not to be destroyed
execute_query_buf = std::make_unique<ConcatReadBuffer>(*temp_query_buf, *input_streams.in_maybe_internal_compressed);
}
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
private:
String prepareQuery(Context & context, HTMLForm & params) const
{
const static size_t prefix_size = strlen("param_");
std::stringstream query_stream;
for (const auto & param : params)
{
if (param.first == "query")
query_stream << param.second;
else if (startsWith(param.first, "param_"))
context.setQueryParameter(param.first.substr(prefix_size), param.second);
}
query_stream << "\n";
return query_stream.str();
}
};
}
#include <Interpreters/CustomHTTP/DynamicQueryExecutor.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Interpreters/executeQuery.h>
namespace DB
{
bool QueryExecutorDynamic::isQueryParam(const String & param_name) const
{
return param_name == dynamic_param_name || startsWith(param_name, "param_");
}
QueryExecutorDynamic::QueryExecutorDynamic(const Configuration & configuration, const String & config_key)
{
dynamic_param_name = configuration.getString(config_key + "." + "param_name", "query");
}
void QueryExecutorDynamic::executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const
{
ReadBufferPtr temp_query_buf;
const auto & execute_query = prepareQuery(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
if (!startsWith(request.getContentType().data(), "multipart/form-data"))
{
temp_query_buf = execute_query_buf; /// we create a temporary reference for not to be destroyed
execute_query_buf = std::make_unique<ConcatReadBuffer>(*temp_query_buf, *input_streams.in_maybe_internal_compressed);
}
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false,
context, [&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
String QueryExecutorDynamic::prepareQuery(Context & context, HTMLForm & params) const
{
const static size_t prefix_size = strlen("param_");
WriteBufferFromOwnString query_buffer;
for (const auto & param : params)
{
if (param.first == dynamic_param_name)
writeString(param.second, query_buffer);
else if (startsWith(param.first, "param_"))
context.setQueryParameter(param.first.substr(prefix_size), param.second);
}
if (query_buffer.offset())
writeString("\n", query_buffer);
return query_buffer.str();
}
}
#pragma once
#include <Core/Types.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Interpreters/CustomHTTP/QueryExecutorAndMatcher.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
class QueryExecutorDynamic : public QueryExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
using Configuration = Poco::Util::AbstractConfiguration;
bool canBeParseRequestBody() const override { return false; }
bool isQueryParam(const String & param_name) const override;
QueryExecutorDynamic(const Configuration & configuration, const String & config_key);
void executeQueryImpl(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const override;
private:
String dynamic_param_name{"query"};
String prepareQuery(Context & context, HTMLForm & params) const;
};
}
......@@ -9,44 +9,81 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_HTTP_PARAM;
}
class ExtractorContextChange
{
public:
ExtractorContextChange(Context & context_, const CustomExecutorPtr & executor_) : context(context_), executor(executor_) {}
void extract(Poco::Net::HTTPServerRequest & request, HTMLForm & params)
static const NameSet & getReservedParamNames()
{
Names reserved_param_suffixes;
static const NameSet reserved_param_names{
"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace", "buffer_size", "wait_end_of_query",
"session_id", "session_timeout", "session_check"};
"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"
};
return reserved_param_names;
}
static std::function<bool(const String &)> reservedParamSuffixesFilter(bool reserved)
{
if (!reserved)
return [&](const String &) { return false; };
auto param_could_be_skipped = [&] (const String & name)
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
return [&](const String & param_name)
{
if (reserved_param_names.count(name))
if (endsWith(param_name, "_format"))
return true;
else if (endsWith(param_name, "_types"))
return true;
else if (endsWith(param_name, "_structure"))
return true;
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
return false;
};
}
void extract(Poco::Net::HTTPServerRequest & request, HTMLForm & params)
{
bool is_multipart_data = startsWith(request.getContentType().data(), "multipart/form-data");
/// Settings can be overridden in the query.
/// Some parameters (database, default_format, everything used in the code above) do not
/// belong to the Settings class.
becomeReadonlyIfNeed(request);
changeSettingsFromParams(params, reservedParamSuffixesFilter(is_multipart_data));
if (is_multipart_data || executor->canBeParseRequestBody())
{
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
/// We use the `Post Request Body Settings` to override the `Qeruy String Param settings`
if (executor->canBeParseRequestBody())
changeSettingsFromParams(params, reservedParamSuffixesFilter(is_multipart_data));
}
}
private:
Context & context;
const CustomExecutorPtr & executor;
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// 'readonly' setting values mean:
/// readonly = 0 - any query is allowed, client can change any setting.
/// readonly = 1 - only readonly queries are allowed, client can't change settings.
/// readonly = 2 - only readonly queries are allowed, client can change any setting except 'readonly'.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
/// Only readonly queries are allowed for HTTP GET requests.
/// In theory if initially readonly = 0, the client can change any setting and then set readonly
/// to some other value.
/// Only readonly queries are allowed for HTTP GET requests.
void becomeReadonlyIfNeed(HTTPRequest & request)
{
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
{
Settings & settings = context.getSettingsRef();
......@@ -54,45 +91,33 @@ public:
if (settings.readonly == 0)
settings.readonly = 2;
}
}
bool has_multipart = startsWith(request.getContentType().data(), "multipart/form-data");
if (has_multipart || executor->canBeParseRequestBody(request, params))
{
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
if (has_multipart)
{
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.reserve(3);
/// It is a bug and ambiguity with `date_time_input_format` and `low_cardinality_allow_in_native_format` formats/settings.
reserved_param_suffixes.emplace_back("_format");
reserved_param_suffixes.emplace_back("_types");
reserved_param_suffixes.emplace_back("_structure");
}
}
void changeSettingsFromParams(HTMLForm & params, const std::function<bool(const String &)> & reserved_param_suffixes)
{
SettingsChanges settings_changes;
for (const auto & [key, value] : params)
const auto & reserved_param_names = getReservedParamNames();
for (const auto & [name, value] : params)
{
if (key == "database")
if (name == "database")
context.setCurrentDatabase(value);
else if (key == "default_format")
else if (name == "default_format")
context.setDefaultFormat(value);
else if (!param_could_be_skipped(key) && !executor->isQueryParam(key))
settings_changes.push_back({key, value}); /// All other query parameters are treated as settings.
else if (!reserved_param_names.count(name) && !reserved_param_suffixes(name))
{
if (Settings::findIndex(name) != Settings::npos)
settings_changes.push_back({name, value});
else if (!executor->isQueryParam(name))
throw Exception("Unknown HTTP param name: '" + name + "'", ErrorCodes::UNKNOWN_HTTP_PARAM);
}
}
/// For external data we also want settings
context.checkSettingsConstraints(settings_changes);
context.applySettingsChanges(settings_changes);
}
private:
Context & context;
const CustomExecutorPtr & executor;
};
}
#include <Common/config.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <IO/BrotliReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <Compression/CompressedReadBuffer.h>
......
......@@ -52,16 +52,15 @@ namespace
}
}
HTTPOutputStreams::HTTPOutputStreams(HTTPServerRequest & request, HTTPServerResponse & response, bool internal_compress, size_t keep_alive_timeout)
: out(createResponseOut(request, response, keep_alive_timeout))
HTTPOutputStreams::HTTPOutputStreams(HTTPResponseBufferPtr & raw_out, bool internal_compress)
: out(raw_out)
, out_maybe_compressed(createMaybeCompressionOut(internal_compress, out))
, out_maybe_delayed_and_compressed(out_maybe_compressed)
{
}
HTTPOutputStreams::HTTPOutputStreams(
Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout)
: out(createResponseOut(request, response, keep_alive_timeout))
HTTPOutputStreams::HTTPOutputStreams(HTTPResponseBufferPtr & raw_out, Context & context, HTTPServerRequest & request, HTMLForm & form)
: out(raw_out)
, out_maybe_compressed(createMaybeCompressionOut(form.getParsed<bool>("compress", false), out))
, out_maybe_delayed_and_compressed(createMaybeDelayedAndCompressionOut(context, form, out_maybe_compressed))
{
......@@ -90,36 +89,10 @@ HTTPOutputStreams::HTTPOutputStreams(
}
}
HTTPResponseBufferPtr HTTPOutputStreams::createResponseOut(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive)
{
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
if (!http_response_compression_methods.empty())
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("gzip"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Gzip, DBMS_DEFAULT_BUFFER_SIZE, response.sent());
else if (std::string::npos != http_response_compression_methods.find("deflate"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Zlib, DBMS_DEFAULT_BUFFER_SIZE, response.sent());
#if USE_BROTLI
else if (http_response_compression_methods == "br")
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Brotli, DBMS_DEFAULT_BUFFER_SIZE, response.sent());
#endif
}
return std::make_shared<WriteBufferFromHTTPServerResponse>(request, response, keep_alive, false, CompressionMethod{}, DBMS_DEFAULT_BUFFER_SIZE, response.sent());
}
WriteBufferPtr HTTPOutputStreams::createMaybeCompressionOut(bool compression, HTTPResponseBufferPtr & out_)
{
/// Client can pass a 'compress' flag in the query string. In this case the query result is
/// compressed using internal algorithm. This is not reflected in HTTP headers.
// bool internal_compression = form.getParsed<bool>("compress", false);
return compression ? std::make_shared<CompressedWriteBuffer>(*out_) : WriteBufferPtr(out_);
}
......@@ -178,13 +151,18 @@ WriteBufferPtr HTTPOutputStreams::createMaybeDelayedAndCompressionOut(Context &
HTTPOutputStreams::~HTTPOutputStreams()
{
/// Destroy CascadeBuffer to actualize buffers' positions and reset extra references
/// This could be a broken HTTP Request
/// Because it does not call finalize or writes some data to output stream after call finalize
/// In this case we need to clean up its broken state to ensure that they are not sent to the client
/// For delayed stream, we destory CascadeBuffer and without sending any data to client.
if (out_maybe_delayed_and_compressed != out_maybe_compressed)
out_maybe_delayed_and_compressed.reset();
/// If buffer has data, and that data wasn't sent yet, then no need to send that data
if (out->count() == out->offset())
{
/// If buffer has data and server never sends data to client
/// no need to send that data
out_maybe_compressed->position() = out_maybe_compressed->buffer().begin();
out->position() = out->buffer().begin();
}
......
......@@ -34,15 +34,13 @@ struct HTTPOutputStreams
void finalize() const;
WriteBufferPtr createMaybeDelayedAndCompressionOut(Context &context, HTMLForm &form, WriteBufferPtr &out_);
WriteBufferPtr createMaybeDelayedAndCompressionOut(Context & context, HTMLForm & form, WriteBufferPtr & out_);
WriteBufferPtr createMaybeCompressionOut(bool compression, std::shared_ptr<WriteBufferFromHTTPServerResponse> & out_);
HTTPResponseBufferPtr createResponseOut(HTTPServerRequest & request, HTTPServerResponse & response, size_t keep_alive);
HTTPOutputStreams(HTTPResponseBufferPtr & raw_out, bool internal_compress);
HTTPOutputStreams(HTTPServerRequest & request, HTTPServerResponse & response, bool internal_compress, size_t keep_alive_timeout);
HTTPOutputStreams(Context & context, HTTPServerRequest & request, HTTPServerResponse & response, HTMLForm & form, size_t keep_alive_timeout);
HTTPOutputStreams(HTTPResponseBufferPtr & raw_out, Context & context, HTTPServerRequest & request, HTMLForm & form);
};
using HTTPOutputStreamsPtr = std::unique_ptr<HTTPOutputStreams>;
......
#pragma once
#include <Core/Types.h>
#include "QueryExecutorAndMatcher.h"
namespace DB
{
class MethodQueryMatcher : public QueryMatcher
{
public:
using Config = Poco::Util::AbstractConfiguration;
MethodQueryMatcher(const Config & configuration, const String & config_key)
: method(Poco::toLower(configuration.getString(config_key)))
{
}
bool match(Context & /*context*/, Poco::Net::HTTPServerRequest & request, HTMLForm & /*params*/) const override
{
return Poco::toLower(request.getMethod()) == method;
}
bool checkQueryExecutors(const std::vector<QueryExecutorPtr> & /*check_executors*/) const override { return true; }
private:
String method;
};
}
#pragma once
#include <Core/Types.h>
#include <Common/config.h>
#include <Common/HTMLForm.h>
#include <Interpreters/Context.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Poco/Net/HTTPServerRequest.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPILE_REGEXP;
}
class QueryExecutor
{
public:
virtual ~QueryExecutor() = default;
virtual bool isQueryParam(const String &) const = 0;
virtual bool canBeParseRequestBody() const = 0;
virtual void executeQueryImpl(
Context & context, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const = 0;
};
using QueryExecutorPtr = std::shared_ptr<QueryExecutor>;
class QueryMatcher
{
public:
virtual ~QueryMatcher() = default;
virtual bool checkQueryExecutors(const std::vector<QueryExecutorPtr> &check_executors) const = 0;
virtual bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm & params) const = 0;
};
using QueryMatcherPtr = std::shared_ptr<QueryMatcher>;
}
#include <Interpreters/CustomHTTP/URLQueryMatcher.h>
#include "URLQueryMatcher.h"
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPILE_REGEXP;
extern const int UNDEFINED_CUSTOM_EXECUTOR_PARAM;
}
URLQueryMatcher::URLQueryMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & config_key)
{
const auto & regex_str = configuration.getString(config_key);
regex_matcher = std::make_unique<re2_st::RE2>(regex_str);
if (!regex_matcher->ok())
throw Exception("cannot compile re2: " + regex_str + ", error: " + regex_matcher->error() +
". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
}
bool checkQueryOneQueryParam(const String & param_name, const std::vector<QueryExecutorPtr> & custom_query_executors)
{
for (const auto & custom_query_executor : custom_query_executors)
if (custom_query_executor->isQueryParam(param_name))
return true;
return false;
}
bool URLQueryMatcher::checkQueryExecutors(const std::vector<QueryExecutorPtr> & custom_query_executors) const
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
if (!checkQueryOneQueryParam(named_capturing_group.first, custom_query_executors))
throw Exception("The param name '" + named_capturing_group.first + "' is not defined in the QueryExecutor.",
ErrorCodes::UNDEFINED_CUSTOM_EXECUTOR_PARAM);
return true;
}
bool URLQueryMatcher::match(Context & context, Poco::Net::HTTPServerRequest &request, HTMLForm &) const
{
const String request_uri = request.getURI();
int num_captures = regex_matcher->NumberOfCapturingGroups() + 1;
re2_st::StringPiece matches[num_captures];
re2_st::StringPiece input(request_uri.data(), request_uri.size());
if (regex_matcher->Match(input, 0, request_uri.size(), re2_st::RE2::Anchor::UNANCHORED, matches, num_captures))
{
const auto & full_match = matches[0];
const char * url_end = request_uri.data() + request_uri.size();
const char * not_matched_begin = request_uri.data() + full_match.size();
if (not_matched_begin != url_end && *not_matched_begin == '/')
++not_matched_begin;
if (not_matched_begin == url_end || *not_matched_begin == '?')
{
for (const auto & named_capturing_group : regex_matcher->NamedCapturingGroups())
{
const auto & capturing_value = matches[named_capturing_group.second];
context.setQueryParameter(named_capturing_group.first, String(capturing_value.data(), capturing_value.size()));
}
return true;
}
}
return false;
}
}
#pragma once
#include <Core/Types.h>
#include <Interpreters/CustomHTTP/QueryExecutorAndMatcher.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#if USE_RE2_ST
# include <re2_st/re2.h>
#else
# define re2_st re2
#endif
namespace DB
{
class URLQueryMatcher : public QueryMatcher
{
public:
URLQueryMatcher(const Poco::Util::AbstractConfiguration & configuration, const String & config_key);
bool match(Context & context, Poco::Net::HTTPServerRequest & request, HTMLForm &) const override;
bool checkQueryExecutors(const std::vector<QueryExecutorPtr> & custom_query_executors) const override;
private:
std::unique_ptr<re2_st::RE2> regex_matcher;
};
}
......@@ -41,13 +41,13 @@
</test_for_url_match>
<test_for_method_match>
<URL>PUT</URL>
<method>PUT</method>
<query>SELECT 'Matched test_for_method_match'</query>
</test_for_method_match>
<test_for_multiple_match>
<URL>/test_for_multiple_match</URL>
<method>GET</method>
<URL>/test_for_multiple_match</URL>
<query>SELECT 'Matched test_for_multiple_match'</query>
</test_for_multiple_match>
</custom_http>
......
......@@ -90,6 +90,7 @@ namespace ErrorCodes
extern const int INVALID_SESSION_TIMEOUT;
extern const int HTTP_LENGTH_REQUIRED;
extern const int UNKNOW_QUERY_EXECUTOR;
}
......@@ -117,7 +118,8 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
exception_code == ErrorCodes::INCORRECT_DATA ||
exception_code == ErrorCodes::TYPE_MISMATCH)
return HTTPResponse::HTTP_BAD_REQUEST;
else if (exception_code == ErrorCodes::UNKNOWN_TABLE ||
else if (exception_code == ErrorCodes::UNKNOW_QUERY_EXECUTOR ||
exception_code == ErrorCodes::UNKNOWN_TABLE ||
exception_code == ErrorCodes::UNKNOWN_FUNCTION ||
exception_code == ErrorCodes::UNKNOWN_IDENTIFIER ||
exception_code == ErrorCodes::UNKNOWN_TYPE ||
......@@ -184,18 +186,22 @@ HTTPHandler::SessionContextHolder::~SessionContextHolder()
}
HTTPHandler::SessionContextHolder::SessionContextHolder(IServer & accepted_server, HTMLForm & params)
HTTPHandler::SessionContextHolder::SessionContextHolder(Context & query_context_, HTTPRequest & request, HTMLForm & params)
: query_context(query_context_)
{
session_id = params.get("session_id", "");
context = std::make_unique<Context>(accepted_server.context());
authentication(request, params);
if (!session_id.empty())
{
session_timeout = parseSessionTimeout(accepted_server.config(), params);
session_context = context->acquireSession(session_id, session_timeout, params.check<String>("session_check", "1"));
session_id = params.get("session_id", "");
context = std::make_unique<Context>(*session_context);
context->setSessionContext(*session_context);
if (!session_id.empty())
{
session_timeout = parseSessionTimeout(query_context.getConfigRef(), params);
session_context = query_context.acquireSession(session_id, session_timeout, params.check<String>("session_check", "1"));
query_context = *session_context;
query_context.setSessionContext(*session_context);
}
}
}
......@@ -237,11 +243,11 @@ void HTTPHandler::SessionContextHolder::authentication(HTTPServerRequest & reque
}
std::string query_id = params.get("query_id", "");
context->setUser(user, password, request.clientAddress(), quota_key);
context->setCurrentQueryId(query_id);
query_context.setUser(user, password, request.clientAddress(), quota_key);
query_context.setCurrentQueryId(query_id);
}
void HTTPHandler::processQuery(Context & context, HTTPRequest & request, HTMLForm & params, HTTPResponse & response)
void HTTPHandler::processQuery(Context & context, HTTPRequest & request, HTMLForm & params, HTTPResponse & response, HTTPResponseBufferPtr & response_out)
{
const auto & name_with_custom_executor = context.getCustomExecutor(request, params);
LOG_TRACE(log, "Using '" << name_with_custom_executor.first << "' CustomExecutor to execute URI: " << request.getURI());
......@@ -250,12 +256,13 @@ void HTTPHandler::processQuery(Context & context, HTTPRequest & request, HTMLFor
ExtractorContextChange{context, name_with_custom_executor.second}.extract(request, params);
HTTPInputStreams input_streams{context, request, params};
HTTPOutputStreams output_streams = HTTPOutputStreams(context, request, response, params, getKeepAliveTimeout());
HTTPOutputStreams output_streams = HTTPOutputStreams(response_out, context, request, params);
name_with_custom_executor.second->executeQuery(context, request, response, params, input_streams, output_streams);
}
void HTTPHandler::trySendExceptionToClient(
const std::string & message, int exception_code, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, bool compression)
const std::string & message, int exception_code, HTTPRequest & request,
HTTPResponse & response, HTTPResponseBufferPtr response_out, bool compression)
{
try
{
......@@ -265,22 +272,25 @@ void HTTPHandler::trySendExceptionToClient(
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && response.getKeepAlive()
&& !request.stream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
{
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
}
if (exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD
|| exception_code == ErrorCodes::REQUIRED_PASSWORD || exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED)
if (exception_code == ErrorCodes::UNKNOWN_USER || exception_code == ErrorCodes::WRONG_PASSWORD ||
exception_code == ErrorCodes::REQUIRED_PASSWORD)
{
if (exception_code == ErrorCodes::HTTP_LENGTH_REQUIRED)
response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
else
response.requireAuthentication("ClickHouse server HTTP API");
response.send() << message << std::endl;
response.requireAuthentication("ClickHouse server HTTP API");
}
else
{
response.setStatusAndReason(exceptionCodeToHTTPStatus(exception_code));
HTTPOutputStreams output_streams(request, response, compression, getKeepAliveTimeout());
}
if (!response_out && !response.sent())
response.send() << message << std::endl;
else
{
HTTPOutputStreams output_streams(response_out, compression);
writeString(message, *output_streams.out_maybe_compressed);
writeChar('\n', *output_streams.out_maybe_compressed);
......@@ -300,10 +310,12 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
ThreadStatus thread_status;
/// In case of exception, send stack trace to client.
HTTPResponseBufferPtr response_out;
bool with_stacktrace = false, internal_compression = false;
try
{
response_out = createResponseOut(request, response);
response.set("Content-Type", "text/plain; charset=UTF-8");
response.set("X-ClickHouse-Server-Display-Name", server_display_name);
......@@ -316,15 +328,22 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
internal_compression = params.getParsed<bool>("compress", false);
/// Workaround. Poco does not detect 411 Length Required case.
<<<<<<< HEAD:programs/server/HTTPHandler.cpp
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && !request.hasContentLength())
throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED);
=======
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& !request.getChunkedTransferEncoding()
&& !request.hasContentLength())
throw Exception("There is neither Transfer-Encoding header nor Content-Length header", ErrorCodes::HTTP_LENGTH_REQUIRED);
>>>>>>> ISSUES-5436 fix build failure & fix test failure:dbms/programs/server/HTTPHandler.cpp
{
SessionContextHolder holder{server, params};
CurrentThread::QueryScope query_scope(*holder.context);
Context query_context = server.context();
CurrentThread::QueryScope query_scope(query_context);
holder.authentication(request, params);
processQuery(*holder.context, request, params, response);
SessionContextHolder holder{query_context, request, params};
processQuery(holder.query_context, request, params, response, response_out);
LOG_INFO(log, "Done processing query");
}
}
......@@ -337,8 +356,34 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
*/
int exception_code = getCurrentExceptionCode();
std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true);
trySendExceptionToClient(exception_message, exception_code, request, response, internal_compression);
trySendExceptionToClient(exception_message, exception_code, request, response, response_out, internal_compression);
}
}
HTTPResponseBufferPtr HTTPHandler::createResponseOut(HTTPServerRequest & request, HTTPServerResponse & response)
{
size_t keep_alive = server.config().getUInt("keep_alive_timeout", 10);
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
if (!http_response_compression_methods.empty())
{
/// Both gzip and deflate are supported. If the client supports both, gzip is preferred.
/// NOTE parsing of the list of methods is slightly incorrect.
if (std::string::npos != http_response_compression_methods.find("gzip"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Gzip, DBMS_DEFAULT_BUFFER_SIZE);
else if (std::string::npos != http_response_compression_methods.find("deflate"))
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Zlib, DBMS_DEFAULT_BUFFER_SIZE);
#if USE_BROTLI
else if (http_response_compression_methods == "br")
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive, true, CompressionMethod::Brotli, DBMS_DEFAULT_BUFFER_SIZE);
#endif
}
return std::make_shared<WriteBufferFromHTTPServerResponse>(request, response, keep_alive, false, CompressionMethod{}, DBMS_DEFAULT_BUFFER_SIZE);
}
}
......@@ -38,12 +38,12 @@ private:
{
~SessionContextHolder();
SessionContextHolder(IServer & accepted_server, HTMLForm & params);
void authentication(HTTPServerRequest & request, HTMLForm & params);
SessionContextHolder(Context & query_context_, HTTPRequest & request, HTMLForm & params);
String session_id;
std::unique_ptr<Context> context = nullptr;
Context & query_context;
std::shared_ptr<Context> session_context = nullptr;
std::chrono::steady_clock::duration session_timeout;
};
......@@ -58,9 +58,13 @@ private:
size_t getKeepAliveTimeout() { return server.config().getUInt("keep_alive_timeout", 10); }
void processQuery(Context & context, HTTPRequest & request, HTMLForm & params, HTTPResponse & response);
HTTPResponseBufferPtr createResponseOut(HTTPServerRequest & request, HTTPServerResponse & response);
void processQuery(Context & context, HTTPRequest & request, HTMLForm & params, HTTPResponse & response, HTTPResponseBufferPtr & response_out);
void trySendExceptionToClient(const std::string & message, int exception_code, HTTPRequest & request, HTTPResponse & response, bool compression);
void trySendExceptionToClient(
const std::string & message, int exception_code, HTTPRequest & request,
HTTPResponse & response, HTTPResponseBufferPtr response_out, bool compression);
};
......
......@@ -525,11 +525,18 @@
</query_masking_rules>
-->
<custom_http>
<default>
<dynamic_query />
<always_matched />
</default>
</custom_http>
<!-- Uncomment to use custom http.
<custom_http>
<test_select_single>
<url><![cdata[/test/(?p<test_url_value>\w+)]]></url>
<query>select {test_url_value:uint64}, {test_query_param_value:string}, {test_query_post_value:string}</query>
<method>GET</method>
<URL><![CDATA[/test/(?P<test_url_value>\w+)]]></URL>
<query><![CDATA[SELECT {test_url_value:UInt64}, {test_query_param_value:String}, {test_query_post_value:String}]]></query>
</test_select_single>
</custom_http>
-->
......
......@@ -31,6 +31,11 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
response.set("Access-Control-Allow-Origin", "*");
setResponseDefaultHeaders(response, keep_alive_timeout);
#if defined(POCO_CLICKHOUSE_PATCH)
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
#endif
}
}
......@@ -91,12 +96,19 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders()
}
void WriteBufferFromHTTPServerResponse::choiceSendEncode()
void WriteBufferFromHTTPServerResponse::nextImpl()
{
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
{
if (!compress)
std::lock_guard lock(mutex);
startSendHeaders();
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
{
if (compress)
{
if (compression_method == CompressionMethod::Gzip)
{
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: gzip\r\n";
#else
......@@ -112,8 +124,8 @@ void WriteBufferFromHTTPServerResponse::choiceSendEncode()
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: deflate\r\n";
#else
/// Newline autosent by response.send()
response_body_ostr = &(response.send());
response.set("Content-Encoding", "gzip");
response_body_ostr = &(response.send());
#endif
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
......@@ -123,9 +135,7 @@ void WriteBufferFromHTTPServerResponse::choiceSendEncode()
else if (compression_method == CompressionMethod::Brotli)
{
#if defined(POCO_CLICKHOUSE_PATCH)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
if (headers_started_sending && !headers_finished_sending)
*response_header_ostr << "Content-Encoding: " << encoding_type << "\r\n";
*response_header_ostr << "Content-Encoding: deflate\r\n";
#else
response.set("Content-Encoding", content_encoding_name);
#endif
......@@ -134,37 +144,23 @@ void WriteBufferFromHTTPServerResponse::choiceSendEncode()
/// This may result in an extra empty line in the response body
response_body_ostr = &(response.send());
#endif
};
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
}
#if USE_BROTLI
else if (compression_method == CompressionMethod::Brotli)
else
{
set_encoding_type("br");
out_raw.emplace(*response_body_ostr);
brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
out = &*brotli_buf;
}
#if !defined(POCO_CLICKHOUSE_PATCH)
response_body_ostr = &(response.send());
#endif
else
throw Exception("Logical error: unknown compression method passed to WriteBufferFromHTTPServerResponse",
ErrorCodes::LOGICAL_ERROR);
/// Use memory allocated for the outer buffer in the buffer pointed to by out. This avoids extra allocation and copy.
}
}
}
void WriteBufferFromHTTPServerResponse::nextImpl()
{
{
std::lock_guard lock(mutex);
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
}
}
startSendHeaders();
choiceSendEncode();
finishSendHeaders();
}
if (out)
......@@ -182,16 +178,13 @@ WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
unsigned keep_alive_timeout_,
bool compress_,
CompressionMethod compression_method_,
size_t size,
bool finish_send_headers_)
size_t size)
: BufferWithOwnMemory<WriteBuffer>(size)
, request(request_)
, response(response_)
, keep_alive_timeout(keep_alive_timeout_)
, compress(compress_)
, compression_method(compression_method_)
, headers_started_sending(finish_send_headers_)
, headers_finished_sending(finish_send_headers_)
{
}
......@@ -235,6 +228,7 @@ void WriteBufferFromHTTPServerResponse::finalize()
}
}
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
try
......
......@@ -87,8 +87,6 @@ private:
/// This method finish headers with \r\n, allowing to start to send body.
void finishSendHeaders();
void choiceSendEncode();
void nextImpl() override;
public:
......@@ -98,8 +96,7 @@ public:
unsigned keep_alive_timeout_,
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
CompressionMethod compression_method_ = CompressionMethod::Gzip,
size_t size = DBMS_DEFAULT_BUFFER_SIZE,
bool finish_send_headers_ = false);
size_t size = DBMS_DEFAULT_BUFFER_SIZE);
/// Writes progess in repeating HTTP headers.
void onProgress(const Progress & progress);
......
......@@ -57,9 +57,7 @@ void ReplaceQueryParameterVisitor::visitQueryParameter(ASTPtr & ast)
ReadBufferFromString read_buffer{value};
FormatSettings format_settings;
skipWhitespaceIfAny(read_buffer); /// Skip white space on both sides
data_type->deserializeAsWholeText(temp_column, read_buffer, format_settings);
skipWhitespaceIfAny(read_buffer); /// Skip white space on both sides
if (!read_buffer.eof())
throw Exception("Value " + value + " cannot be parsed as " + type_name + " for query parameter '" + ast_param.name + "'"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册