未验证 提交 414f470c 编写于 作者: I Ivan 提交者: GitHub

Make Poco HTTP Server zero-copy again (#19516)

* Refactoring: part 1

* Refactoring: part 2

* Handle request using ReadBuffer interface

* Struggles with ReadBuffer's

* Fix URI parsing

* Implement parsing of multipart/form-data

* Check HTTP_LENGTH_REQUIRED before eof() or will hang

* Fix HTTPChunkedReadBuffer

* Fix build and style

* Fix test

* Resist double-eof

* Fix arcadian build
上级 71c65b5d
......@@ -83,7 +83,7 @@ public:
template <class T>
void writeToGraphite(const std::string & key, const T & value, const std::string & config_name = DEFAULT_GRAPHITE_CONFIG_NAME, time_t timestamp = 0, const std::string & custom_root_path = "")
{
auto writer = getGraphiteWriter(config_name);
auto *writer = getGraphiteWriter(config_name);
if (writer)
writer->write(key, value, timestamp, custom_root_path);
}
......@@ -91,7 +91,7 @@ public:
template <class T>
void writeToGraphite(const GraphiteWriter::KeyValueVector<T> & key_vals, const std::string & config_name = DEFAULT_GRAPHITE_CONFIG_NAME, time_t timestamp = 0, const std::string & custom_root_path = "")
{
auto writer = getGraphiteWriter(config_name);
auto *writer = getGraphiteWriter(config_name);
if (writer)
writer->write(key_vals, timestamp, custom_root_path);
}
......@@ -99,7 +99,7 @@ public:
template <class T>
void writeToGraphite(const GraphiteWriter::KeyValueVector<T> & key_vals, const std::chrono::system_clock::time_point & current_time, const std::string & custom_root_path)
{
auto writer = getGraphiteWriter();
auto *writer = getGraphiteWriter();
if (writer)
writer->write(key_vals, std::chrono::system_clock::to_time_t(current_time), custom_root_path);
}
......
......@@ -4,14 +4,14 @@
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeNullable.h>
# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <Parsers/ParserQueryWithOutput.h>
# include <Parsers/parseQuery.h>
# include <Poco/Data/ODBC/ODBCException.h>
# include <Poco/Data/ODBC/SessionImpl.h>
# include <Poco/Data/ODBC/Utility.h>
# include <Poco/Net/HTMLForm.h>
# include <Server/HTTP/HTMLForm.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <Poco/NumberParser.h>
......@@ -59,16 +59,16 @@ namespace
}
}
void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
HTMLForm params(request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
*response.send() << message << std::endl;
LOG_WARNING(log, message);
};
......@@ -159,7 +159,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
columns.emplace_back(reinterpret_cast<char *>(column_name), std::move(column_type));
}
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
writeStringBinary(columns.toString(), out);
}
catch (...)
......
......@@ -3,10 +3,11 @@
#if USE_ODBC
# include <Interpreters/Context.h>
# include <Poco/Logger.h>
# include <Poco/Net/HTTPRequestHandler.h>
# include <Server/HTTP/HTTPRequestHandler.h>
# include <Common/config.h>
# include <Poco/Logger.h>
/** The structure of the table is taken from the query "SELECT * FROM table WHERE 1=0".
* TODO: It would be much better to utilize ODBC methods dedicated for columns description.
* If there is no such table, an exception is thrown.
......@@ -14,7 +15,7 @@
namespace DB
{
class ODBCColumnsInfoHandler : public Poco::Net::HTTPRequestHandler
class ODBCColumnsInfoHandler : public HTTPRequestHandler
{
public:
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, Context & context_)
......@@ -22,7 +23,7 @@ public:
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
Poco::Logger * log;
......
......@@ -7,39 +7,40 @@
namespace DB
{
Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request)
std::unique_ptr<HTTPRequestHandler> HandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_TRACE(log, "Request URI: {}", uri.toString());
if (uri.getPath() == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return new PingHandler(keep_alive_timeout);
return std::make_unique<PingHandler>(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
if (uri.getPath() == "/columns_info")
#if USE_ODBC
return new ODBCColumnsInfoHandler(keep_alive_timeout, context);
return std::make_unique<ODBCColumnsInfoHandler>(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/identifier_quote")
#if USE_ODBC
return new IdentifierQuoteHandler(keep_alive_timeout, context);
return std::make_unique<IdentifierQuoteHandler>(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/schema_allowed")
#if USE_ODBC
return new SchemaAllowedHandler(keep_alive_timeout, context);
return std::make_unique<SchemaAllowedHandler>(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/write")
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
return std::make_unique<ODBCHandler>(pool_map, keep_alive_timeout, context, "write");
else
return new ODBCHandler(pool_map, keep_alive_timeout, context, "read");
return std::make_unique<ODBCHandler>(pool_map, keep_alive_timeout, context, "read");
}
return nullptr;
}
......
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include "MainHandler.h"
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include "ColumnInfoHandler.h"
#include "IdentifierQuoteHandler.h"
#include "MainHandler.h"
#include "SchemaAllowedHandler.h"
#include <Poco/Logger.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
......@@ -19,7 +20,7 @@ namespace DB
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
class HandlerFactory : public HTTPRequestHandlerFactory
{
public:
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
......@@ -28,7 +29,7 @@ public:
pool_map = std::make_shared<ODBCHandler::PoolMap>();
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override;
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
Poco::Logger * log;
......
......@@ -3,14 +3,14 @@
#if USE_ODBC
# include <DataTypes/DataTypeFactory.h>
# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <Server/HTTP/HTMLForm.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <Parsers/ParserQueryWithOutput.h>
# include <Parsers/parseQuery.h>
# include <Poco/Data/ODBC/ODBCException.h>
# include <Poco/Data/ODBC/SessionImpl.h>
# include <Poco/Data/ODBC/Utility.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <common/logger_useful.h>
......@@ -22,16 +22,16 @@
namespace DB
{
void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
HTMLForm params(request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
*response.send() << message << std::endl;
LOG_WARNING(log, message);
};
......@@ -49,7 +49,7 @@ void IdentifierQuoteHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
auto identifier = getIdentifierQuote(hdbc);
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
writeStringBinary(identifier, out);
}
catch (...)
......
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#if USE_ODBC
......@@ -10,7 +11,7 @@
namespace DB
{
class IdentifierQuoteHandler : public Poco::Net::HTTPRequestHandler
class IdentifierQuoteHandler : public HTTPRequestHandler
{
public:
IdentifierQuoteHandler(size_t keep_alive_timeout_, Context &)
......@@ -18,7 +19,7 @@ public:
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
Poco::Logger * log;
......
......@@ -7,7 +7,7 @@
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromIStream.h>
......@@ -17,6 +17,7 @@
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>
#include <mutex>
#include <memory>
......@@ -73,19 +74,19 @@ ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
return pool_map->at(connection_str);
}
void ODBCHandler::processError(Poco::Net::HTTPServerResponse & response, const std::string & message)
void ODBCHandler::processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
*response.send() << message << std::endl;
LOG_WARNING(log, message);
}
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request);
HTMLForm params(request);
if (mode == "read")
params.read(request.stream());
params.read(request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
if (mode == "read" && !params.has("query"))
......@@ -136,7 +137,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
......@@ -163,9 +164,8 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
#endif
auto pool = getPool(connection_string);
ReadBufferFromIStream read_buf(request.stream());
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block,
context, max_block_size);
auto & read_buf = request.getStream();
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, context, max_block_size);
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
ODBCBlockOutputStream output_stream(pool->get(), db_name, table_name, *sample_block, quoting_style);
copyData(*input_stream, output_stream);
......
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace DB
......@@ -16,7 +17,7 @@ namespace DB
* and also query in request body
* response in RowBinary format
*/
class ODBCHandler : public Poco::Net::HTTPRequestHandler
class ODBCHandler : public HTTPRequestHandler
{
public:
using PoolPtr = std::shared_ptr<Poco::Data::SessionPool>;
......@@ -34,7 +35,7 @@ public:
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
Poco::Logger * log;
......@@ -47,7 +48,7 @@ private:
static inline std::mutex mutex;
PoolPtr getPool(const std::string & connection_str);
void processError(Poco::Net::HTTPServerResponse & response, const std::string & message);
void processError(HTTPServerResponse & response, const std::string & message);
};
}
......@@ -11,7 +11,6 @@
# include <Poco/Data/ODBC/Connector.h>
#endif
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/String.h>
#include <Poco/Util/HelpFormatter.h>
......@@ -23,6 +22,7 @@
#include <ext/scope_guard.h>
#include <ext/range.h>
#include <Common/SensitiveDataMasker.h>
#include <Server/HTTP/HTTPServer.h>
namespace DB
......@@ -212,8 +212,12 @@ int ODBCBridge::main(const std::vector<std::string> & /*args*/)
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
}
auto server = Poco::Net::HTTPServer(
new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params);
auto server = HTTPServer(
context,
std::make_shared<HandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context),
server_pool,
socket,
http_params);
server.start();
LOG_INFO(log, "Listening http://{}", address.toString());
......
......@@ -6,7 +6,7 @@
namespace DB
{
void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response)
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
{
try
{
......
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
/** Simple ping handler, answers "Ok." to GET request
*/
class PingHandler : public Poco::Net::HTTPRequestHandler
/// Simple ping handler, answers "Ok." to GET request
class PingHandler : public HTTPRequestHandler
{
public:
PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
explicit PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
size_t keep_alive_timeout;
};
}
......@@ -2,12 +2,12 @@
#if USE_ODBC
# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <Server/HTTP/HTMLForm.h>
# include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <Poco/Data/ODBC/ODBCException.h>
# include <Poco/Data/ODBC/SessionImpl.h>
# include <Poco/Data/ODBC/Utility.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <common/logger_useful.h>
......@@ -33,16 +33,16 @@ namespace
}
void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
HTMLForm params(request, request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
*response.send() << message << std::endl;
LOG_WARNING(log, message);
};
......@@ -60,7 +60,7 @@ void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request,
bool result = isSchemaAllowed(hdbc);
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
writeBoolText(result, out);
}
catch (...)
......
#pragma once
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#if USE_ODBC
namespace DB
{
class Context;
class Context;
/// This handler establishes connection to database, and retrieve whether schema is allowed.
class SchemaAllowedHandler : public Poco::Net::HTTPRequestHandler
/// This handler establishes connection to database, and retrieves whether schema is allowed.
class SchemaAllowedHandler : public HTTPRequestHandler
{
public:
SchemaAllowedHandler(size_t keep_alive_timeout_, Context &)
......@@ -19,7 +20,7 @@ public:
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
Poco::Logger * log;
......
......@@ -69,6 +69,7 @@
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
#include <Server/HTTP/HTTPServer.h>
#if !defined(ARCADIA_BUILD)
......@@ -1070,8 +1071,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
servers->emplace_back(
port_name,
std::make_unique<HTTPServer>(
context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://{}", address.toString());
});
......@@ -1085,8 +1088,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
servers->emplace_back(
port_name,
std::make_unique<HTTPServer>(
context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://{}", address.toString());
#else
......@@ -1160,8 +1165,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
servers->emplace_back(
port_name,
std::make_unique<HTTPServer>(
context(),
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
});
......@@ -1174,8 +1185,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
servers->emplace_back(
port_name,
std::make_unique<HTTPServer>(
context(),
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
#else
......@@ -1235,8 +1252,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
servers->emplace_back(
port_name,
std::make_unique<HTTPServer>(
context(),
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
});
......
......@@ -51,6 +51,7 @@ public:
}
void defineOptions(Poco::Util::OptionSet & _options) override;
protected:
int run() override;
......@@ -65,8 +66,6 @@ protected:
private:
Context * global_context_ptr = nullptr;
private:
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;
using CreateServerFunc = std::function<void(UInt16)>;
......
......@@ -181,6 +181,7 @@ add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_server Server)
add_object_library(clickhouse_server_http Server/HTTP)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
add_object_library(clickhouse_processors_executors Processors/Executors)
......
#pragma once
#include <sstream>
#include <Poco/Net/HTMLForm.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <IO/ReadHelpers.h>
/** Somehow, in case of POST, Poco::Net::HTMLForm doesn't read parameters from URL, only from body.
* This helper allows to read parameters just from URL.
*/
struct HTMLForm : public Poco::Net::HTMLForm
{
HTMLForm(const Poco::Net::HTTPRequest & request)
{
Poco::URI uri(request.getURI());
std::istringstream istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
readUrl(istr);
}
HTMLForm(const Poco::URI & uri)
{
std::istringstream istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
readUrl(istr);
}
template <typename T>
T getParsed(const std::string & key, T default_value)
{
auto it = find(key);
return (it != end()) ? DB::parse<T>(it->second) : default_value;
}
template <typename T>
T getParsed(const std::string & key)
{
return DB::parse<T>(get(key));
}
};
......@@ -120,6 +120,12 @@ inline bool isWhitespaceASCII(char c)
return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v';
}
/// Since |isWhiteSpaceASCII()| is used inside algorithms it's easier to implement another function than add extra argument.
inline bool isWhitespaceASCIIOneLine(char c)
{
return c == ' ' || c == '\t' || c == '\r' || c == '\f' || c == '\v';
}
inline bool isControlASCII(char c)
{
return static_cast<unsigned char>(c) <= 31;
......
......@@ -85,9 +85,9 @@ inline bool parseIPv6(const char * src, unsigned char * dst)
return clear_dst();
unsigned char tmp[IPV6_BINARY_LENGTH]{};
auto tp = tmp;
auto endp = tp + IPV6_BINARY_LENGTH;
auto curtok = src;
auto * tp = tmp;
auto * endp = tp + IPV6_BINARY_LENGTH;
const auto * curtok = src;
auto saw_xdigit = false;
UInt32 val{};
unsigned char * colonp = nullptr;
......@@ -97,14 +97,14 @@ inline bool parseIPv6(const char * src, unsigned char * dst)
{
const auto num = unhex(ch);
if (num != '\xff')
if (num != u8'\xff')
{
val <<= 4;
val |= num;
if (val > 0xffffu)
return clear_dst();
saw_xdigit = 1;
saw_xdigit = true;
continue;
}
......@@ -204,7 +204,7 @@ inline void formatIPv4(const unsigned char * src, char *& dst, uint8_t mask_tail
for (size_t octet = 0; octet < limit; ++octet)
{
const uint8_t value = static_cast<uint8_t>(src[IPV4_BINARY_LENGTH - octet - 1]);
auto rep = one_byte_to_string_lookup_table[value];
const auto * rep = one_byte_to_string_lookup_table[value];
const uint8_t len = rep[0];
const char* str = rep + 1;
......
......@@ -90,12 +90,12 @@ std::string getHexUIntLowercase(TUInt uint_)
extern const char * const hex_char_to_digit_table;
inline char unhex(char c)
inline UInt8 unhex(char c)
{
return hex_char_to_digit_table[static_cast<UInt8>(c)];
}
inline char unhex2(const char * data)
inline UInt8 unhex2(const char * data)
{
return
static_cast<UInt8>(unhex(data[0])) * 0x10
......
......@@ -125,19 +125,16 @@ ExternalTable::ExternalTable(const boost::program_options::variables_map & exter
}
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
{
const Settings & settings = context.getSettingsRef();
/// The buffer is initialized here, not in the virtual function initReadBuffer
read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);
if (settings.http_max_multipart_form_data_size)
read_buffer = std::make_unique<LimitReadBuffer>(
*read_buffer_impl, settings.http_max_multipart_form_data_size,
stream, settings.http_max_multipart_form_data_size,
true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
else
read_buffer = std::move(read_buffer_impl);
read_buffer = wrapReadBufferReference(stream);
/// Retrieve a collection of parameters from MessageHeader
Poco::Net::NameValueCollection content;
......
#pragma once
#include <string>
#include <vector>
#include <memory>
#include <iosfwd>
#include <Poco/Net/PartHandler.h>
#include <Core/Block.h>
#include <Client/Connection.h>
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <Server/HTTP/HTMLForm.h>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
namespace Poco
......@@ -51,7 +50,7 @@ public:
std::unique_ptr<ReadBuffer> read_buffer;
Block sample_block;
virtual ~BaseExternalTable() {}
virtual ~BaseExternalTable() = default;
/// Initialize read_buffer, depending on the data source. By default, does nothing.
virtual void initReadBuffer() {}
......@@ -82,24 +81,23 @@ public:
void initReadBuffer() override;
/// Extract parameters from variables_map, which is built on the client command line
ExternalTable(const boost::program_options::variables_map & external_options);
explicit ExternalTable(const boost::program_options::variables_map & external_options);
};
/// Parsing of external table used when sending tables via http
/// The `handlePart` function will be called for each table passed,
/// so it's also necessary to call `clean` at the end of the `handlePart`.
class ExternalTablesHandler : public Poco::Net::PartHandler, BaseExternalTable
class ExternalTablesHandler : public HTMLForm::PartHandler, BaseExternalTable
{
public:
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) {}
void handlePart(const Poco::Net::MessageHeader & header, std::istream & stream) override;
void handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream) override;
private:
Context & context;
const Poco::Net::NameValueCollection & params;
std::unique_ptr<ReadBuffer> read_buffer_impl;
};
......
#pragma once
#include <IO/ReadBuffer.h>
namespace DB
{
/// Just a stub - reads nothing from nowhere.
class EmptyReadBuffer : public ReadBuffer
{
public:
EmptyReadBuffer() : ReadBuffer(nullptr, 0) {}
private:
bool nextImpl() override { return false; }
};
}
#include <IO/HTTPChunkedReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <common/arithmeticOverflow.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int UNEXPECTED_END_OF_FILE;
extern const int CORRUPTED_DATA;
extern const int TOO_MANY_BYTES;
}
size_t HTTPChunkedReadBuffer::readChunkHeader()
{
if (in->eof())
throw Exception("Unexpected end of file while reading chunk header of HTTP chunked data", ErrorCodes::UNEXPECTED_END_OF_FILE);
if (!isHexDigit(*in->position()))
throw Exception("Unexpected data instead of HTTP chunk header", ErrorCodes::CORRUPTED_DATA);
size_t res = 0;
do
{
if (common::mulOverflow(res, 16ul, res) || common::addOverflow<size_t>(res, unhex(*in->position()), res))
throw Exception("Chunk size is out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
++in->position();
} while (!in->eof() && isHexDigit(*in->position()));
/// NOTE: If we want to read any chunk extensions, it should be done here.
skipToCarriageReturnOrEOF(*in);
if (in->eof())
throw Exception("Unexpected end of file while reading chunk header of HTTP chunked data", ErrorCodes::UNEXPECTED_END_OF_FILE);
if (res > max_size)
throw Exception("Chunk size is too large", ErrorCodes::TOO_MANY_BYTES);
assertString("\n", *in);
return res;
}
void HTTPChunkedReadBuffer::readChunkFooter()
{
assertString("\r\n", *in);
}
bool HTTPChunkedReadBuffer::nextImpl()
{
if (!in)
return false;
/// The footer of previous chunk.
if (count())
readChunkFooter();
size_t chunk_size = readChunkHeader();
if (0 == chunk_size)
{
readChunkFooter();
in.reset(); // prevent double-eof situation.
return false;
}
if (in->available() >= chunk_size)
{
/// Zero-copy read from input.
working_buffer = Buffer(in->position(), in->position() + chunk_size);
in->position() += chunk_size;
}
else
{
/// Chunk is not completely in buffer, copy it to scratch space.
memory.resize(chunk_size);
in->readStrict(memory.data(), chunk_size);
working_buffer = Buffer(memory.data(), memory.data() + chunk_size);
}
/// NOTE: We postpone reading the footer to the next iteration, because it may not be completely in buffer,
/// but we need to keep the current data in buffer available.
return true;
}
}
#pragma once
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
namespace DB
{
/// Reads data with HTTP Chunked Transfer Encoding.
class HTTPChunkedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
HTTPChunkedReadBuffer(std::unique_ptr<ReadBuffer> in_, size_t max_chunk_size) : in(std::move(in_)), max_size(max_chunk_size) {}
private:
std::unique_ptr<ReadBuffer> in;
const size_t max_size;
size_t readChunkHeader();
void readChunkFooter();
bool nextImpl() override;
};
}
#include <IO/HTTPCommon.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/PoolBase.h>
......@@ -23,7 +24,6 @@
# include <Poco/Net/SecureStreamSocket.h>
#endif
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>
#include <tuple>
......@@ -266,7 +266,7 @@ namespace
};
}
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
void setResponseDefaultHeaders(HTTPServerResponse & response, unsigned keep_alive_timeout)
{
if (!response.getKeepAlive())
return;
......
......@@ -14,20 +14,13 @@
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
namespace Net
{
class HTTPServerResponse;
}
}
namespace DB
{
constexpr int HTTP_TOO_MANY_REQUESTS = 429;
class HTTPServerResponse;
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
......@@ -45,7 +38,7 @@ public:
using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry;
using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);
void setResponseDefaultHeaders(HTTPServerResponse & response, unsigned keep_alive_timeout);
/// Create session object to perform requests and set required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
......@@ -54,7 +47,7 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status);
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
/** Used to receive response (response headers and possibly body)
* after sending data (request headers and possibly body).
......@@ -65,5 +58,5 @@ std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, bool allow_redirects);
void assertResponseIsOk(
const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects = false);
const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, bool allow_redirects = false);
}
......@@ -14,10 +14,10 @@ namespace ErrorCodes
bool LimitReadBuffer::nextImpl()
{
assert(position() >= in.position());
assert(position() >= in->position());
/// Let underlying buffer calculate read bytes in `next()` call.
in.position() = position();
in->position() = position();
if (bytes >= limit)
{
......@@ -27,13 +27,13 @@ bool LimitReadBuffer::nextImpl()
return false;
}
if (!in.next())
if (!in->next())
{
working_buffer = in.buffer();
working_buffer = in->buffer();
return false;
}
working_buffer = in.buffer();
working_buffer = in->buffer();
if (limit - bytes < working_buffer.size())
working_buffer.resize(limit - bytes);
......@@ -42,14 +42,33 @@ bool LimitReadBuffer::nextImpl()
}
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: ReadBuffer(in_.position(), 0), in(in_), limit(limit_), throw_exception(throw_exception_), exception_message(std::move(exception_message_))
LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: ReadBuffer(in_ ? in_->position() : nullptr, 0)
, in(in_)
, owns_in(owns)
, limit(limit_)
, throw_exception(throw_exception_)
, exception_message(std::move(exception_message_))
{
size_t remaining_bytes_in_buffer = in.buffer().end() - in.position();
assert(in);
size_t remaining_bytes_in_buffer = in->buffer().end() - in->position();
if (remaining_bytes_in_buffer > limit)
remaining_bytes_in_buffer = limit;
working_buffer = Buffer(in.position(), in.position() + remaining_bytes_in_buffer);
working_buffer = Buffer(in->position(), in->position() + remaining_bytes_in_buffer);
}
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: LimitReadBuffer(&in_, false, limit_, throw_exception_, exception_message_)
{
}
LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exception_message_)
{
}
......@@ -57,7 +76,10 @@ LimitReadBuffer::~LimitReadBuffer()
{
/// Update underlying buffer's position in case when limit wasn't reached.
if (!working_buffer.empty())
in.position() = position();
in->position() = position();
if (owns_in)
delete in;
}
}
......@@ -12,17 +12,22 @@ namespace DB
*/
class LimitReadBuffer : public ReadBuffer
{
public:
LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {});
LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {});
~LimitReadBuffer() override;
private:
ReadBuffer & in;
ReadBuffer * in;
bool owns_in;
UInt64 limit;
bool throw_exception;
std::string exception_message;
bool nextImpl() override;
LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_);
public:
LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {});
~LimitReadBuffer() override;
bool nextImpl() override;
};
}
#include <IO/PeekableReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
......@@ -107,22 +109,29 @@ bool PeekableReadBuffer::peekNext()
return sub_buf.next();
}
void PeekableReadBuffer::rollbackToCheckpoint()
void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
{
checkStateCorrect();
if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
pos = *checkpoint;
else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data());
if (drop)
dropCheckpoint();
checkStateCorrect();
}
bool PeekableReadBuffer::nextImpl()
{
/// FIXME wrong bytes count because it can read the same data again after rollbackToCheckpoint()
/// However, changing bytes count on every call of next() (even after rollback) allows to determine if some pointers were invalidated.
/// FIXME: wrong bytes count because it can read the same data again after rollbackToCheckpoint()
/// however, changing bytes count on every call of next() (even after rollback) allows to determine
/// if some pointers were invalidated.
checkStateCorrect();
bool res;
......@@ -138,7 +147,7 @@ bool PeekableReadBuffer::nextImpl()
if (useSubbufferOnly())
{
/// Load next data to sub_buf
sub_buf.position() = pos;
sub_buf.position() = position();
res = sub_buf.next();
}
else
......
......@@ -58,7 +58,7 @@ public:
/// Sets position at checkpoint.
/// All pointers (such as this->buffer().end()) may be invalidated
void rollbackToCheckpoint();
void rollbackToCheckpoint(bool drop = false);
/// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory,
/// so data between checkpoint and position will be in continuous memory.
......
......@@ -134,15 +134,27 @@ public:
tryIgnore(std::numeric_limits<size_t>::max());
}
/** Reads a single byte. */
bool ALWAYS_INLINE read(char & c)
/// Peeks a single byte.
bool ALWAYS_INLINE peek(char & c)
{
if (eof())
return false;
c = *pos++;
c = *pos;
return true;
}
/// Reads a single byte.
bool ALWAYS_INLINE read(char & c)
{
if (peek(c))
{
++pos;
return true;
}
return false;
}
void ALWAYS_INLINE readStrict(char & c)
{
if (read(c))
......@@ -207,5 +219,39 @@ private:
using ReadBufferPtr = std::shared_ptr<ReadBuffer>;
/// Due to inconsistencies in ReadBuffer-family interfaces:
/// - some require to fully wrap underlying buffer and own it,
/// - some just wrap the reference without ownership,
/// we need to be able to wrap reference-only buffers with movable transparent proxy-buffer.
/// The uniqueness of such wraps is responsibility of the code author.
inline std::unique_ptr<ReadBuffer> wrapReadBufferReference(ReadBuffer & buf)
{
class ReadBufferWrapper : public ReadBuffer
{
public:
explicit ReadBufferWrapper(ReadBuffer & buf_) : ReadBuffer(buf_.position(), 0), buf(buf_)
{
working_buffer = Buffer(buf.position(), buf.buffer().end());
}
private:
ReadBuffer & buf;
bool nextImpl() override
{
buf.position() = position();
if (!buf.next())
return false;
working_buffer = buf.buffer();
return true;
}
};
return std::make_unique<ReadBufferWrapper>(buf);
}
}
......@@ -78,7 +78,7 @@ ReadBufferFromPocoSocket::ReadBufferFromPocoSocket(Poco::Net::Socket & socket_,
{
}
bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds)
bool ReadBufferFromPocoSocket::poll(size_t timeout_microseconds) const
{
return available() || socket.poll(timeout_microseconds, Poco::Net::Socket::SELECT_READ | Poco::Net::Socket::SELECT_ERROR);
}
......
#pragma once
#include <Poco/Net/Socket.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h>
#include <Poco/Net/Socket.h>
namespace DB
{
/** Works with the ready Poco::Net::Socket. Blocking operations.
*/
/// Works with the ready Poco::Net::Socket. Blocking operations.
class ReadBufferFromPocoSocket : public BufferWithOwnMemory<ReadBuffer>
{
protected:
......@@ -24,9 +23,9 @@ protected:
bool nextImpl() override;
public:
ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
bool poll(size_t timeout_microseconds);
bool poll(size_t timeout_microseconds) const;
void setAsyncCallback(std::function<void(Poco::Net::Socket &)> async_callback_) { async_callback = std::move(async_callback_); }
......
......@@ -1050,6 +1050,25 @@ void readAndThrowException(ReadBuffer & buf, const String & additional_message)
}
void skipToCarriageReturnOrEOF(ReadBuffer & buf)
{
while (!buf.eof())
{
char * next_pos = find_first_symbols<'\r'>(buf.position(), buf.buffer().end());
buf.position() = next_pos;
if (!buf.hasPendingData())
continue;
if (*buf.position() == '\r')
{
++buf.position();
return;
}
}
}
void skipToNextLineOrEOF(ReadBuffer & buf)
{
while (!buf.eof())
......
......@@ -536,7 +536,7 @@ void parseUUID(const UInt8 * src36, std::reverse_iterator<UInt8 *> dst16);
void parseUUIDWithoutSeparator(const UInt8 * src36, std::reverse_iterator<UInt8 *> dst16);
template <typename IteratorSrc, typename IteratorDst>
void formatHex(IteratorSrc src, IteratorDst dst, const size_t num_bytes);
void formatHex(IteratorSrc src, IteratorDst dst, size_t num_bytes);
template <typename ReturnType>
......@@ -1046,10 +1046,14 @@ void readText(std::vector<T> & x, ReadBuffer & buf)
/// Skip whitespace characters.
inline void skipWhitespaceIfAny(ReadBuffer & buf)
inline void skipWhitespaceIfAny(ReadBuffer & buf, bool one_line = false)
{
while (!buf.eof() && isWhitespaceASCII(*buf.position()))
++buf.position();
if (!one_line)
while (!buf.eof() && isWhitespaceASCII(*buf.position()))
++buf.position();
else
while (!buf.eof() && isWhitespaceASCIIOneLine(*buf.position()))
++buf.position();
}
/// Skips json value.
......@@ -1212,6 +1216,9 @@ inline void skipBOMIfExists(ReadBuffer & buf)
/// Skip to next character after next \n. If no \n in stream, skip to end.
void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next \r. If no \r in stream, skip to end.
void skipToCarriageReturnOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
......
......@@ -26,6 +26,7 @@ SRCS(
CascadeWriteBuffer.cpp
CompressionMethod.cpp
DoubleConverter.cpp
HTTPChunkedReadBuffer.cpp
HTTPCommon.cpp
HashingWriteBuffer.cpp
HexWriteBuffer.cpp
......@@ -56,7 +57,6 @@ SRCS(
WriteBufferFromFileDescriptor.cpp
WriteBufferFromFileDescriptorDiscardOnFailure.cpp
WriteBufferFromHTTP.cpp
WriteBufferFromHTTPServerResponse.cpp
WriteBufferFromOStream.cpp
WriteBufferFromPocoSocket.cpp
WriteBufferFromTemporaryFile.cpp
......
......@@ -8,13 +8,13 @@
#include <IO/WriteHelpers.h>
#include <Common/ActionBlocker.h>
#include <common/types.h>
#include <map>
#include <atomic>
#include <utility>
#include <shared_mutex>
#include <Poco/Net/HTMLForm.h>
namespace Poco { namespace Net { class HTTPServerResponse; } }
#include <atomic>
#include <map>
#include <shared_mutex>
#include <utility>
namespace DB
{
......@@ -25,13 +25,16 @@ namespace ErrorCodes
extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT;
}
class HTMLForm;
class HTTPServerResponse;
/** Query processor from other servers.
*/
class InterserverIOEndpoint
{
public:
virtual std::string getId(const std::string & path) const = 0;
virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0;
virtual void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) = 0;
virtual ~InterserverIOEndpoint() = default;
/// You need to stop the data transfer if blocker is activated.
......
#include <Server/HTTP/HTMLForm.h>
#include <IO/EmptyReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <Server/HTTP/ReadHeaders.h>
#include <Poco/CountingStream.h>
#include <Poco/Net/MultipartReader.h>
#include <Poco/Net/MultipartWriter.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/NullPartHandler.h>
#include <Poco/NullStream.h>
#include <Poco/StreamCopier.h>
#include <Poco/UTF8String.h>
#include <sstream>
namespace DB
{
namespace
{
class NullPartHandler : public HTMLForm::PartHandler
{
public:
void handlePart(const Poco::Net::MessageHeader &, ReadBuffer &) override {}
};
}
const std::string HTMLForm::ENCODING_URL = "application/x-www-form-urlencoded";
const std::string HTMLForm::ENCODING_MULTIPART = "multipart/form-data";
const int HTMLForm::UNKNOWN_CONTENT_LENGTH = -1;
HTMLForm::HTMLForm() : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH), encoding(ENCODING_URL)
{
}
HTMLForm::HTMLForm(const std::string & encoding_)
: field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH), encoding(encoding_)
{
}
HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler)
: field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH)
{
load(request, requestBody, handler);
}
HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody)
: field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH)
{
load(request, requestBody);
}
HTMLForm::HTMLForm(const Poco::Net::HTTPRequest & request) : HTMLForm(Poco::URI(request.getURI()))
{
}
HTMLForm::HTMLForm(const Poco::URI & uri) : field_limit(DFL_FIELD_LIMIT), value_length_limit(DFL_MAX_VALUE_LENGTH)
{
ReadBufferFromString istr(uri.getRawQuery()); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
readQuery(istr);
}
void HTMLForm::setEncoding(const std::string & encoding_)
{
encoding = encoding_;
}
void HTMLForm::addPart(const std::string & name, Poco::Net::PartSource * source)
{
poco_check_ptr(source);
Part part;
part.name = name;
part.source = std::unique_ptr<Poco::Net::PartSource>(source);
parts.push_back(std::move(part));
}
void HTMLForm::load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler)
{
clear();
Poco::URI uri(request.getURI());
const std::string & query = uri.getRawQuery();
if (!query.empty())
{
ReadBufferFromString istr(query);
readQuery(istr);
}
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST || request.getMethod() == Poco::Net::HTTPRequest::HTTP_PUT)
{
std::string media_type;
NameValueCollection params;
Poco::Net::MessageHeader::splitParameters(request.getContentType(), media_type, params);
encoding = media_type;
if (encoding == ENCODING_MULTIPART)
{
boundary = params["boundary"];
readMultipart(requestBody, handler);
}
else
{
readQuery(requestBody);
}
}
}
void HTMLForm::load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody)
{
NullPartHandler nah;
load(request, requestBody, nah);
}
void HTMLForm::load(const Poco::Net::HTTPRequest & request)
{
NullPartHandler nah;
EmptyReadBuffer nis;
load(request, nis, nah);
}
void HTMLForm::read(ReadBuffer & in, PartHandler & handler)
{
if (encoding == ENCODING_URL)
readQuery(in);
else
readMultipart(in, handler);
}
void HTMLForm::read(ReadBuffer & in)
{
readQuery(in);
}
void HTMLForm::read(const std::string & queryString)
{
ReadBufferFromString istr(queryString);
readQuery(istr);
}
void HTMLForm::readQuery(ReadBuffer & in)
{
size_t fields = 0;
char ch = 0; // silence "uninitialized" warning from gcc-*
bool is_first = true;
while (true)
{
if (field_limit > 0 && fields == field_limit)
throw Poco::Net::HTMLFormException("Too many form fields");
std::string name;
std::string value;
while (in.read(ch) && ch != '=' && ch != '&')
{
if (ch == '+')
ch = ' ';
if (name.size() < MAX_NAME_LENGTH)
name += ch;
else
throw Poco::Net::HTMLFormException("Field name too long");
}
if (ch == '=')
{
while (in.read(ch) && ch != '&')
{
if (ch == '+')
ch = ' ';
if (value.size() < value_length_limit)
value += ch;
else
throw Poco::Net::HTMLFormException("Field value too long");
}
}
// Remove UTF-8 BOM from first name, if present
if (is_first)
Poco::UTF8::removeBOM(name);
std::string decoded_name;
std::string decoded_value;
Poco::URI::decode(name, decoded_name);
Poco::URI::decode(value, decoded_value);
add(decoded_name, decoded_value);
++fields;
is_first = false;
if (in.eof())
break;
}
}
void HTMLForm::readMultipart(ReadBuffer & in_, PartHandler & handler)
{
/// Assume there is always a boundary provided.
assert(!boundary.empty());
size_t fields = 0;
MultipartReadBuffer in(in_, boundary);
/// Assume there is at least one part
in.skipToNextBoundary();
/// Read each part until next boundary (or last boundary)
while (!in.eof())
{
if (field_limit && fields > field_limit)
throw Poco::Net::HTMLFormException("Too many form fields");
Poco::Net::MessageHeader header;
readHeaders(header, in);
skipToNextLineOrEOF(in);
NameValueCollection params;
if (header.has("Content-Disposition"))
{
std::string unused;
Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), unused, params);
}
if (params.has("filename"))
handler.handlePart(header, in);
else
{
std::string name = params["name"];
std::string value;
char ch;
while (in.read(ch))
{
if (value.size() > value_length_limit)
throw Poco::Net::HTMLFormException("Field value too long");
value += ch;
}
add(name, value);
}
++fields;
/// If we already encountered EOF for the buffer |in|, it's possible that the next symbol is a start of boundary line.
/// In this case reading the boundary line will reset the EOF state, potentially breaking invariant of EOF idempotency -
/// if there is such invariant in the first place.
if (!in.skipToNextBoundary())
break;
}
}
void HTMLForm::setFieldLimit(int limit)
{
poco_assert(limit >= 0);
field_limit = limit;
}
void HTMLForm::setValueLengthLimit(int limit)
{
poco_assert(limit >= 0);
value_length_limit = limit;
}
HTMLForm::MultipartReadBuffer::MultipartReadBuffer(ReadBuffer & in_, const std::string & boundary_)
: ReadBuffer(nullptr, 0), in(in_), boundary("--" + boundary_)
{
/// For consistency with |nextImpl()|
position() = in.position();
}
bool HTMLForm::MultipartReadBuffer::skipToNextBoundary()
{
assert(working_buffer.empty() || eof());
assert(boundary_hit);
boundary_hit = false;
while (!in.eof())
{
auto line = readLine();
if (startsWith(line, boundary))
{
set(in.position(), 0);
next(); /// We need to restrict our buffer to size of next available line.
return !startsWith(line, boundary + "--");
}
}
throw Poco::Net::HTMLFormException("No boundary line found");
}
std::string HTMLForm::MultipartReadBuffer::readLine(bool strict)
{
std::string line;
char ch = 0; // silence "uninitialized" warning from gcc-*
while (in.read(ch) && ch != '\r' && ch != '\n')
line += ch;
if (in.eof())
{
if (strict)
throw Poco::Net::HTMLFormException("Unexpected end of message");
return line;
}
line += ch;
if (ch == '\r')
{
if (!in.read(ch) || ch != '\n')
throw Poco::Net::HTMLFormException("No CRLF found");
else
line += ch;
}
return line;
}
bool HTMLForm::MultipartReadBuffer::nextImpl()
{
if (boundary_hit)
return false;
assert(position() >= in.position());
in.position() = position();
/// We expect to start from the first symbol after EOL, so we can put checkpoint
/// and safely try to read til the next EOL and check for boundary.
in.setCheckpoint();
/// FIXME: there is an extra copy because we cannot traverse PeekableBuffer from checkpoint to position()
/// since it may store different data parts in different sub-buffers,
/// anyway calling makeContinuousMemoryFromCheckpointToPos() will also make an extra copy.
std::string line = readLine(false);
/// According to RFC2046 the preceding CRLF is a part of boundary line.
if (line == "\r\n")
{
line = readLine(false);
boundary_hit = startsWith(line, boundary);
if (!boundary_hit) line = "\r\n";
}
else
boundary_hit = startsWith(line, boundary);
in.rollbackToCheckpoint(true);
/// Rolling back to checkpoint may change underlying buffers.
/// Limit readable data to a single line.
BufferBase::set(in.position(), line.size(), 0);
return !boundary_hit && !line.empty();
}
}
#pragma once
#include <IO/PeekableReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <boost/noncopyable.hpp>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/NameValueCollection.h>
#include <Poco/Net/PartSource.h>
#include <Poco/URI.h>
namespace DB
{
class HTMLForm : public Poco::Net::NameValueCollection, private boost::noncopyable
{
public:
class PartHandler;
enum Options
{
OPT_USE_CONTENT_LENGTH = 0x01 // don't use Chunked Transfer-Encoding for multipart requests.
};
/// Creates an empty HTMLForm and sets the
/// encoding to "application/x-www-form-urlencoded".
HTMLForm();
/// Creates an empty HTMLForm that uses the given encoding.
/// Encoding must be either "application/x-www-form-urlencoded" (which is the default) or "multipart/form-data".
explicit HTMLForm(const std::string & encoding);
/// Creates a HTMLForm from the given HTTP request.
/// Uploaded files are passed to the given PartHandler.
HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler);
/// Creates a HTMLForm from the given HTTP request.
/// Uploaded files are silently discarded.
HTMLForm(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody);
/// Creates a HTMLForm from the given HTTP request.
/// The request must be a GET request and the form data must be in the query string (URL encoded).
/// For POST requests, you must use one of the constructors taking an additional input stream for the request body.
explicit HTMLForm(const Poco::Net::HTTPRequest & request);
explicit HTMLForm(const Poco::URI & uri);
template <typename T>
T getParsed(const std::string & key, T default_value)
{
auto it = find(key);
return (it != end()) ? DB::parse<T>(it->second) : default_value;
}
template <typename T>
T getParsed(const std::string & key)
{
return DB::parse<T>(get(key));
}
/// Sets the encoding used for posting the form.
/// Encoding must be either "application/x-www-form-urlencoded" (which is the default) or "multipart/form-data".
void setEncoding(const std::string & encoding);
/// Returns the encoding used for posting the form.
const std::string & getEncoding() const { return encoding; }
/// Adds an part/attachment (file upload) to the form.
/// The form takes ownership of the PartSource and deletes it when it is no longer needed.
/// The part will only be sent if the encoding set for the form is "multipart/form-data"
void addPart(const std::string & name, Poco::Net::PartSource * pSource);
/// Reads the form data from the given HTTP request.
/// Uploaded files are passed to the given PartHandler.
void load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody, PartHandler & handler);
/// Reads the form data from the given HTTP request.
/// Uploaded files are silently discarded.
void load(const Poco::Net::HTTPRequest & request, ReadBuffer & requestBody);
/// Reads the form data from the given HTTP request.
/// The request must be a GET request and the form data must be in the query string (URL encoded).
/// For POST requests, you must use one of the overloads taking an additional input stream for the request body.
void load(const Poco::Net::HTTPRequest & request);
/// Reads the form data from the given input stream.
/// The form data read from the stream must be in the encoding specified for the form.
/// Note that read() does not clear the form before reading the new values.
void read(ReadBuffer & in, PartHandler & handler);
/// Reads the URL-encoded form data from the given input stream.
/// Note that read() does not clear the form before reading the new values.
void read(ReadBuffer & in);
/// Reads the form data from the given HTTP query string.
/// Note that read() does not clear the form before reading the new values.
void read(const std::string & queryString);
/// Returns the MIME boundary used for writing multipart form data.
const std::string & getBoundary() const { return boundary; }
/// Returns the maximum number of header fields allowed.
/// See setFieldLimit() for more information.
int getFieldLimit() const { return field_limit; }
/// Sets the maximum number of header fields allowed. This limit is used to defend certain kinds of denial-of-service attacks.
/// Specify 0 for unlimited (not recommended). The default limit is 100.
void setFieldLimit(int limit);
/// Sets the maximum size for form field values stored as strings.
void setValueLengthLimit(int limit);
/// Returns the maximum size for form field values stored as strings.
int getValueLengthLimit() const { return value_length_limit; }
static const std::string ENCODING_URL; /// "application/x-www-form-urlencoded"
static const std::string ENCODING_MULTIPART; /// "multipart/form-data"
static const int UNKNOWN_CONTENT_LENGTH;
protected:
void readQuery(ReadBuffer & in);
void readMultipart(ReadBuffer & in, PartHandler & handler);
private:
/// This buffer provides data line by line to check for boundary line in a convenient way.
class MultipartReadBuffer;
enum Limits
{
DFL_FIELD_LIMIT = 100,
MAX_NAME_LENGTH = 1024,
DFL_MAX_VALUE_LENGTH = 256 * 1024
};
struct Part
{
std::string name;
std::unique_ptr<Poco::Net::PartSource> source;
};
using PartVec = std::vector<Part>;
size_t field_limit;
size_t value_length_limit;
std::string encoding;
std::string boundary;
PartVec parts;
};
class HTMLForm::PartHandler
{
public:
virtual ~PartHandler() = default;
virtual void handlePart(const Poco::Net::MessageHeader &, ReadBuffer &) = 0;
};
class HTMLForm::MultipartReadBuffer : public ReadBuffer
{
public:
MultipartReadBuffer(ReadBuffer & in, const std::string & boundary);
/// Returns false if last boundary found.
bool skipToNextBoundary();
private:
PeekableReadBuffer in;
const std::string boundary;
bool boundary_hit = true;
std::string readLine(bool strict = true);
bool nextImpl() override;
};
}
#pragma once
#include <Poco/Net/HTTPRequest.h>
namespace DB
{
using HTTPRequest = Poco::Net::HTTPRequest;
}
#pragma once
#include <Server/HTTP/HTTPServerRequest.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <boost/noncopyable.hpp>
namespace DB
{
class HTTPRequestHandler : private boost::noncopyable
{
public:
virtual ~HTTPRequestHandler() = default;
virtual void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) = 0;
};
}
#pragma once
#include <Server/HTTP/HTTPRequestHandler.h>
#include <boost/noncopyable.hpp>
namespace DB
{
class HTTPRequestHandlerFactory : private boost::noncopyable
{
public:
virtual ~HTTPRequestHandlerFactory() = default;
virtual std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) = 0;
};
using HTTPRequestHandlerFactoryPtr = std::shared_ptr<HTTPRequestHandlerFactory>;
}
#pragma once
#include <Poco/Net/HTTPResponse.h>
namespace DB
{
using HTTPResponse = Poco::Net::HTTPResponse;
}
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTP/HTTPServerConnectionFactory.h>
namespace DB
{
HTTPServer::HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory_,
UInt16 portNumber,
Poco::Net::HTTPServerParams::Ptr params)
: TCPServer(new HTTPServerConnectionFactory(context, params, factory_), portNumber, params), factory(factory_)
{
}
HTTPServer::HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory_,
const Poco::Net::ServerSocket & socket,
Poco::Net::HTTPServerParams::Ptr params)
: TCPServer(new HTTPServerConnectionFactory(context, params, factory_), socket, params), factory(factory_)
{
}
HTTPServer::HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory_,
Poco::ThreadPool & threadPool,
const Poco::Net::ServerSocket & socket,
Poco::Net::HTTPServerParams::Ptr params)
: TCPServer(new HTTPServerConnectionFactory(context, params, factory_), threadPool, socket, params), factory(factory_)
{
}
HTTPServer::~HTTPServer()
{
/// We should call stop and join thread here instead of destructor of parent TCPHandler,
/// because there's possible race on 'vptr' between this virtual destructor and 'run' method.
stop();
}
void HTTPServer::stopAll(bool /* abortCurrent */)
{
stop();
}
}
#pragma once
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerParams.h>
#include <Poco/Net/TCPServer.h>
#include <common/types.h>
namespace DB
{
class Context;
class HTTPServer : public Poco::Net::TCPServer
{
public:
explicit HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory,
UInt16 portNumber = 80,
Poco::Net::HTTPServerParams::Ptr params = new Poco::Net::HTTPServerParams);
HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory,
const Poco::Net::ServerSocket & socket,
Poco::Net::HTTPServerParams::Ptr params);
HTTPServer(
const Context & context,
HTTPRequestHandlerFactoryPtr factory,
Poco::ThreadPool & threadPool,
const Poco::Net::ServerSocket & socket,
Poco::Net::HTTPServerParams::Ptr params);
~HTTPServer() override;
void stopAll(bool abortCurrent = false);
private:
HTTPRequestHandlerFactoryPtr factory;
};
}
#include <Server/HTTP/HTTPServerConnection.h>
#include <Poco/Net/NetException.h>
namespace DB
{
HTTPServerConnection::HTTPServerConnection(
const Context & context_,
const Poco::Net::StreamSocket & socket,
Poco::Net::HTTPServerParams::Ptr params_,
HTTPRequestHandlerFactoryPtr factory_)
: TCPServerConnection(socket), context(context_), params(params_), factory(factory_), stopped(false)
{
poco_check_ptr(factory);
}
void HTTPServerConnection::run()
{
std::string server = params->getSoftwareVersion();
Poco::Net::HTTPServerSession session(socket(), params);
while (!stopped && session.hasMoreRequests())
{
try
{
std::unique_lock<std::mutex> lock(mutex);
if (!stopped)
{
HTTPServerResponse response(session);
HTTPServerRequest request(context, response, session);
Poco::Timestamp now;
response.setDate(now);
response.setVersion(request.getVersion());
response.setKeepAlive(params->getKeepAlive() && request.getKeepAlive() && session.canKeepAlive());
if (!server.empty())
response.set("Server", server);
try
{
std::unique_ptr<HTTPRequestHandler> handler(factory->createRequestHandler(request));
if (handler)
{
if (request.getExpectContinue() && response.getStatus() == Poco::Net::HTTPResponse::HTTP_OK)
response.sendContinue();
handler->handleRequest(request, response);
session.setKeepAlive(params->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
}
else
sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_NOT_IMPLEMENTED);
}
catch (Poco::Exception &)
{
if (!response.sent())
{
try
{
sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
}
catch (...)
{
}
}
throw;
}
}
}
catch (Poco::Net::NoMessageException &)
{
break;
}
catch (Poco::Net::MessageException &)
{
sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_BAD_REQUEST);
}
catch (Poco::Exception &)
{
if (session.networkException())
{
session.networkException()->rethrow();
}
else
throw;
}
}
}
// static
void HTTPServerConnection::sendErrorResponse(Poco::Net::HTTPServerSession & session, Poco::Net::HTTPResponse::HTTPStatus status)
{
HTTPServerResponse response(session);
response.setVersion(Poco::Net::HTTPMessage::HTTP_1_1);
response.setStatusAndReason(status);
response.setKeepAlive(false);
response.send();
session.setKeepAlive(false);
}
void HTTPServerConnection::onServerStopped(const bool & abortCurrent)
{
stopped = true;
if (abortCurrent)
{
try
{
socket().shutdown();
}
catch (...)
{
}
}
else
{
std::unique_lock<std::mutex> lock(mutex);
try
{
socket().shutdown();
}
catch (...)
{
}
}
}
}
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerParams.h>
#include <Poco/Net/HTTPServerSession.h>
#include <Poco/Net/TCPServerConnection.h>
namespace DB
{
class HTTPServerConnection : public Poco::Net::TCPServerConnection
{
public:
HTTPServerConnection(
const Context & context,
const Poco::Net::StreamSocket & socket,
Poco::Net::HTTPServerParams::Ptr params,
HTTPRequestHandlerFactoryPtr factory);
void run() override;
protected:
static void sendErrorResponse(Poco::Net::HTTPServerSession & session, Poco::Net::HTTPResponse::HTTPStatus status);
void onServerStopped(const bool & abortCurrent);
private:
Context context;
Poco::Net::HTTPServerParams::Ptr params;
HTTPRequestHandlerFactoryPtr factory;
bool stopped;
std::mutex mutex; // guards the |factory| with assumption that creating handlers is not thread-safe.
};
}
#include <Server/HTTP/HTTPServerConnectionFactory.h>
#include <Server/HTTP/HTTPServerConnection.h>
namespace DB
{
HTTPServerConnectionFactory::HTTPServerConnectionFactory(
const Context & context_, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_)
: context(context_), params(params_), factory(factory_)
{
poco_check_ptr(factory);
}
Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket)
{
return new HTTPServerConnection(context, socket, params, factory);
}
}
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerParams.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
namespace DB
{
class HTTPServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory
{
public:
HTTPServerConnectionFactory(const Context & context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory);
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override;
private:
Context context;
Poco::Net::HTTPServerParams::Ptr params;
HTTPRequestHandlerFactoryPtr factory;
};
}
#include <Server/HTTP/HTTPServerRequest.h>
#include <Interpreters/Context.h>
#include <IO/EmptyReadBuffer.h>
#include <IO/HTTPChunkedReadBuffer.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/ReadHelpers.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Server/HTTP/ReadHeaders.h>
#include <Poco/Net/HTTPHeaderStream.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
namespace DB
{
HTTPServerRequest::HTTPServerRequest(const Context & context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session)
{
response.attachRequest(this);
/// Now that we know socket is still connected, obtain addresses
client_address = session.clientAddress();
server_address = session.serverAddress();
auto receive_timeout = context.getSettingsRef().http_receive_timeout;
auto send_timeout = context.getSettingsRef().http_send_timeout;
auto max_query_size = context.getSettingsRef().max_query_size;
session.socket().setReceiveTimeout(receive_timeout);
session.socket().setSendTimeout(send_timeout);
auto in = std::make_unique<ReadBufferFromPocoSocket>(session.socket());
socket = session.socket().impl();
readRequest(*in); /// Try parse according to RFC7230
if (getChunkedTransferEncoding())
stream = std::make_unique<HTTPChunkedReadBuffer>(std::move(in), max_query_size);
else if (hasContentLength())
stream = std::make_unique<LimitReadBuffer>(std::move(in), getContentLength(), false);
else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
stream = std::move(in);
else
/// We have to distinguish empty buffer and nullptr.
stream = std::make_unique<EmptyReadBuffer>();
}
bool HTTPServerRequest::checkPeerConnected() const
{
try
{
char b;
if (!socket->receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK))
return false;
}
catch (Poco::TimeoutException &)
{
}
catch (...)
{
return false;
}
return true;
}
void HTTPServerRequest::readRequest(ReadBuffer & in)
{
char ch;
std::string method;
std::string uri;
std::string version;
method.reserve(16);
uri.reserve(64);
version.reserve(16);
if (in.eof())
throw Poco::Net::NoMessageException();
skipWhitespaceIfAny(in);
if (in.eof())
throw Poco::Net::MessageException("No HTTP request header");
while (in.read(ch) && !Poco::Ascii::isSpace(ch) && method.size() <= MAX_METHOD_LENGTH)
method += ch;
if (method.size() > MAX_METHOD_LENGTH)
throw Poco::Net::MessageException("HTTP request method invalid or too long");
skipWhitespaceIfAny(in);
while (in.read(ch) && !Poco::Ascii::isSpace(ch) && uri.size() <= MAX_URI_LENGTH)
uri += ch;
if (uri.size() > MAX_URI_LENGTH)
throw Poco::Net::MessageException("HTTP request URI invalid or too long");
skipWhitespaceIfAny(in);
while (in.read(ch) && !Poco::Ascii::isSpace(ch) && version.size() <= MAX_VERSION_LENGTH)
version += ch;
if (version.size() > MAX_VERSION_LENGTH)
throw Poco::Net::MessageException("Invalid HTTP version string");
// since HTTP always use Windows-style EOL '\r\n' we always can safely skip to '\n'
skipToNextLineOrEOF(in);
readHeaders(*this, in);
skipToNextLineOrEOF(in);
setMethod(method);
setURI(uri);
setVersion(version);
}
}
#pragma once
#include <IO/ReadBuffer.h>
#include <Server/HTTP/HTTPRequest.h>
#include <Poco/Net/HTTPServerSession.h>
namespace DB
{
class Context;
class HTTPServerResponse;
class ReadBufferFromPocoSocket;
class HTTPServerRequest : public HTTPRequest
{
public:
HTTPServerRequest(const Context & context, HTTPServerResponse & response, Poco::Net::HTTPServerSession & session);
/// FIXME: it's a little bit inconvenient interface. The rationale is that all other ReadBuffer's wrap each other
/// via unique_ptr - but we can't inherit HTTPServerRequest from ReadBuffer and pass it around,
/// since we also need it in other places.
/// Returns the input stream for reading the request body.
ReadBuffer & getStream()
{
poco_check_ptr(stream);
return *stream;
}
bool checkPeerConnected() const;
/// Returns the client's address.
const Poco::Net::SocketAddress & clientAddress() const { return client_address; }
/// Returns the server's address.
const Poco::Net::SocketAddress & serverAddress() const { return server_address; }
private:
/// Limits for basic sanity checks when reading a header
enum Limits
{
MAX_NAME_LENGTH = 256,
MAX_VALUE_LENGTH = 8192,
MAX_METHOD_LENGTH = 32,
MAX_URI_LENGTH = 16384,
MAX_VERSION_LENGTH = 8,
MAX_FIELDS_NUMBER = 100,
};
std::unique_ptr<ReadBuffer> stream;
Poco::Net::SocketImpl * socket;
Poco::Net::SocketAddress client_address;
Poco::Net::SocketAddress server_address;
void readRequest(ReadBuffer & in);
};
}
#include <Server/HTTP/HTTPServerResponse.h>
#include <Server/HTTP/HTTPServerRequest.h>
#include <Poco/CountingStream.h>
#include <Poco/DateTimeFormat.h>
#include <Poco/DateTimeFormatter.h>
#include <Poco/File.h>
#include <Poco/FileStream.h>
#include <Poco/Net/HTTPChunkedStream.h>
#include <Poco/Net/HTTPFixedLengthStream.h>
#include <Poco/Net/HTTPHeaderStream.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/StreamCopier.h>
namespace DB
{
HTTPServerResponse::HTTPServerResponse(Poco::Net::HTTPServerSession & session_) : session(session_)
{
}
void HTTPServerResponse::sendContinue()
{
Poco::Net::HTTPHeaderOutputStream hs(session);
hs << getVersion() << " 100 Continue\r\n\r\n";
}
std::shared_ptr<std::ostream> HTTPServerResponse::send()
{
poco_assert(!stream);
if ((request && request->getMethod() == HTTPRequest::HTTP_HEAD) || getStatus() < 200 || getStatus() == HTTPResponse::HTTP_NO_CONTENT
|| getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
{
Poco::CountingOutputStream cs;
write(cs);
stream = std::make_shared<Poco::Net::HTTPFixedLengthOutputStream>(session, cs.chars());
write(*stream);
}
else if (getChunkedTransferEncoding())
{
Poco::Net::HTTPHeaderOutputStream hs(session);
write(hs);
stream = std::make_shared<Poco::Net::HTTPChunkedOutputStream>(session);
}
else if (hasContentLength())
{
Poco::CountingOutputStream cs;
write(cs);
stream = std::make_shared<Poco::Net::HTTPFixedLengthOutputStream>(session, getContentLength64() + cs.chars());
write(*stream);
}
else
{
stream = std::make_shared<Poco::Net::HTTPOutputStream>(session);
setKeepAlive(false);
write(*stream);
}
return stream;
}
std::pair<std::shared_ptr<std::ostream>, std::shared_ptr<std::ostream>> HTTPServerResponse::beginSend()
{
poco_assert(!stream);
poco_assert(!header_stream);
/// NOTE: Code is not exception safe.
if ((request && request->getMethod() == HTTPRequest::HTTP_HEAD) || getStatus() < 200 || getStatus() == HTTPResponse::HTTP_NO_CONTENT
|| getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
{
throw Poco::Exception("HTTPServerResponse::beginSend is invalid for HEAD request");
}
else if (getChunkedTransferEncoding())
{
header_stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
beginWrite(*header_stream);
stream = std::make_shared<Poco::Net::HTTPChunkedOutputStream>(session);
}
else if (hasContentLength())
{
throw Poco::Exception("HTTPServerResponse::beginSend is invalid for response with Content-Length header");
}
else
{
stream = std::make_shared<Poco::Net::HTTPOutputStream>(session);
header_stream = stream;
setKeepAlive(false);
beginWrite(*stream);
}
return std::make_pair(header_stream, stream);
}
void HTTPServerResponse::sendFile(const std::string & path, const std::string & mediaType)
{
poco_assert(!stream);
Poco::File f(path);
Poco::Timestamp date_time = f.getLastModified();
Poco::File::FileSize length = f.getSize();
set("Last-Modified", Poco::DateTimeFormatter::format(date_time, Poco::DateTimeFormat::HTTP_FORMAT));
setContentLength64(length);
setContentType(mediaType);
setChunkedTransferEncoding(false);
Poco::FileInputStream istr(path);
if (istr.good())
{
stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
write(*stream);
if (request && request->getMethod() != HTTPRequest::HTTP_HEAD)
{
Poco::StreamCopier::copyStream(istr, *stream);
}
}
else
throw Poco::OpenFileException(path);
}
void HTTPServerResponse::sendBuffer(const void * buffer, std::size_t length)
{
poco_assert(!stream);
setContentLength(static_cast<int>(length));
setChunkedTransferEncoding(false);
stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
write(*stream);
if (request && request->getMethod() != HTTPRequest::HTTP_HEAD)
{
stream->write(static_cast<const char *>(buffer), static_cast<std::streamsize>(length));
}
}
void HTTPServerResponse::redirect(const std::string & uri, HTTPStatus status)
{
poco_assert(!stream);
setContentLength(0);
setChunkedTransferEncoding(false);
setStatusAndReason(status);
set("Location", uri);
stream = std::make_shared<Poco::Net::HTTPHeaderOutputStream>(session);
write(*stream);
}
void HTTPServerResponse::requireAuthentication(const std::string & realm)
{
poco_assert(!stream);
setStatusAndReason(HTTPResponse::HTTP_UNAUTHORIZED);
std::string auth("Basic realm=\"");
auth.append(realm);
auth.append("\"");
set("WWW-Authenticate", auth);
}
}
#pragma once
#include <Server/HTTP/HTTPResponse.h>
#include <Poco/Net/HTTPServerSession.h>
#include <Poco/Net/HTTPResponse.h>
#include <iostream>
#include <memory>
namespace DB
{
class HTTPServerRequest;
class HTTPServerResponse : public HTTPResponse
{
public:
explicit HTTPServerResponse(Poco::Net::HTTPServerSession & session);
void sendContinue(); /// Sends a 100 Continue response to the client.
/// Sends the response header to the client and
/// returns an output stream for sending the
/// response body.
///
/// Must not be called after beginSend(), sendFile(), sendBuffer()
/// or redirect() has been called.
std::shared_ptr<std::ostream> send(); /// TODO: use some WriteBuffer implementation here.
/// Sends the response headers to the client
/// but do not finish headers with \r\n,
/// allowing to continue sending additional header fields.
///
/// Must not be called after send(), sendFile(), sendBuffer()
/// or redirect() has been called.
std::pair<std::shared_ptr<std::ostream>, std::shared_ptr<std::ostream>> beginSend(); /// TODO: use some WriteBuffer implementation here.
/// Sends the response header to the client, followed
/// by the content of the given file.
///
/// Must not be called after send(), sendBuffer()
/// or redirect() has been called.
///
/// Throws a FileNotFoundException if the file
/// cannot be found, or an OpenFileException if
/// the file cannot be opened.
void sendFile(const std::string & path, const std::string & mediaType);
/// Sends the response header to the client, followed
/// by the contents of the given buffer.
///
/// The Content-Length header of the response is set
/// to length and chunked transfer encoding is disabled.
///
/// If both the HTTP message header and body (from the
/// given buffer) fit into one single network packet, the
/// complete response can be sent in one network packet.
///
/// Must not be called after send(), sendFile()
/// or redirect() has been called.
void sendBuffer(const void * pBuffer, std::size_t length); /// FIXME: do we need this one?
/// Sets the status code, which must be one of
/// HTTP_MOVED_PERMANENTLY (301), HTTP_FOUND (302),
/// or HTTP_SEE_OTHER (303),
/// and sets the "Location" header field
/// to the given URI, which according to
/// the HTTP specification, must be absolute.
///
/// Must not be called after send() has been called.
void redirect(const std::string & uri, Poco::Net::HTTPResponse::HTTPStatus status = Poco::Net::HTTPResponse::HTTP_FOUND);
void requireAuthentication(const std::string & realm);
/// Sets the status code to 401 (Unauthorized)
/// and sets the "WWW-Authenticate" header field
/// according to the given realm.
/// Returns true if the response (header) has been sent.
bool sent() const { return !!stream; }
void attachRequest(HTTPServerRequest * request_) { request = request_; }
private:
Poco::Net::HTTPServerSession & session;
HTTPServerRequest * request;
std::shared_ptr<std::ostream> stream;
std::shared_ptr<std::ostream> header_stream;
};
}
#include <Server/HTTP/ReadHeaders.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/NetException.h>
namespace DB
{
void readHeaders(
Poco::Net::MessageHeader & headers, ReadBuffer & in, size_t max_fields_number, size_t max_name_length, size_t max_value_length)
{
char ch = 0; // silence uninitialized warning from gcc-*
std::string name;
std::string value;
name.reserve(32);
value.reserve(64);
size_t fields = 0;
while (true)
{
if (fields > max_fields_number)
throw Poco::Net::MessageException("Too many header fields");
name.clear();
value.clear();
/// Field name
while (in.peek(ch) && ch != ':' && !Poco::Ascii::isSpace(ch) && name.size() <= max_name_length)
{
name += ch;
in.ignore();
}
if (in.eof())
throw Poco::Net::MessageException("Field is invalid");
if (name.empty())
{
if (ch == '\r')
/// Start of the empty-line delimiter
break;
if (ch == ':')
throw Poco::Net::MessageException("Field name is empty");
}
else
{
if (name.size() > max_name_length)
throw Poco::Net::MessageException("Field name is too long");
if (ch != ':')
throw Poco::Net::MessageException("Field name is invalid or no colon found");
}
in.ignore();
skipWhitespaceIfAny(in, true);
if (in.eof())
throw Poco::Net::MessageException("Field is invalid");
/// Field value - folded values not supported.
while (in.read(ch) && ch != '\r' && ch != '\n' && value.size() <= max_value_length)
value += ch;
if (in.eof())
throw Poco::Net::MessageException("Field is invalid");
if (value.empty())
throw Poco::Net::MessageException("Field value is empty");
if (ch == '\n')
throw Poco::Net::MessageException("No CRLF found");
if (value.size() > max_value_length)
throw Poco::Net::MessageException("Field value is too long");
skipToNextLineOrEOF(in);
Poco::trimRightInPlace(value);
headers.add(name, headers.decodeWord(value));
++fields;
}
}
}
#pragma once
#include <Poco/Net/MessageHeader.h>
namespace DB
{
class ReadBuffer;
void readHeaders(
Poco::Net::MessageHeader & headers,
ReadBuffer & in,
size_t max_fields_number = 100,
size_t max_name_length = 256,
size_t max_value_length = 8192);
}
#include <Poco/Version.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromString.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
......@@ -13,6 +12,8 @@
# include <Common/config.h>
#endif
#include <Poco/Version.h>
namespace DB
{
......@@ -33,16 +34,13 @@ void WriteBufferFromHTTPServerResponse::startSendHeaders()
setResponseDefaultHeaders(response, keep_alive_timeout);
#if defined(POCO_CLICKHOUSE_PATCH)
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
if (!is_http_method_head)
std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
#endif
}
}
void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
{
#if defined(POCO_CLICKHOUSE_PATCH)
if (headers_finished_sending)
return;
......@@ -51,12 +49,10 @@ void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
if (response_header_ostr)
*response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush;
#endif
}
void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
{
#if defined(POCO_CLICKHOUSE_PATCH)
if (headers_finished_sending)
return;
......@@ -65,7 +61,6 @@ void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
if (response_header_ostr)
*response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
#endif
}
void WriteBufferFromHTTPServerResponse::finishSendHeaders()
......@@ -75,23 +70,16 @@ void WriteBufferFromHTTPServerResponse::finishSendHeaders()
writeHeaderSummary();
headers_finished_sending = true;
if (request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
if (!is_http_method_head)
{
#if defined(POCO_CLICKHOUSE_PATCH)
/// Send end of headers delimiter.
if (response_header_ostr)
*response_header_ostr << "\r\n" << std::flush;
#else
/// Newline autosent by response.send()
/// if nothing to send in body:
if (!response_body_ostr)
response_body_ostr = &(response.send());
#endif
}
else
{
if (!response_body_ostr)
response_body_ostr = &(response.send());
response_body_ostr = response.send();
}
}
}
......@@ -104,23 +92,15 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
startSendHeaders();
if (!out && request.getMethod() != Poco::Net::HTTPRequest::HTTP_HEAD)
if (!out && !is_http_method_head)
{
if (compress)
{
auto content_encoding_name = toContentEncodingName(compression_method);
#if defined(POCO_CLICKHOUSE_PATCH)
*response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
#else
response.set("Content-Encoding", content_encoding_name);
#endif
}
#if !defined(POCO_CLICKHOUSE_PATCH)
response_body_ostr = &(response.send());
#endif
/// We reuse our buffer in "out" to avoid extra allocations and copies.
if (compress)
......@@ -150,14 +130,14 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerRequest & request_,
Poco::Net::HTTPServerResponse & response_,
HTTPServerResponse & response_,
bool is_http_method_head_,
unsigned keep_alive_timeout_,
bool compress_,
CompressionMethod compression_method_)
: BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE)
, request(request_)
, response(response_)
, is_http_method_head(is_http_method_head_)
, keep_alive_timeout(keep_alive_timeout_)
, compress(compress_)
, compression_method(compression_method_)
......
#pragma once
#include <optional>
#include <mutex>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Version.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h>
#include <IO/HTTPCommon.h>
#include <IO/Progress.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
namespace Poco
{
namespace Net
{
class HTTPServerResponse;
}
}
#include <mutex>
#include <optional>
namespace DB
......@@ -47,20 +33,17 @@ namespace DB
class WriteBufferFromHTTPServerResponse final : public BufferWithOwnMemory<WriteBuffer>
{
private:
Poco::Net::HTTPServerRequest & request;
Poco::Net::HTTPServerResponse & response;
HTTPServerResponse & response;
bool is_http_method_head;
bool add_cors_header = false;
unsigned keep_alive_timeout = 0;
bool compress = false;
CompressionMethod compression_method;
int compression_level = 1;
std::ostream * response_body_ostr = nullptr;
#if defined(POCO_CLICKHOUSE_PATCH)
std::ostream * response_header_ostr = nullptr;
#endif
std::shared_ptr<std::ostream> response_body_ostr;
std::shared_ptr<std::ostream> response_header_ostr;
std::unique_ptr<WriteBuffer> out;
......@@ -91,8 +74,8 @@ private:
public:
WriteBufferFromHTTPServerResponse(
Poco::Net::HTTPServerRequest & request_,
Poco::Net::HTTPServerResponse & response_,
HTTPServerResponse & response_,
bool is_http_method_head_,
unsigned keep_alive_timeout_,
bool compress_ = false, /// If true - set Content-Encoding header and compress the result.
CompressionMethod compression_method_ = CompressionMethod::None);
......
#include "HTTPHandler.h"
#include <Server/HTTPHandler.h>
#include "HTTPHandlerFactory.h"
#include "HTTPHandlerRequestFilter.h"
#include <chrono>
#include <iomanip>
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/NetException.h>
#include <ext/scope_guard.h>
#include <Core/ExternalTable.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <common/getFQDNOrHostName.h>
#include <Common/setThreadName.h>
#include <Common/SettingsChanges.h>
#include <Disks/IVolume.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Core/ExternalTable.h>
#include <DataStreams/IBlockInputStream.h>
#include <Disks/StoragePolicy.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromFile.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromTemporaryFile.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/IServer.h>
#include <Common/SettingsChanges.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Poco/Net/HTTPStream.h>
#include <common/getFQDNOrHostName.h>
#include <ext/scope_guard.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
#include <chrono>
#include <iomanip>
namespace DB
{
......@@ -237,16 +235,14 @@ HTTPHandler::HTTPHandler(IServer & server_, const std::string & name)
void HTTPHandler::processQuery(
Context & context,
Poco::Net::HTTPServerRequest & request,
HTTPServerRequest & request,
HTMLForm & params,
Poco::Net::HTTPServerResponse & response,
HTTPServerResponse & response,
Output & used_output,
std::optional<CurrentThread::QueryScope> & query_scope)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
std::istream & istr = request.stream();
/// The user and password can be passed by headers (similar to X-Auth-*),
/// which is used by load balancers to pass authentication information.
std::string user = request.get("X-ClickHouse-User", "");
......@@ -291,9 +287,9 @@ void HTTPHandler::processQuery(
client_info.interface = ClientInfo::Interface::HTTP;
ClientInfo::HTTPMethod http_method = ClientInfo::HTTPMethod::UNKNOWN;
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
if (request.getMethod() == HTTPServerRequest::HTTP_GET)
http_method = ClientInfo::HTTPMethod::GET;
else if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_POST)
else if (request.getMethod() == HTTPServerRequest::HTTP_POST)
http_method = ClientInfo::HTTPMethod::POST;
client_info.http_method = http_method;
......@@ -356,10 +352,8 @@ void HTTPHandler::processQuery(
}
#endif
// Set the query id supplied by the user, if any, and also update the
// OpenTelemetry fields.
context.setCurrentQueryId(params.get("query_id",
request.get("X-ClickHouse-Query-Id", "")));
// Set the query id supplied by the user, if any, and also update the OpenTelemetry fields.
context.setCurrentQueryId(params.get("query_id", request.get("X-ClickHouse-Query-Id", "")));
client_info.initial_query_id = client_info.current_query_id;
......@@ -405,7 +399,11 @@ void HTTPHandler::processQuery(
unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10);
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, client_supports_http_compression, http_response_compression_method);
response,
request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD,
keep_alive_timeout,
client_supports_http_compression,
http_response_compression_method);
if (internal_compression)
used_output.out_maybe_compressed = std::make_shared<CompressedWriteBuffer>(*used_output.out);
......@@ -459,8 +457,8 @@ void HTTPHandler::processQuery(
/// Request body can be compressed using algorithm specified in the Content-Encoding header.
String http_request_compression_method_str = request.get("Content-Encoding", "");
std::unique_ptr<ReadBuffer> in_post = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromIStream>(istr), chooseCompressionMethod({}, http_request_compression_method_str));
auto in_post = wrapReadBufferWithCompressionMethod(
wrapReadBufferReference(request.getStream()), chooseCompressionMethod({}, http_request_compression_method_str));
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
......@@ -513,7 +511,7 @@ void HTTPHandler::processQuery(
const auto & settings = context.getSettingsRef();
/// Only readonly queries are allowed for HTTP GET requests.
if (request.getMethod() == Poco::Net::HTTPServerRequest::HTTP_GET)
if (request.getMethod() == HTTPServerRequest::HTTP_GET)
{
if (settings.readonly == 0)
context.setSetting("readonly", 2);
......@@ -608,26 +606,12 @@ void HTTPHandler::processQuery(
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
append_callback([&context, &socket](const Progress &)
append_callback([&context, &request](const Progress &)
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
try
{
char b;
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
}
catch (Poco::TimeoutException &)
{
}
catch (...)
{
/// Assume that at the point this method is called no one is reading data from the socket any more:
/// should be true for read-only queries.
if (!request.checkPeerConnected())
context.killCurrentQuery();
}
});
}
......@@ -656,22 +640,23 @@ void HTTPHandler::processQuery(
used_output.out->finalize();
}
void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
Output & used_output)
void HTTPHandler::trySendExceptionToClient(
const std::string & s, int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
{
try
{
response.set("X-ClickHouse-Exception-Code", toString<int>(exception_code));
/// FIXME: make sure that no one else is reading from the same stream at the moment.
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive()
&& !request.stream().eof()
&& exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
&& exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED
&& !request.getStream().eof())
{
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
request.getStream().ignoreAll();
}
bool auth_fail = exception_code == ErrorCodes::UNKNOWN_USER ||
......@@ -690,7 +675,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
if (!response.sent() && !used_output.out_maybe_compressed)
{
/// If nothing was sent yet and we don't even know if we must compress the response.
response.send() << s << std::endl;
*response.send() << s << std::endl;
}
else if (used_output.out_maybe_compressed)
{
......@@ -717,6 +702,11 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
used_output.out_maybe_compressed->next();
used_output.out->finalize();
}
else
{
assert(false);
__builtin_unreachable();
}
}
catch (...)
{
......@@ -725,7 +715,7 @@ void HTTPHandler::trySendExceptionToClient(const std::string & s, int exception_
}
void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
setThreadName("HTTPHandler");
ThreadStatus thread_status;
......@@ -746,17 +736,18 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
response.setContentType("text/plain; charset=UTF-8");
response.set("X-ClickHouse-Server-Display-Name", server_display_name);
/// For keep-alive to work.
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
HTMLForm params(request);
with_stacktrace = params.getParsed<bool>("stacktrace", false);
/// Workaround. Poco does not detect 411 Length Required case.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() &&
!request.hasContentLength())
if (request.getMethod() == HTTPRequest::HTTP_POST && !request.getChunkedTransferEncoding() && !request.hasContentLength())
{
throw Exception("The Transfer-Encoding is not chunked and there is no Content-Length header for POST request", ErrorCodes::HTTP_LENGTH_REQUIRED);
throw Exception(
"The Transfer-Encoding is not chunked and there is no Content-Length header for POST request",
ErrorCodes::HTTP_LENGTH_REQUIRED);
}
processQuery(context, request, params, response, used_output, query_scope);
......@@ -800,7 +791,7 @@ bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::stri
return false;
}
std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
std::string DynamicQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (likely(!startsWith(request.getContentType(), "multipart/form-data")))
{
......@@ -814,7 +805,7 @@ std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request
/// Support for "external data for query processing".
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
params.load(request, request.getStream(), handler);
std::string full_query;
/// Params are of both form params POST and uri (GET params)
......@@ -844,7 +835,7 @@ bool PredefinedQueryHandler::customizeQueryParam(Context & context, const std::s
return false;
}
void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & request, DB::Context & context)
void PredefinedQueryHandler::customizeContext(HTTPServerRequest & request, DB::Context & context)
{
/// If in the configuration file, the handler's header is regex and contains named capture group
/// We will extract regex named capture groups as query parameters
......@@ -880,22 +871,26 @@ void PredefinedQueryHandler::customizeContext(Poco::Net::HTTPServerRequest & req
}
}
std::string PredefinedQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
std::string PredefinedQueryHandler::getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (unlikely(startsWith(request.getContentType(), "multipart/form-data")))
{
/// Support for "external data for query processing".
ExternalTablesHandler handler(context, params);
params.load(request, request.stream(), handler);
params.load(request, request.getStream(), handler);
}
return predefined_query;
}
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
HTTPRequestHandlerFactoryPtr createDynamicHandlerFactory(IServer & server, const std::string & config_prefix)
{
std::string query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>(server, std::move(query_param_name)), server.config(), config_prefix);
const auto & query_param_name = server.config().getString(config_prefix + ".handler.query_param_name", "query");
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(server, std::move(query_param_name));
factory->addFiltersFromConfig(server.config(), config_prefix);
return factory;
}
static inline bool capturingNamedQueryParam(NameSet receive_params, const CompiledRegexPtr & compiled_regex)
......@@ -913,18 +908,20 @@ static inline CompiledRegexPtr getCompiledRegex(const std::string & expression)
auto compiled_regex = std::make_shared<const re2::RE2>(expression);
if (!compiled_regex->ok())
throw Exception("Cannot compile re2: " + expression + " for http handling rule, error: " +
compiled_regex->error() + ". Look at https://github.com/google/re2/wiki/Syntax for reference.", ErrorCodes::CANNOT_COMPILE_REGEXP);
throw Exception(
"Cannot compile re2: " + expression + " for http handling rule, error: " + compiled_regex->error()
+ ". Look at https://github.com/google/re2/wiki/Syntax for reference.",
ErrorCodes::CANNOT_COMPILE_REGEXP);
return compiled_regex;
}
Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix)
HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration & configuration = server.config();
if (!configuration.has(config_prefix + ".handler.query"))
throw Exception("There is no path '" + config_prefix + ".handler.query" + "' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
throw Exception("There is no path '" + config_prefix + ".handler.query' in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
std::string predefined_query = configuration.getString(config_prefix + ".handler.query");
NameSet analyze_receive_params = analyzeReceiveQueryParams(predefined_query);
......@@ -946,6 +943,8 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer &
headers_name_with_regex.emplace(std::make_pair(header_name, regex));
}
std::shared_ptr<HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>> factory;
if (configuration.has(config_prefix + ".url"))
{
auto url_expression = configuration.getString(config_prefix + ".url");
......@@ -955,14 +954,23 @@ Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer &
auto regex = getCompiledRegex(url_expression);
if (capturingNamedQueryParam(analyze_receive_params, regex))
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), std::move(regex),
std::move(headers_name_with_regex)), configuration, config_prefix);
{
factory = std::make_shared<HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>>(
server,
std::move(analyze_receive_params),
std::move(predefined_query),
std::move(regex),
std::move(headers_name_with_regex));
factory->addFiltersFromConfig(configuration, config_prefix);
return factory;
}
}
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{} ,std::move(headers_name_with_regex)),
configuration, config_prefix);
factory = std::make_shared<HandlingRuleHTTPHandlerFactory<PredefinedQueryHandler>>(
server, std::move(analyze_receive_params), std::move(predefined_query), CompiledRegexPtr{}, std::move(headers_name_with_regex));
factory->addFiltersFromConfig(configuration, config_prefix);
return factory;
}
}
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/HTMLForm.h>
#include <Core/Names.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <re2/re2.h>
......@@ -21,23 +18,24 @@ namespace Poco { class Logger; }
namespace DB
{
class IServer;
class WriteBufferFromHTTPServerResponse;
using CompiledRegexPtr = std::shared_ptr<const re2::RE2>;
class HTTPHandler : public Poco::Net::HTTPRequestHandler
class HTTPHandler : public HTTPRequestHandler
{
public:
explicit HTTPHandler(IServer & server_, const std::string & name);
HTTPHandler(IServer & server_, const std::string & name);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
/// This method is called right before the query execution.
virtual void customizeContext(Poco::Net::HTTPServerRequest & /*request*/, Context & /* context */) {}
virtual void customizeContext(HTTPServerRequest & /* request */, Context & /* context */) {}
virtual bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) = 0;
virtual std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) = 0;
virtual std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) = 0;
private:
struct Output
......@@ -74,17 +72,17 @@ private:
/// Also initializes 'used_output'.
void processQuery(
Context & context,
Poco::Net::HTTPServerRequest & request,
HTTPServerRequest & request,
HTMLForm & params,
Poco::Net::HTTPServerResponse & response,
HTTPServerResponse & response,
Output & used_output,
std::optional<CurrentThread::QueryScope> & query_scope);
void trySendExceptionToClient(
const std::string & s,
int exception_code,
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response,
HTTPServerRequest & request,
HTTPServerResponse & response,
Output & used_output);
static void pushDelayedResults(Output & used_output);
......@@ -97,7 +95,7 @@ private:
public:
explicit DynamicQueryHandler(IServer & server_, const std::string & param_name_ = "query");
std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override;
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) override;
bool customizeQueryParam(Context &context, const std::string &key, const std::string &value) override;
};
......@@ -114,9 +112,9 @@ public:
IServer & server_, const NameSet & receive_params_, const std::string & predefined_query_
, const CompiledRegexPtr & url_regex_, const std::unordered_map<String, CompiledRegexPtr> & header_name_with_regex_);
virtual void customizeContext(Poco::Net::HTTPServerRequest & request, Context & context) override;
virtual void customizeContext(HTTPServerRequest & request, Context & context) override;
std::string getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) override;
std::string getQuery(HTTPServerRequest & request, HTMLForm & params, Context & context) override;
bool customizeQueryParam(Context & context, const std::string & key, const std::string & value) override;
};
......
#include "HTTPHandlerFactory.h"
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Server/IServer.h>
#include <Poco/Util/LayeredConfiguration.h>
......@@ -29,7 +32,7 @@ HTTPRequestHandlerFactoryMain::HTTPRequestHandlerFactoryMain(const std::string &
{
}
Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHandler(const Poco::Net::HTTPServerRequest & request)
std::unique_ptr<HTTPRequestHandler> HTTPRequestHandlerFactoryMain::createRequestHandler(const HTTPServerRequest & request)
{
LOG_TRACE(log, "HTTP Request for {}. Method: {}, Address: {}, User-Agent: {}{}, Content Type: {}, Transfer Encoding: {}, X-Forwarded-For: {}",
name, request.getMethod(), request.clientAddress().toString(), request.get("User-Agent", "(none)"),
......@@ -38,8 +41,8 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
for (auto & handler_factory : child_factories)
{
auto * handler = handler_factory->createRequestHandler(request);
if (handler != nullptr)
auto handler = handler_factory->createRequestHandler(request);
if (handler)
return handler;
}
......@@ -47,31 +50,16 @@ Poco::Net::HTTPRequestHandler * HTTPRequestHandlerFactoryMain::createRequestHand
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
return new NotFoundHandler;
return std::unique_ptr<HTTPRequestHandler>(new NotFoundHandler);
}
return nullptr;
}
HTTPRequestHandlerFactoryMain::~HTTPRequestHandlerFactoryMain()
{
while (!child_factories.empty())
{
delete child_factories.back();
child_factories.pop_back();
}
}
HTTPRequestHandlerFactoryMain::TThis * HTTPRequestHandlerFactoryMain::addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory)
{
child_factories.emplace_back(child_factory);
return this;
}
static inline auto createHandlersFactoryFromConfig(
IServer & server, const std::string & name, const String & prefix, AsynchronousMetrics & async_metrics)
{
auto main_handler_factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto main_handler_factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
Poco::Util::AbstractConfiguration::Keys keys;
server.config().keys(prefix, keys);
......@@ -109,10 +97,11 @@ static inline auto createHandlersFactoryFromConfig(
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
return main_handler_factory.release();
return main_handler_factory;
}
static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
static inline HTTPRequestHandlerFactoryPtr
createHTTPHandlerFactory(IServer & server, const std::string & name, AsynchronousMetrics & async_metrics)
{
if (server.config().has("http_handlers"))
{
......@@ -120,25 +109,25 @@ static inline Poco::Net::HTTPRequestHandlerFactory * createHTTPHandlerFactory(IS
}
else
{
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
addDefaultHandlersFactory(*factory, server, async_metrics);
return factory.release();
return factory;
}
}
static inline Poco::Net::HTTPRequestHandlerFactory * createInterserverHTTPHandlerFactory(IServer & server, const std::string & name)
static inline HTTPRequestHandlerFactoryPtr createInterserverHTTPHandlerFactory(IServer & server, const std::string & name)
{
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
addCommonDefaultHandlersFactory(*factory, server);
auto main_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>>(server);
auto main_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<InterserverIOHTTPHandler>>(server);
main_handler->allowPostAndGetParamsRequest();
factory->addHandler(main_handler.release());
factory->addHandler(main_handler);
return factory.release();
return factory;
}
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name)
HTTPRequestHandlerFactoryPtr createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name)
{
if (name == "HTTPHandler-factory" || name == "HTTPSHandler-factory")
return createHTTPHandlerFactory(server, name, async_metrics);
......@@ -146,12 +135,13 @@ Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, As
return createInterserverHTTPHandlerFactory(server, name);
else if (name == "PrometheusHandler-factory")
{
auto factory = std::make_unique<HTTPRequestHandlerFactoryMain>(name);
auto handler = std::make_unique<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
auto factory = std::make_shared<HTTPRequestHandlerFactoryMain>(name);
auto handler = std::make_shared<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics));
handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest();
factory->addHandler(handler.release());
return factory.release();
handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"));
handler->allowGetAndHeadRequest();
factory->addHandler(handler);
return factory;
}
throw Exception("LOGICAL ERROR: Unknown HTTP handler factory name.", ErrorCodes::LOGICAL_ERROR);
......@@ -162,39 +152,44 @@ static const auto root_response_expression = "config://http_server_default_respo
void addCommonDefaultHandlersFactory(HTTPRequestHandlerFactoryMain & factory, IServer & server)
{
auto root_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, root_response_expression);
root_handler->attachStrictPath("/")->allowGetAndHeadRequest();
factory.addHandler(root_handler.release());
auto ping_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, ping_response_expression);
ping_handler->attachStrictPath("/ping")->allowGetAndHeadRequest();
factory.addHandler(ping_handler.release());
auto replicas_status_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
replicas_status_handler->attachNonStrictPath("/replicas_status")->allowGetAndHeadRequest();
factory.addHandler(replicas_status_handler.release());
auto web_ui_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<WebUIRequestHandler>>(server, "play.html");
web_ui_handler->attachNonStrictPath("/play")->allowGetAndHeadRequest();
factory.addHandler(web_ui_handler.release());
auto root_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, root_response_expression);
root_handler->attachStrictPath("/");
root_handler->allowGetAndHeadRequest();
factory.addHandler(root_handler);
auto ping_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, ping_response_expression);
ping_handler->attachStrictPath("/ping");
ping_handler->allowGetAndHeadRequest();
factory.addHandler(ping_handler);
auto replicas_status_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
replicas_status_handler->attachNonStrictPath("/replicas_status");
replicas_status_handler->allowGetAndHeadRequest();
factory.addHandler(replicas_status_handler);
auto web_ui_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<WebUIRequestHandler>>(server, "play.html");
web_ui_handler->attachNonStrictPath("/play");
web_ui_handler->allowGetAndHeadRequest();
factory.addHandler(web_ui_handler);
}
void addDefaultHandlersFactory(HTTPRequestHandlerFactoryMain & factory, IServer & server, AsynchronousMetrics & async_metrics)
{
addCommonDefaultHandlersFactory(factory, server);
auto query_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(server, "query");
auto query_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<DynamicQueryHandler>>(server, "query");
query_handler->allowPostAndGetParamsRequest();
factory.addHandler(query_handler.release());
factory.addHandler(query_handler);
/// We check that prometheus handler will be served on current (default) port.
/// Otherwise it will be created separately, see createHandlerFactory(...).
if (server.config().has("prometheus") && server.config().getInt("prometheus.port", 0) == 0)
{
auto prometheus_handler = std::make_unique<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
auto prometheus_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
server, PrometheusMetricsWriter(server.config(), "prometheus", async_metrics));
prometheus_handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"))->allowGetAndHeadRequest();
factory.addHandler(prometheus_handler.release());
prometheus_handler->attachStrictPath(server.config().getString("prometheus.endpoint", "/metrics"));
prometheus_handler->allowGetAndHeadRequest();
factory.addHandler(prometheus_handler);
}
}
......
#pragma once
#include "IServer.h"
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <Common/StringUtils/StringUtils.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <common/logger_useful.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace DB
{
/// Handle request using child handlers
class HTTPRequestHandlerFactoryMain : public Poco::Net::HTTPRequestHandlerFactory, boost::noncopyable
namespace ErrorCodes
{
private:
using TThis = HTTPRequestHandlerFactoryMain;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
Poco::Logger * log;
std::string name;
class IServer;
std::vector<Poco::Net::HTTPRequestHandlerFactory *> child_factories;
/// Handle request using child handlers
class HTTPRequestHandlerFactoryMain : public HTTPRequestHandlerFactory
{
public:
explicit HTTPRequestHandlerFactoryMain(const std::string & name_);
~HTTPRequestHandlerFactoryMain() override;
void addHandler(HTTPRequestHandlerFactoryPtr child_factory) { child_factories.emplace_back(child_factory); }
HTTPRequestHandlerFactoryMain(const std::string & name_);
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
TThis * addHandler(Poco::Net::HTTPRequestHandlerFactory * child_factory);
private:
Poco::Logger * log;
std::string name;
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override;
std::vector<HTTPRequestHandlerFactoryPtr> child_factories;
};
template <typename TEndpoint>
class HandlingRuleHTTPHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
class HandlingRuleHTTPHandlerFactory : public HTTPRequestHandlerFactory
{
public:
using TThis = HandlingRuleHTTPHandlerFactory<TEndpoint>;
using Filter = std::function<bool(const Poco::Net::HTTPServerRequest &)>;
using Filter = std::function<bool(const HTTPServerRequest &)>;
template <typename... TArgs>
HandlingRuleHTTPHandlerFactory(TArgs &&... args)
explicit HandlingRuleHTTPHandlerFactory(TArgs &&... args)
{
creator = [args = std::tuple<TArgs...>(std::forward<TArgs>(args) ...)]()
{
return std::apply([&](auto && ... endpoint_args)
{
return new TEndpoint(std::forward<decltype(endpoint_args)>(endpoint_args)...);
return std::make_unique<TEndpoint>(std::forward<decltype(endpoint_args)>(endpoint_args)...);
}, std::move(args));
};
}
TThis * addFilter(Filter cur_filter)
void addFilter(Filter cur_filter)
{
Filter prev_filter = filter;
filter = [prev_filter, cur_filter](const auto & request)
{
return prev_filter ? prev_filter(request) && cur_filter(request) : cur_filter(request);
};
}
void addFiltersFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
Poco::Util::AbstractConfiguration::Keys filters_type;
config.keys(prefix, filters_type);
return this;
for (const auto & filter_type : filters_type)
{
if (filter_type == "handler")
continue;
else if (filter_type == "url")
addFilter(urlFilter(config, prefix + ".url"));
else if (filter_type == "headers")
addFilter(headersFilter(config, prefix + ".headers"));
else if (filter_type == "methods")
addFilter(methodsFilter(config, prefix + ".methods"));
else
throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
TThis * attachStrictPath(const String & strict_path)
void attachStrictPath(const String & strict_path)
{
return addFilter([strict_path](const auto & request) { return request.getURI() == strict_path; });
addFilter([strict_path](const auto & request) { return request.getURI() == strict_path; });
}
TThis * attachNonStrictPath(const String & non_strict_path)
void attachNonStrictPath(const String & non_strict_path)
{
return addFilter([non_strict_path](const auto & request) { return startsWith(request.getURI(), non_strict_path); });
addFilter([non_strict_path](const auto & request) { return startsWith(request.getURI(), non_strict_path); });
}
/// Handle GET or HEAD endpoint on specified path
TThis * allowGetAndHeadRequest()
void allowGetAndHeadRequest()
{
return addFilter([](const auto & request)
addFilter([](const auto & request)
{
return request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD;
......@@ -84,35 +104,35 @@ public:
}
/// Handle POST or GET with params
TThis * allowPostAndGetParamsRequest()
void allowPostAndGetParamsRequest()
{
return addFilter([](const auto & request)
addFilter([](const auto & request)
{
return request.getURI().find('?') != std::string::npos
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST;
});
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override
{
return filter(request) ? creator() : nullptr;
}
private:
Filter filter;
std::function<Poco::Net::HTTPRequestHandler * ()> creator;
std::function<std::unique_ptr<HTTPRequestHandler> ()> creator;
};
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createDynamicHandlerFactory(IServer & server, const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createStaticHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createDynamicHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createPredefinedHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix);
Poco::Net::HTTPRequestHandlerFactory * createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name);
HTTPRequestHandlerFactoryPtr
createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix);
HTTPRequestHandlerFactoryPtr createHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & name);
}
#pragma once
#include "HTTPHandlerFactory.h"
#include <Server/HTTP/HTTPServerRequest.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <common/StringRef.h>
#include <common/find_symbols.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/find_symbols.h>
#include <unordered_map>
namespace DB
{
......@@ -17,11 +19,9 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_COMPILE_REGEXP;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
typedef std::shared_ptr<const re2::RE2> CompiledRegexPtr;
using CompiledRegexPtr = std::shared_ptr<const re2::RE2>;
static inline bool checkRegexExpression(const StringRef & match_str, const CompiledRegexPtr & compiled_regex)
{
......@@ -45,10 +45,10 @@ static inline auto methodsFilter(Poco::Util::AbstractConfiguration & config, con
std::vector<String> methods;
Poco::StringTokenizer tokenizer(config.getString(config_path), ",");
for (auto iterator = tokenizer.begin(); iterator != tokenizer.end(); ++iterator)
methods.emplace_back(Poco::toUpper(Poco::trim(*iterator)));
for (const auto & iterator : tokenizer)
methods.emplace_back(Poco::toUpper(Poco::trim(iterator)));
return [methods](const Poco::Net::HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); };
return [methods](const HTTPServerRequest & request) { return std::count(methods.begin(), methods.end(), request.getMethod()); };
}
static inline auto getExpression(const std::string & expression)
......@@ -66,7 +66,7 @@ static inline auto getExpression(const std::string & expression)
static inline auto urlFilter(Poco::Util::AbstractConfiguration & config, const std::string & config_path)
{
return [expression = getExpression(config.getString(config_path))](const Poco::Net::HTTPServerRequest & request)
return [expression = getExpression(config.getString(config_path))](const HTTPServerRequest & request)
{
const auto & uri = request.getURI();
const auto & end = find_first_symbols<'?'>(uri.data(), uri.data() + uri.size());
......@@ -88,7 +88,7 @@ static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, con
headers_expression.emplace(std::make_pair(header_name, expression));
}
return [headers_expression](const Poco::Net::HTTPServerRequest & request)
return [headers_expression](const HTTPServerRequest & request)
{
for (const auto & [header_name, header_expression] : headers_expression)
{
......@@ -101,28 +101,4 @@ static inline auto headersFilter(Poco::Util::AbstractConfiguration & config, con
};
}
template <typename TEndpoint>
static inline Poco::Net::HTTPRequestHandlerFactory * addFiltersFromConfig(
HandlingRuleHTTPHandlerFactory <TEndpoint> * factory, Poco::Util::AbstractConfiguration & config, const std::string & prefix)
{
Poco::Util::AbstractConfiguration::Keys filters_type;
config.keys(prefix, filters_type);
for (const auto & filter_type : filters_type)
{
if (filter_type == "handler")
continue;
else if (filter_type == "url")
factory->addFilter(urlFilter(config, prefix + ".url"));
else if (filter_type == "headers")
factory->addFilter(headersFilter(config, prefix + ".headers"));
else if (filter_type == "methods")
factory->addFilter(methodsFilter(config, prefix + ".methods"));
else
throw Exception("Unknown element in config: " + prefix + "." + filter_type, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
return factory;
}
}
#include "InterserverIOHTTPHandler.h"
#include <Server/InterserverIOHTTPHandler.h>
#include <Server/IServer.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <Common/setThreadName.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/InterserverIOHandler.h>
#include <Interpreters/Context.h>
#include "IServer.h"
#include <Interpreters/InterserverIOHandler.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Common/setThreadName.h>
#include <common/logger_useful.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace DB
{
......@@ -23,7 +23,7 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(HTTPServerRequest & request) const
{
const auto & config = server.config();
......@@ -51,7 +51,7 @@ std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net:
return {"", true};
}
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output)
void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output)
{
HTMLForm params(request);
......@@ -60,7 +60,7 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
ReadBufferFromIStream body(request.stream());
auto & body = request.getStream();
auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name);
/// Locked for read while query processing
......@@ -80,18 +80,19 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
}
void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
setThreadName("IntersrvHandler");
/// In order to work keep-alive.
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
Output used_output;
const auto & config = server.config();
unsigned keep_alive_timeout = config.getUInt("keep_alive_timeout", 10);
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(request, response, keep_alive_timeout);
used_output.out = std::make_shared<WriteBufferFromHTTPServerResponse>(
response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
......@@ -102,7 +103,7 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
}
else
{
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
response.setStatusAndReason(HTTPServerResponse::HTTP_UNAUTHORIZED);
if (!response.sent())
writeString(message, *used_output.out);
LOG_WARNING(log, "Query processing failed request: '{}' authentication failed", request.getURI());
......
#pragma once
#include <memory>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Common/CurrentMetrics.h>
#include <Poco/Logger.h>
#include <memory>
namespace CurrentMetrics
{
......@@ -17,7 +19,7 @@ namespace DB
class IServer;
class WriteBufferFromHTTPServerResponse;
class InterserverIOHTTPHandler : public Poco::Net::HTTPRequestHandler
class InterserverIOHTTPHandler : public HTTPRequestHandler
{
public:
explicit InterserverIOHTTPHandler(IServer & server_)
......@@ -26,7 +28,7 @@ public:
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
struct Output
......@@ -39,9 +41,9 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, Output & used_output);
void processQuery(HTTPServerRequest & request, HTTPServerResponse & response, Output & used_output);
std::pair<String, bool> checkAuthentication(Poco::Net::HTTPServerRequest & request) const;
std::pair<String, bool> checkAuthentication(HTTPServerRequest & request) const;
};
}
#include "NotFoundHandler.h"
#include <Server/NotFoundHandler.h>
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
void NotFoundHandler::handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response)
void NotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);
response.send() << "There is no handle " << request.getURI() << "\n\n"
<< "Use / or /ping for health checks.\n"
<< "Or /replicas_status for more sophisticated health checks.\n\n"
<< "Send queries from your program with POST method or GET /?query=...\n\n"
<< "Use clickhouse-client:\n\n"
<< "For interactive data analysis:\n"
<< " clickhouse-client\n\n"
<< "For batch query processing:\n"
<< " clickhouse-client --query='SELECT 1' > result\n"
<< " clickhouse-client < query > result\n";
*response.send() << "There is no handle " << request.getURI() << "\n\n"
<< "Use / or /ping for health checks.\n"
<< "Or /replicas_status for more sophisticated health checks.\n\n"
<< "Send queries from your program with POST method or GET /?query=...\n\n"
<< "Use clickhouse-client:\n\n"
<< "For interactive data analysis:\n"
<< " clickhouse-client\n\n"
<< "For batch query processing:\n"
<< " clickhouse-client --query='SELECT 1' > result\n"
<< " clickhouse-client < query > result\n";
}
catch (...)
{
......
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
/// Response with 404 and verbose description.
class NotFoundHandler : public Poco::Net::HTTPRequestHandler
class NotFoundHandler : public HTTPRequestHandler
{
public:
void handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
}
#include "PrometheusRequestHandler.h"
#include <Server/PrometheusRequestHandler.h>
#include <IO/HTTPCommon.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/IServer.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace DB
{
void PrometheusRequestHandler::handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response)
void PrometheusRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
......@@ -31,7 +24,7 @@ void PrometheusRequestHandler::handleRequest(
response.setContentType("text/plain; version=0.0.4; charset=UTF-8");
auto wb = WriteBufferFromHTTPServerResponse(request, response, keep_alive_timeout);
auto wb = WriteBufferFromHTTPServerResponse(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
metrics_writer.write(wb);
wb.finalize();
}
......@@ -41,10 +34,13 @@ void PrometheusRequestHandler::handleRequest(
}
}
Poco::Net::HTTPRequestHandlerFactory * createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix)
HTTPRequestHandlerFactoryPtr
createPrometheusHandlerFactory(IServer & server, AsynchronousMetrics & async_metrics, const std::string & config_prefix)
{
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>(
server, PrometheusMetricsWriter(server.config(), config_prefix + ".handler", async_metrics)), server.config(), config_prefix);
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<PrometheusRequestHandler>>(
server, PrometheusMetricsWriter(server.config(), config_prefix + ".handler", async_metrics));
factory->addFiltersFromConfig(server.config(), config_prefix);
return factory;
}
}
#pragma once
#include "IServer.h"
#include "PrometheusMetricsWriter.h"
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include "PrometheusMetricsWriter.h"
namespace DB
{
class PrometheusRequestHandler : public Poco::Net::HTTPRequestHandler
class IServer;
class PrometheusRequestHandler : public HTTPRequestHandler
{
private:
IServer & server;
......@@ -24,9 +22,7 @@ public:
{
}
void handleRequest(
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
}
#include "ReplicasStatusHandler.h"
#include <Server/ReplicasStatusHandler.h>
#include <Databases/IDatabase.h>
#include <IO/HTTPCommon.h>
#include <Interpreters/Context.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Server/IServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/HTMLForm.h>
#include <Common/typeid_cast.h>
#include <Databases/IDatabase.h>
#include <IO/HTTPCommon.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
namespace DB
......@@ -24,7 +25,7 @@ ReplicasStatusHandler::ReplicasStatusHandler(IServer & server)
}
void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
......@@ -82,7 +83,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
}
if (verbose)
response.send() << message.str();
*response.send() << message.str();
else
{
const char * data = "Ok.\n";
......@@ -100,7 +101,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
if (!response.sent())
{
/// We have not sent anything yet and we don't even know if we need to compress response.
response.send() << getCurrentExceptionMessage(false) << std::endl;
*response.send() << getCurrentExceptionMessage(false) << std::endl;
}
}
catch (...)
......@@ -110,9 +111,11 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
}
}
Poco::Net::HTTPRequestHandlerFactory * createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix)
HTTPRequestHandlerFactoryPtr createReplicasStatusHandlerFactory(IServer & server, const std::string & config_prefix)
{
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>(server), server.config(), config_prefix);
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
factory->addFiltersFromConfig(server.config(), config_prefix);
return factory;
}
}
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
{
class Context;
class IServer;
/// Replies "Ok.\n" if all replicas on this server don't lag too much. Otherwise output lag information.
class ReplicasStatusHandler : public Poco::Net::HTTPRequestHandler
class ReplicasStatusHandler : public HTTPRequestHandler
{
private:
Context & context;
......@@ -19,7 +17,7 @@ private:
public:
explicit ReplicasStatusHandler(IServer & server_);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
......
......@@ -9,7 +9,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>
......@@ -32,7 +32,8 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}
static inline WriteBufferPtr responseWriteBuffer(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response, unsigned int keep_alive_timeout)
static inline WriteBufferPtr
responseWriteBuffer(HTTPServerRequest & request, HTTPServerResponse & response, unsigned int keep_alive_timeout)
{
/// The client can pass a HTTP header indicating supported compression method (gzip or deflate).
String http_response_compression_methods = request.get("Accept-Encoding", "");
......@@ -55,12 +56,15 @@ static inline WriteBufferPtr responseWriteBuffer(Poco::Net::HTTPServerRequest &
bool client_supports_http_compression = http_response_compression_method != CompressionMethod::None;
return std::make_shared<WriteBufferFromHTTPServerResponse>(
request, response, keep_alive_timeout, client_supports_http_compression, http_response_compression_method);
response,
request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD,
keep_alive_timeout,
client_supports_http_compression,
http_response_compression_method);
}
static inline void trySendExceptionToClient(
const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response , WriteBuffer & out)
const std::string & s, int exception_code, HTTPServerRequest & request, HTTPServerResponse & response, WriteBuffer & out)
{
try
{
......@@ -69,13 +73,13 @@ static inline void trySendExceptionToClient(
/// If HTTP method is POST and Keep-Alive is turned on, we should read the whole request body
/// to avoid reading part of the current request body in the next request.
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive() && !request.stream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
&& response.getKeepAlive() && !request.getStream().eof() && exception_code != ErrorCodes::HTTP_LENGTH_REQUIRED)
request.getStream().ignore(std::numeric_limits<std::streamsize>::max());
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << s << std::endl;
*response.send() << s << std::endl;
else
{
if (out.count() != out.offset())
......@@ -94,7 +98,7 @@ static inline void trySendExceptionToClient(
}
}
void StaticRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void StaticRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
auto keep_alive_timeout = server.config().getUInt("keep_alive_timeout", 10);
const auto & out = responseWriteBuffer(request, response, keep_alive_timeout);
......@@ -159,14 +163,17 @@ StaticRequestHandler::StaticRequestHandler(IServer & server_, const String & exp
{
}
Poco::Net::HTTPRequestHandlerFactory * createStaticHandlerFactory(IServer & server, const std::string & config_prefix)
HTTPRequestHandlerFactoryPtr createStaticHandlerFactory(IServer & server, const std::string & config_prefix)
{
int status = server.config().getInt(config_prefix + ".handler.status", 200);
std::string response_content = server.config().getRawString(config_prefix + ".handler.response_content", "Ok.\n");
std::string response_content_type = server.config().getString(config_prefix + ".handler.content_type", "text/plain; charset=UTF-8");
auto factory = std::make_shared<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(
server, std::move(response_content), std::move(status), std::move(response_content_type));
return addFiltersFromConfig(new HandlingRuleHTTPHandlerFactory<StaticRequestHandler>(
server, std::move(response_content), std::move(status), std::move(response_content_type)), server.config(), config_prefix);
factory->addFiltersFromConfig(server.config(), config_prefix);
return factory;
}
}
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <common/types.h>
......@@ -11,7 +11,7 @@ class IServer;
class WriteBuffer;
/// Response with custom string. Can be used for browser.
class StaticRequestHandler : public Poco::Net::HTTPRequestHandler
class StaticRequestHandler : public HTTPRequestHandler
{
private:
IServer & server;
......@@ -29,7 +29,7 @@ public:
void writeResponse(WriteBuffer & out);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
}
......@@ -18,18 +18,18 @@ WebUIRequestHandler::WebUIRequestHandler(IServer & server_, std::string resource
}
void WebUIRequestHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
void WebUIRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
auto keep_alive_timeout = server.config().getUInt("keep_alive_timeout", 10);
response.setContentType("text/html; charset=UTF-8");
if (request.getVersion() == Poco::Net::HTTPServerRequest::HTTP_1_1)
if (request.getVersion() == HTTPServerRequest::HTTP_1_1)
response.setChunkedTransferEncoding(true);
setResponseDefaultHeaders(response, keep_alive_timeout);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_OK);
response.send() << getResource(resource_name);
*response.send() << getResource(resource_name);
}
}
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
#include <Server/HTTP/HTTPRequestHandler.h>
namespace DB
......@@ -9,14 +9,14 @@ namespace DB
class IServer;
/// Response with HTML page that allows to send queries and show results in browser.
class WebUIRequestHandler : public Poco::Net::HTTPRequestHandler
class WebUIRequestHandler : public HTTPRequestHandler
{
private:
IServer & server;
std::string resource_name;
public:
WebUIRequestHandler(IServer & server_, std::string resource_name_);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
};
}
......
......@@ -11,6 +11,14 @@ PEERDIR(
SRCS(
GRPCServer.cpp
HTTP/HTMLForm.cpp
HTTP/HTTPServer.cpp
HTTP/HTTPServerConnection.cpp
HTTP/HTTPServerConnectionFactory.cpp
HTTP/HTTPServerRequest.cpp
HTTP/HTTPServerResponse.cpp
HTTP/ReadHeaders.cpp
HTTP/WriteBufferFromHTTPServerResponse.cpp
HTTPHandler.cpp
HTTPHandlerFactory.cpp
InterserverIOHTTPHandler.cpp
......
#include <Storages/MergeTree/DataPartsExchange.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/createVolume.h>
#include <IO/HTTPCommon.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Disks/createVolume.h>
#include <Disks/SingleDiskVolume.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <IO/HTTPCommon.h>
#include <ext/scope_guard.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequest.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
namespace CurrentMetrics
......@@ -83,7 +86,7 @@ std::string Service::getId(const std::string & node_id) const
return getEndpointId(node_id);
}
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, HTTPServerResponse & response)
{
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
......
......@@ -20,21 +20,19 @@ namespace DataPartsExchange
class Service final : public InterserverIOEndpoint
{
public:
Service(MergeTreeData & data_)
: data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
explicit Service(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
Service(const Service &) = delete;
Service & operator=(const Service &) = delete;
std::string getId(const std::string & node_id) const override;
void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) override;
void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) override;
private:
MergeTreeData::DataPartPtr findPart(const String & name);
void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out);
void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version);
private:
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,
/// so Service will never access dangling reference to storage
MergeTreeData & data;
......@@ -43,13 +41,10 @@ private:
/** Client for getting the parts from the table *MergeTree.
*/
class Fetcher final
class Fetcher final : private boost::noncopyable
{
public:
Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
Fetcher(const Fetcher &) = delete;
Fetcher & operator=(const Fetcher &) = delete;
explicit Fetcher(MergeTreeData & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
......@@ -75,7 +70,7 @@ private:
bool to_detached,
const String & tmp_prefix_,
bool sync,
const ReservationPtr reservation,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
......
......@@ -33,7 +33,7 @@ SKIP_LIST = [
"01057_http_compression_prefer_brotli",
"01080_check_for_error_incorrect_size_of_nested_column",
"01083_expressions_in_engine_arguments",
"01086_odbc_roundtrip",
# "01086_odbc_roundtrip",
"01088_benchmark_query_id",
"01098_temporary_and_external_tables",
"01099_parallel_distributed_insert_select",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册