WriteBufferFromHTTPServerResponse.cpp 4.8 KB
Newer Older
1 2
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>

3
#include <IO/HTTPCommon.h>
A
Alexey Milovidov 已提交
4
#include <IO/Progress.h>
5
#include <IO/WriteBufferFromString.h>
A
Alexey Milovidov 已提交
6
#include <Common/Exception.h>
7 8
#include <Common/NetException.h>
#include <Common/Stopwatch.h>
9
#include <Common/MemoryTracker.h>
10 11 12 13

#if !defined(ARCADIA_BUILD)
#    include <Common/config.h>
#endif
A
Alexey Milovidov 已提交
14

15 16
#include <Poco/Version.h>

17 18 19 20 21 22 23 24 25 26 27

namespace DB
{

/// Currently empty: no code visible in this file refers to an ErrorCodes constant.
namespace ErrorCodes
{
}


void WriteBufferFromHTTPServerResponse::startSendHeaders()
{
28 29 30
    if (!headers_started_sending)
    {
        headers_started_sending = true;
31

32 33
        if (add_cors_header)
            response.set("Access-Control-Allow-Origin", "*");
34

35
        setResponseDefaultHeaders(response, keep_alive_timeout);
36

37
        if (!is_http_method_head)
38
            std::tie(response_header_ostr, response_body_ostr) = response.beginSend();
39
    }
40 41
}

42
void WriteBufferFromHTTPServerResponse::writeHeaderSummary()
43 44 45
{
    if (headers_finished_sending)
        return;
G
Guillaume Tassery 已提交
46

47 48 49
    WriteBufferFromOwnString progress_string_writer;
    accumulated_progress.writeJSON(progress_string_writer);

50 51
    if (response_header_ostr)
        *response_header_ostr << "X-ClickHouse-Summary: " << progress_string_writer.str() << "\r\n" << std::flush;
52 53 54 55 56 57
}

void WriteBufferFromHTTPServerResponse::writeHeaderProgress()
{
    if (headers_finished_sending)
        return;
G
Guillaume Tassery 已提交
58

59
    WriteBufferFromOwnString progress_string_writer;
60
    accumulated_progress.writeJSON(progress_string_writer);
61

62 63
    if (response_header_ostr)
        *response_header_ostr << "X-ClickHouse-Progress: " << progress_string_writer.str() << "\r\n" << std::flush;
64
}
65 66 67

void WriteBufferFromHTTPServerResponse::finishSendHeaders()
{
68 69
    if (!headers_finished_sending)
    {
70
        writeHeaderSummary();
71
        headers_finished_sending = true;
72

73
        if (!is_http_method_head)
74
        {
75 76 77
            /// Send end of headers delimiter.
            if (response_header_ostr)
                *response_header_ostr << "\r\n" << std::flush;
78 79 80
        }
        else
        {
81
            if (!response_body_ostr)
82
                response_body_ostr = response.send();
83
        }
84
    }
85 86 87 88 89
}


void WriteBufferFromHTTPServerResponse::nextImpl()
{
90
    {
A
Alexey Milovidov 已提交
91
        std::lock_guard lock(mutex);
92

93
        startSendHeaders();
94

95
        if (!out && !is_http_method_head)
96 97 98
        {
            if (compress)
            {
99 100 101
                auto content_encoding_name = toContentEncodingName(compression_method);

                *response_header_ostr << "Content-Encoding: " << content_encoding_name << "\r\n";
102
            }
A
Alexey Milovidov 已提交
103

A
Alexey Milovidov 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117
            /// We reuse our buffer in "out" to avoid extra allocations and copies.

            if (compress)
                out = wrapWriteBufferWithCompressionMethod(
                    std::make_unique<WriteBufferFromOStream>(*response_body_ostr),
                    compress ? compression_method : CompressionMethod::None,
                    compression_level,
                    working_buffer.size(),
                    working_buffer.begin());
            else
                out = std::make_unique<WriteBufferFromOStream>(
                    *response_body_ostr,
                    working_buffer.size(),
                    working_buffer.begin());
118
        }
119

120 121
        finishSendHeaders();
    }
122

123 124
    if (out)
    {
A
Alexey Milovidov 已提交
125
        out->buffer() = buffer();
126 127 128
        out->position() = position();
        out->next();
    }
129 130 131 132
}


/// Creates the buffer with DBMS_DEFAULT_BUFFER_SIZE bytes of own memory and remembers the
/// response object together with the request/compression settings.
/// The constructor body is empty.
WriteBufferFromHTTPServerResponse::WriteBufferFromHTTPServerResponse(
    HTTPServerResponse & response_,
    bool is_http_method_head_,
    unsigned keep_alive_timeout_,
    bool compress_,
    CompressionMethod compression_method_)
    : BufferWithOwnMemory<WriteBuffer>(DBMS_DEFAULT_BUFFER_SIZE),
      response(response_),
      is_http_method_head(is_http_method_head_),
      keep_alive_timeout(keep_alive_timeout_),
      compress(compress_),
      compression_method(compression_method_)
{
}


void WriteBufferFromHTTPServerResponse::onProgress(const Progress & progress)
{
A
Alexey Milovidov 已提交
150
    std::lock_guard lock(mutex);
151

152 153 154
    /// Cannot add new headers if body was started to send.
    if (headers_finished_sending)
        return;
155

156
    accumulated_progress.incrementPiecewiseAtomically(progress);
157

158 159 160
    if (progress_watch.elapsed() >= send_progress_interval_ms * 1000000)
    {
        progress_watch.restart();
161

162 163
        /// Send all common headers before our special progress headers.
        startSendHeaders();
164
        writeHeaderProgress();
165
    }
166 167 168 169 170
}


void WriteBufferFromHTTPServerResponse::finalize()
{
171 172
    next();
    if (out)
173
    {
174 175
        out->next();
        out.reset();
176
    }
177 178

    if (!offset())
179 180
    {
        /// If no remaining data, just send headers.
A
Alexey Milovidov 已提交
181
        std::lock_guard lock(mutex);
182 183 184
        startSendHeaders();
        finishSendHeaders();
    }
185 186 187 188 189
}


/// The destructor performs the final flush by calling finalize().
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{
    /// FIXME move final flush into the caller
    /// The guard lives until the end of the destructor, i.e. for the whole finalize() call
    /// (presumably it suppresses memory-limit exceptions in this thread - confirm against MemoryTracker).
    MemoryTracker::LockExceptionInThread memory_exception_guard;
    finalize();
}

}