HTTPCommon.cpp 6.6 KB
Newer Older
1
#include <IO/HTTPCommon.h>
2

3
#include <Poco/Version.h>
4 5
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
6
#include <Common/config.h>
A
alexey-milovidov 已提交
7

8
#if USE_POCO_NETSSL
9 10
#include <Poco/Net/AcceptCertificateHandler.h>
#include <Poco/Net/Context.h>
11
#include <Poco/Net/HTTPSClientSession.h>
12 13 14 15
#include <Poco/Net/InvalidCertificateHandler.h>
#include <Poco/Net/PrivateKeyPassphraseHandler.h>
#include <Poco/Net/RejectCertificateHandler.h>
#include <Poco/Net/SSLManager.h>
16
#endif
A
alexey-milovidov 已提交
17

18 19
#include <tuple>
#include <unordered_map>
20
#include <Poco/Net/HTTPServerResponse.h>
21
#include <Poco/Util/Application.h>
22 23 24
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
25

A
alesapin 已提交
26
#include <sstream>
A
alexey-milovidov 已提交
27 28


29 30 31 32
namespace ProfileEvents
{
    extern const Event CreatedHTTPConnections;
}
33 34 35

namespace DB
{
36 37 38 39
namespace ErrorCodes
{
    extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
    extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
A
Alexey Milovidov 已提交
40
    extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
41
    extern const int UNSUPPORTED_URI_SCHEME;
42 43
}

A
Alexey Milovidov 已提交
44

45 46 47 48 49 50 51 52 53 54 55
namespace
{
    void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
    {
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
        session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
        session.setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout}));
#endif
    }

56 57 58 59 60 61 62 63 64 65
    bool isHTTPS(const Poco::URI & uri)
    {
        if (uri.getScheme() == "https")
            return true;
        else if (uri.getScheme() == "http")
            return false;
        else
            throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
    }

66
    HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
67 68 69 70 71
    {
        HTTPSessionPtr session;

        if (https)
#if USE_POCO_NETSSL
72
            session = std::make_shared<Poco::Net::HTTPSClientSession>();
73 74 75 76
#else
            throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
        else
77
            session = std::make_shared<Poco::Net::HTTPClientSession>();
78 79 80 81 82

        ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);

        session->setHost(DNSResolver::instance().resolveHost(host).toString());
        session->setPort(port);
83 84 85

        /// doesn't work properly without patch
#if POCO_CLICKHOUSE_PATCH
86
        session->setKeepAlive(keep_alive);
87 88
#else
        (void)keep_alive; // Avoid warning: unused parameter
89
#endif
90 91 92 93 94 95 96 97 98 99 100 101 102 103

        return session;
    }

    class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
    {
    private:
        const std::string host;
        const UInt16 port;
        bool https;
        using Base = PoolBase<Poco::Net::HTTPClientSession>;

        ObjectPtr allocObject() override
        {
104
            return makeHTTPSessionImpl(host, port, https, true);
105 106 107 108
        }

    public:
        SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
A
alexey-milovidov 已提交
109
            : Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")), host(host_), port(port_), https(https_)
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
        {
        }
    };

    class HTTPSessionPool : public ext::singleton<HTTPSessionPool>
    {
    private:
        using Key = std::tuple<std::string, UInt16, bool>;
        using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
        using Entry = SingleEndpointHTTPSessionPool::Entry;

        friend class ext::singleton<HTTPSessionPool>;

        struct Hasher
        {
            size_t operator()(const Key & k) const
            {
                SipHash s;
                s.update(std::get<0>(k));
                s.update(std::get<1>(k));
                s.update(std::get<2>(k));
                return s.get64();
            }
        };

        std::mutex mutex;
        std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;

    protected:
        HTTPSessionPool() = default;

    public:
        Entry getSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t max_connections_per_endpoint)
        {
A
Alexey Milovidov 已提交
144
            std::unique_lock lock(mutex);
145 146
            const std::string & host = uri.getHost();
            UInt16 port = uri.getPort();
147
            bool https = isHTTPS(uri);
148 149 150 151 152 153
            auto key = std::make_tuple(host, port, https);
            auto pool_ptr = endpoints_pool.find(key);
            if (pool_ptr == endpoints_pool.end())
                std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
                    key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint));

154 155 156
            auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
            auto session = pool_ptr->second->get(retry_timeout);

157 158 159 160 161 162
            setTimeouts(*session, timeouts);
            return session;
        }
    };
}

163
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
A
Alexey Milovidov 已提交
164
{
165 166
    if (!response.getKeepAlive())
        return;
167

168 169 170
    Poco::Timespan timeout(keep_alive_timeout, 0);
    if (timeout.totalSeconds())
        response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
171 172
}

173
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
174
{
175 176
    const std::string & host = uri.getHost();
    UInt16 port = uri.getPort();
177
    bool https = isHTTPS(uri);
178

179
    auto session = makeHTTPSessionImpl(host, port, https, false);
180 181 182
    setTimeouts(*session, timeouts);
    return session;
}
183 184


185 186 187
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
{
    return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size);
188 189 190
}


A
Alexey Milovidov 已提交
191
std::istream * receiveResponse(
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
    Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response)
{
    auto istr = &session.receiveResponse(response);
    auto status = response.getStatus();

    if (status != Poco::Net::HTTPResponse::HTTP_OK)
    {
        std::stringstream error_message;
        error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
                      << response.getReason() << ", body: " << istr->rdbuf();

        throw Exception(error_message.str(),
            status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
                                             : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
    }
    return istr;
}
A
Alexey Milovidov 已提交
209

210
}