HTTPCommon.cpp 8.1 KB
Newer Older
1
#include <IO/HTTPCommon.h>

#include <Common/config.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>

#include <Poco/Version.h>

#if USE_POCO_NETSSL
#include <Poco/Net/AcceptCertificateHandler.h>
#include <Poco/Net/Context.h>
#include <Poco/Net/HTTPSClientSession.h>
#include <Poco/Net/InvalidCertificateHandler.h>
#include <Poco/Net/PrivateKeyPassphraseHandler.h>
#include <Poco/Net/RejectCertificateHandler.h>
#include <Poco/Net/SSLManager.h>
#endif

#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>

#include <mutex>
#include <sstream>
#include <tuple>
#include <unordered_map>
A
alexey-milovidov 已提交
28 29


30 31 32 33
/// Declaration of a profile counter defined in another translation unit.
/// In this file it is incremented whenever a new HTTP(S) client session object is created.
namespace ProfileEvents
{
    extern const Event CreatedHTTPConnections;
}
34 35 36

namespace DB
{
37 38 39 40
/// Error codes referenced by this translation unit; they are only declared here
/// (extern) and are defined in another translation unit.
namespace ErrorCodes
{
    extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
    extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
    extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
    extern const int UNSUPPORTED_URI_SCHEME;
    extern const int TOO_MANY_REDIRECTS;
}

A
Alexey Milovidov 已提交
46

47 48
namespace
{
A
alesapin 已提交
49
/// Applies the connection/send/receive timeouts and the keep-alive timeout to a client session.
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
    {
        const auto & connect = timeouts.connection_timeout;
        const auto & send = timeouts.send_timeout;
        const auto & receive = timeouts.receive_timeout;

#if defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION >= 0x02000000
        /// This Poco variant accepts the three timeouts separately.
        session.setTimeout(connect, send, receive);
#else
        /// Only a single timeout can be set here, so use the largest of the three.
        session.setTimeout(std::max({connect, send, receive}));
#endif

        session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
    }

59 60 61 62 63 64 65 66 67 68
    /// Returns true for the "https" scheme and false for "http".
    /// Any other scheme results in an Exception with code UNSUPPORTED_URI_SCHEME.
    bool isHTTPS(const Poco::URI & uri)
    {
        const auto & scheme = uri.getScheme();

        if (scheme == "https")
            return true;

        if (scheme == "http")
            return false;

        throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
    }

69
    /// Creates a new client session (plain or TLS) pointed at `host`:`port`.
    /// The host name is resolved through DNSResolver and the session is given the resulting address string.
    /// Throws FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME if `https` is requested in a build without USE_POCO_NETSSL.
    HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
    {
        HTTPSessionPtr session;

        if (!https)
        {
            session = std::make_shared<Poco::Net::HTTPClientSession>();
        }
        else
        {
#if USE_POCO_NETSSL
            session = std::make_shared<Poco::Net::HTTPSClientSession>();
#else
            throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
        }

        ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);

        const auto resolved_address = DNSResolver::instance().resolveHost(host).toString();
        session->setHost(resolved_address);
        session->setPort(port);

        /// doesn't work properly without patch
#if defined(POCO_CLICKHOUSE_PATCH)
        session->setKeepAlive(keep_alive);
#else
        (void)keep_alive; // Avoid warning: unused parameter
#endif

        return session;
    }

    /// Pool of client sessions that all target one (host, port, https) endpoint.
    class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
    {
        using Base = PoolBase<Poco::Net::HTTPClientSession>;

    public:
        SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
            : Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool"))
            , host(host_)
            , port(port_)
            , https(https_)
        {
        }

    private:
        const std::string host;
        const UInt16 port;
        bool https;

