提交 fd007571 编写于 作者: Z zhang2014

ISSUES-5436 support custom http [part 4]

上级 60abff33
#include <Interpreters/CustomHTTP/CustomExecutor.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include <Interpreters/CustomHTTP/CustomExecutorDefault.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}
bool CustomExecutor::match(HTTPRequest & request, HTMLForm & params) const
{
for (const auto & matcher : matchers)
{
if (!matcher->match(request, params))
return false;
}
return true;
}
bool CustomExecutor::isQueryParam(const String & param_name) const
{
for (const auto & query_executor : query_executors)
{
if (!query_executor->isQueryParam(param_name))
return false;
}
return true;
}
bool CustomExecutor::canBeParseRequestBody(HTTPRequest & request, HTMLForm & params) const
{
for (const auto & query_executor : query_executors)
{
if (!query_executor->canBeParseRequestBody(request, params))
return false;
}
return true;
}
void CustomExecutor::executeQuery(
Context & context, HTTPRequest & request, HTTPResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams)
{
for (const auto & query_executor : query_executors)
query_executor->executeQueryImpl(context, request, response, params, input_streams, output_streams);
/// Send HTTP headers with code 200 if no exception happened and the data is still not sent to the client.
output_streams.finalize();
}
CustomExecutor::CustomExecutor(
const std::vector<CustomMatcherPtr> & matchers_, const std::vector<CustomQueryExecutorPtr> & query_executors_)
: matchers(matchers_), query_executors(query_executors_)
{
}
CustomExecutors::CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix)
{
updateCustomExecutors(config, settings, config_prefix);
}
void CustomExecutors::updateCustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix)
{
Configuration::Keys custom_executors_keys;
config.keys(config_prefix, custom_executors_keys);
std::unordered_map<String, CustomExecutorPtr> new_custom_executors;
for (const auto & custom_executor_key : custom_executors_keys)
{
if (custom_executor_key == "Default")
throw Exception("CustomExecutor cannot be 'Default'.", ErrorCodes::SYNTAX_ERROR);
else if (custom_executor_key.find('.') != String::npos)
throw Exception("CustomExecutor names with dots are not supported: '" + custom_executor_key + "'", ErrorCodes::SYNTAX_ERROR);
new_custom_executors[custom_executor_key] = createCustomExecutor(config, settings, config_prefix + "." + custom_executor_key);
}
new_custom_executors["Default"] = CustomExecutorDefault::createDefaultCustomExecutor();
std::unique_lock<std::shared_mutex> lock(rwlock);
custom_executors = new_custom_executors;
}
CustomExecutorPtr CustomExecutors::createCustomExecutor(const CustomExecutors::Configuration & config, const Settings & /*settings*/, const String & config_prefix)
{
Configuration::Keys matchers_or_query_executors_type;
config.keys(config_prefix, matchers_or_query_executors_type);
for (const auto & matcher_or_query_executor_type : matchers_or_query_executors_type)
{
if (matcher_or_query_executor_type.find('.') != String::npos)
throw Exception(
"CustomMatcher or CustomQueryExecutor names with dots are not supported: '" + matcher_or_query_executor_type + "'",
ErrorCodes::SYNTAX_ERROR);
// throw Exception("", ErrorCodes::NOT_IMPLEMENTED);
// new_custom_executors[matcher_or_query_executor_type] = createCustomExecutor(config, settings, config_prefix + "." + matcher_or_query_executor_type);
}
return DB::CustomExecutorPtr();
}
std::pair<String, CustomExecutorPtr> CustomExecutors::getCustomExecutor(Poco::Net::HTTPServerRequest & request, HTMLForm & params) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
for (const auto & custom_executor : custom_executors)
if (custom_executor.second->match(request, params))
return custom_executor;
throw Exception("LOGICAL_ERROR not found custom executor.", ErrorCodes::LOGICAL_ERROR);
}
}
#pragma once
#include <shared_mutex>
#include <Core/Types.h>
#include <IO/ReadBuffer.h>
#include <Core/Settings.h>
#include <Common/HTMLForm.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBuffer.h>
namespace DB
{
class Context;
class CustomExecutor;
struct HTTPInputStreams;
struct HTTPOutputStreams;
using HTTPRequest = Poco::Net::HTTPServerRequest;
using HTTPResponse = Poco::Net::HTTPServerResponse;
using CustomExecutorPtr = std::shared_ptr<CustomExecutor>;
class CustomExecutors
{
public:
using Configuration = Poco::Util::AbstractConfiguration;
CustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix = "CustomHTTP");
CustomExecutors(const CustomExecutors &) = delete;
CustomExecutors & operator=(const CustomExecutors &) = delete;
void updateCustomExecutors(const Configuration & config, const Settings & settings, const String & config_prefix);
std::pair<String, CustomExecutorPtr> getCustomExecutor(Poco::Net::HTTPServerRequest & request, HTMLForm & params) const;
private:
mutable std::shared_mutex rwlock;
std::unordered_map<String, CustomExecutorPtr> custom_executors;
CustomExecutorPtr createCustomExecutor(const Configuration & config, const Settings & settings, const String & config_prefix);
};
class CustomExecutor
{
public:
using HTTPServerRequest = Poco::Net::HTTPServerRequest;
using HTTPServerResponse = Poco::Net::HTTPServerResponse;
bool isQueryParam(const String & param_name) const;
virtual ~CustomExecutor() = default;
bool match(HTTPRequest & request, HTMLForm & params) const;
virtual bool isQueryParam(const String & param_name) const = 0;
bool canBeParseRequestBody(HTTPRequest & request, HTMLForm & params) const;
virtual bool match(HTTPServerRequest & request, HTMLForm & params) const = 0;
void executeQuery(
Context & context, HTTPRequest & request, HTTPResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams
);
virtual bool canBeParseRequestBody(HTTPServerRequest & request, HTMLForm & params) = 0;
public:
class CustomMatcher
{
public:
virtual ~CustomMatcher() = default;
using QueryExecutor = std::function<void(HTTPOutputStreams &, HTTPServerResponse &)>;
using QueryExecutors = std::vector<QueryExecutor>;
virtual QueryExecutors getQueryExecutor(Context & context, HTTPServerRequest & request, HTMLForm & params, const HTTPInputStreams & input_streams) const = 0;
};
virtual bool match(HTTPRequest & request, HTMLForm & params) const = 0;
};
using CustomExecutorPtr = std::shared_ptr<CustomExecutor>;
class CustomQueryExecutor
{
public:
virtual ~CustomQueryExecutor() = default;
virtual bool isQueryParam(const String &) const = 0;
virtual bool canBeParseRequestBody(HTTPRequest &, HTMLForm &) const = 0;
virtual void executeQueryImpl(
Context & context, HTTPRequest & request, HTTPResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const = 0;
};
public:
using CustomMatcherPtr = std::shared_ptr<CustomMatcher>;
using CustomQueryExecutorPtr = std::shared_ptr<CustomQueryExecutor>;
CustomExecutor(const std::vector<CustomMatcherPtr> & matchers_, const std::vector<CustomQueryExecutorPtr> & query_executors_);
private:
std::vector<CustomMatcherPtr> matchers;
std::vector<CustomQueryExecutorPtr> query_executors;
};
}
......@@ -5,45 +5,51 @@
#include <IO/ReadBufferFromString.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/CustomHTTP/HTTPInputStreams.h>
#include <Interpreters/CustomHTTP/HTTPOutputStreams.h>
#include "HTTPInputStreams.h"
namespace DB
{
class CustomExecutorDefault : public CustomExecutor
class CustomExecutorDefault : public CustomExecutor::CustomMatcher, public CustomExecutor::CustomQueryExecutor
{
public:
bool match(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return true; }
bool canBeParseRequestBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) override { return false; }
bool canBeParseRequestBody(HTTPServerRequest & /*request*/, HTMLForm & /*params*/) const override { return false; }
bool isQueryParam(const String & param_name) const override
bool isQueryParam(const String & param_name) const override { return param_name == "query" || startsWith(param_name, "param_"); }
void executeQueryImpl(
Context & context, HTTPRequest & request, HTTPResponse & response,
HTMLForm & params, const HTTPInputStreams & input_streams, const HTTPOutputStreams & output_streams) const override
{
return param_name == "query" || startsWith(param_name, "param_");
const auto & execute_query = prepareQuery(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
ReadBufferPtr temp_query_buf;
if (!startsWith(request.getContentType().data(), "multipart/form-data"))
{
temp_query_buf = execute_query_buf; /// we create a temporary reference for not to be destroyed
execute_query_buf = std::make_unique<ConcatReadBuffer>(*temp_query_buf, *input_streams.in_maybe_internal_compressed);
}
executeQuery(
*execute_query_buf, *output_streams.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}
QueryExecutors getQueryExecutor(Context & context, HTTPServerRequest & request, HTMLForm & params, const HTTPInputStreams & input_streams) const override
static CustomExecutorPtr createDefaultCustomExecutor()
{
return {[&](HTTPOutputStreams & output, HTTPServerResponse & response)
{
const auto & execute_query = prepareQuery(context, params);
ReadBufferPtr execute_query_buf = std::make_shared<ReadBufferFromString>(execute_query);
ReadBufferPtr temp_query_buf;
if (!startsWith(request.getContentType().data(), "multipart/form-data"))
{
temp_query_buf = execute_query_buf; /// we create a temporary reference for not to be destroyed
execute_query_buf = std::make_unique<ConcatReadBuffer>(*temp_query_buf, *input_streams.in_maybe_internal_compressed);
}
executeQuery(
*execute_query_buf, *output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
[&response] (const String & current_query_id) { response.add("X-ClickHouse-Query-Id", current_query_id); }
);
}};
const auto & default_custom_executor = std::make_shared<CustomExecutorDefault>();
std::vector<CustomExecutor::CustomMatcherPtr> custom_matchers{default_custom_executor};
std::vector<CustomExecutor::CustomQueryExecutorPtr> custom_query_executors{default_custom_executor};
return std::make_shared<CustomExecutor>(custom_matchers, custom_query_executors);
}
private:
......
......@@ -24,7 +24,7 @@ HTTPInputStreams::HTTPInputStreams(Context & context, HTTPServerRequest & reques
/// checksums of client data compressed with internal algorithm are not checked.
if (context.getSettingsRef().http_native_compression_disable_checksumming_on_decompress)
{
if(CompressedReadBuffer * compressed_buffer = typeid_cast<CompressedReadBuffer *>(in_maybe_internal_compressed.get()))
if (CompressedReadBuffer * compressed_buffer = typeid_cast<CompressedReadBuffer *>(in_maybe_internal_compressed.get()))
compressed_buffer->disableChecksumming();
}
}
......
......@@ -24,4 +24,4 @@ struct HTTPInputStreams
ReadBufferPtr createInternalCompressedBuffer(HTMLForm & params, ReadBufferPtr & http_maybe_encoding_buffer) const;
};
}
\ No newline at end of file
}
......@@ -243,7 +243,7 @@ void HTTPHandler::SessionContextHolder::authentication(HTTPServerRequest & reque
void HTTPHandler::processQuery(Context & context, HTTPRequest & request, HTMLForm & params, HTTPResponse & response)
{
const auto & name_with_custom_executor = context.getCustomExecutor(request/*, params*/);
const auto & name_with_custom_executor = context.getCustomExecutor(request, params);
LOG_TRACE(log, "Using " << name_with_custom_executor.first << " to execute URI: " << request.getURI());
ExtractorClientInfo{context.getClientInfo()}.extract(request);
......@@ -251,12 +251,7 @@ void HTTPHandler::processQuery(Context & context, HTTPRequest & request, HTMLFor
HTTPInputStreams input_streams{context, request, params};
HTTPOutputStreams output_streams = HTTPOutputStreams(context, request, response, params, getKeepAliveTimeout());
const auto & query_executors = name_with_custom_executor.second->getQueryExecutor(context, request, params, input_streams);
for (const auto & query_executor : query_executors)
query_executor(output_streams, response);
output_streams.finalize(); /// Send HTTP headers with code 200 if no exception happened and the data is still not sent to the client.
name_with_custom_executor.second->executeQuery(context, request, response, params, input_streams, output_streams);
}
void HTTPHandler::trySendExceptionToClient(
......
......@@ -466,6 +466,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
//setTextLog(global_context->getTextLog());
//buildLoggers(*config, logger());
global_context->setClustersConfig(config);
global_context->setCustomExecutorConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros"));
/// Setup protection to avoid accidental DROP for big tables (that are greater than 50 GB by default)
......
......@@ -7,6 +7,7 @@
#include <Poco/Net/IPAddress.h>
#include <Poco/Util/Application.h>
#include <Common/Macros.h>
#include <Common/HTMLForm.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
......@@ -44,7 +45,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/CustomHTTP/CustomExecutorDefault.h>
#include <Interpreters/CustomHTTP/CustomExecutor.h>
#include <Common/DNSResolver.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
......@@ -349,7 +350,9 @@ struct ContextShared
std::unique_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
mutable std::mutex match_executor_mutex; /// Guards match executor and their config
std::unique_ptr<CustomExecutors> custom_executors;
ConfigurationPtr custom_executors_config;
mutable std::mutex custom_executors_mutex; /// Guards custom executors and their config
#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledExpressionCache> compiled_expression_cache;
......@@ -2044,9 +2047,29 @@ void Context::resetInputCallbacks()
input_blocks_reader = {};
}
std::pair<String, CustomExecutorPtr> Context::getCustomExecutor(Poco::Net::HTTPServerRequest & /*request*/)
void Context::setCustomExecutorConfig(const ConfigurationPtr & config, const String & config_prefix)
{
return std::pair<String, CustomExecutorPtr>("Default", std::make_shared<CustomExecutorDefault>());
std::lock_guard lock(shared->custom_executors_mutex);
shared->custom_executors_config = config;
if (!shared->custom_executors)
shared->custom_executors = std::make_unique<CustomExecutors>(*shared->custom_executors_config, settings, config_prefix);
else
shared->custom_executors->updateCustomExecutors(*shared->custom_executors_config, settings, config_prefix);
}
std::pair<String, CustomExecutorPtr> Context::getCustomExecutor(Poco::Net::HTTPServerRequest & request, HTMLForm & params) const
{
std::lock_guard lock(shared->custom_executors_mutex);
if (!shared->custom_executors)
{
auto & config = shared->custom_executors_config ? *shared->custom_executors_config : getConfigRef();
shared->custom_executors = std::make_unique<CustomExecutors>(config, settings);
}
return shared->custom_executors->getCustomExecutor(request, params);
}
......
......@@ -42,6 +42,7 @@ namespace zkutil
class ZooKeeper;
}
class HTMLForm;
namespace DB
{
......@@ -490,7 +491,8 @@ public:
Compiler & getCompiler();
std::pair<String, CustomExecutorPtr> getCustomExecutor(Poco::Net::HTTPServerRequest &request/*, HTMLForm & params*/);
void setCustomExecutorConfig(const ConfigurationPtr & config, const String & config_prefix = "CustomHTTP");
std::pair<String, CustomExecutorPtr> getCustomExecutor(Poco::Net::HTTPServerRequest &request, HTMLForm & params) const;
/// Call after initialization before using system logs. Call for global context.
void initializeSystemLogs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册