        /// Overrides the base pool's allocObject: builds a fresh session to this endpoint
        /// with keep-alive requested.
        ObjectPtr allocObject() override
        {
            return makeHTTPSessionImpl(host, port, https, true);
        }
    };

116
    class HTTPSessionPool : private boost::noncopyable
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
    {
    private:
        using Key = std::tuple<std::string, UInt16, bool>;
        using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
        using Entry = SingleEndpointHTTPSessionPool::Entry;

        struct Hasher
        {
            size_t operator()(const Key & k) const
            {
                SipHash s;
                s.update(std::get<0>(k));
                s.update(std::get<1>(k));
                s.update(std::get<2>(k));
                return s.get64();
            }
        };

        std::mutex mutex;
        std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;

    protected:
        HTTPSessionPool() = default;

    public:
142 143 144 145 146 147
        static auto & instance()
        {
            static HTTPSessionPool instance;
            return instance;
        }

A
alesapin 已提交
148 149 150 151
        Entry getSession(
            const Poco::URI & uri,
            const ConnectionTimeouts & timeouts,
            size_t max_connections_per_endpoint)
152
        {
A
Alexey Milovidov 已提交
153
            std::unique_lock lock(mutex);
154 155
            const std::string & host = uri.getHost();
            UInt16 port = uri.getPort();
156
            bool https = isHTTPS(uri);
157 158 159 160 161 162
            auto key = std::make_tuple(host, port, https);
            auto pool_ptr = endpoints_pool.find(key);
            if (pool_ptr == endpoints_pool.end())
                std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
                    key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint));

163 164 165
            auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
            auto session = pool_ptr->second->get(retry_timeout);

166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
            /// We store exception messages in session data.
            /// Poco HTTPSession also stores exception, but it can be removed at any time.
            const auto & sessionData = session->sessionData();
            if (!sessionData.empty())
            {
                auto msg = Poco::AnyCast<std::string>(sessionData);
                if (!msg.empty())
                {
                    LOG_TRACE((&Logger::get("HTTPCommon")), "Failed communicating with " << host << " with error '" << msg << "' will try to reconnect session");
                    /// Host can change IP
                    const auto ip = DNSResolver::instance().resolveHost(host).toString();
                    if (ip != session->getHost())
                    {
                        session->reset();
                        session->setHost(ip);
                        session->attachSessionData({});
                    }
                }
            }

186
            setTimeouts(*session, timeouts);
A
alesapin 已提交
187

188 189 190 191 192
            return session;
        }
    };
}

193
/// For keep-alive responses, adds a "Keep-Alive: timeout=<seconds>" header.
/// Nothing is added when the response is not keep-alive or the timeout is zero seconds.
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
{
    if (response.getKeepAlive())
    {
        const Poco::Timespan timeout(keep_alive_timeout, 0);
        const auto seconds = timeout.totalSeconds();
        if (seconds)
            response.set("Keep-Alive", "timeout=" + std::to_string(seconds));
    }
}

203
/// Creates a standalone (non-pooled) session for the endpoint of `uri`, without keep-alive,
/// and applies `timeouts` to it. Throws if the URI scheme is neither "http" nor "https".
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
{
    const bool use_https = isHTTPS(uri);

    auto new_session = makeHTTPSessionImpl(uri.getHost(), uri.getPort(), use_https, /* keep_alive = */ false);
    setTimeouts(*new_session, timeouts);

    return new_session;
}
213 214


215 216 217
/// Obtains a session for the endpoint of `uri` from the global session pool.
/// `per_endpoint_pool_size` is forwarded as the per-endpoint connection limit.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
{
    auto & global_pool = HTTPSessionPool::instance();
    return global_pool.getSession(uri, timeouts, per_endpoint_pool_size);
}

R
root 已提交
220
/// True when the status is one of: Moved Permanently, Found, See Other, Temporary Redirect.
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status)
{
    using Response = Poco::Net::HTTPResponse;

    return status == Response::HTTP_MOVED_PERMANENTLY
        || status == Response::HTTP_FOUND
        || status == Response::HTTP_SEE_OTHER
        || status == Response::HTTP_TEMPORARY_REDIRECT;
}
221

A
Alexey Milovidov 已提交
222
std::istream * receiveResponse(
R
root 已提交
223
    Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, const bool allow_redirects)
224 225 226 227
{
    auto istr = &session.receiveResponse(response);
    auto status = response.getStatus();

R
root 已提交
228
    if (!(status == Poco::Net::HTTPResponse::HTTP_OK || (isRedirect(status) && allow_redirects)))
229
    {
230 231 232 233 234 235 236 237 238 239
        std::stringstream error_message;
        error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
                      << response.getReason() << ", body: " << istr->rdbuf();

        throw Exception(error_message.str(),
            status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
                                             : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
    }
    return istr;
}
A
Alexey Milovidov 已提交
240

241
}