未验证 提交 39f8eb57 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #4213 from nvartolomei/nv/http-cancel

Cancel http read only queries if client socket goes away
......@@ -4,6 +4,7 @@
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerRequestImpl.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/NetException.h>
......@@ -558,9 +559,47 @@ void HTTPHandler::processQuery(
client_info.http_method = http_method;
client_info.http_user_agent = request.get("User-Agent", "");
auto appendCallback = [&context] (ProgressCallback callback)
{
auto prev = context.getProgressCallback();
context.setProgressCallback([prev, callback] (const Progress & progress)
{
if (prev)
prev(progress);
callback(progress);
});
};
/// While still no data has been sent, we will report about query execution progress by sending HTTP headers.
if (settings.send_progress_in_http_headers)
context.setProgressCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
appendCallback([&used_output] (const Progress & progress) { used_output.out->onProgress(progress); });
if (settings.readonly > 0 && settings.cancel_http_readonly_queries_on_client_close)
{
Poco::Net::StreamSocket & socket = dynamic_cast<Poco::Net::HTTPServerRequestImpl &>(request).socket();
appendCallback([&context, &socket](const Progress &)
{
/// Assume that at the point this method is called no one is reading data from the socket any more.
/// True for read-only queries.
try
{
char b;
int status = socket.receiveBytes(&b, 1, MSG_DONTWAIT | MSG_PEEK);
if (status == 0)
context.killCurrentQuery();
}
catch (Poco::TimeoutException &)
{
}
catch (...)
{
context.killCurrentQuery();
}
});
}
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & content_type) { response.setContentType(content_type); },
......
......@@ -1077,6 +1077,13 @@ void Context::setCurrentQueryId(const String & query_id)
client_info.current_query_id = query_id_to_set;
}
void Context::killCurrentQuery()
{
if (process_list_elem)
{
process_list_elem->cancelQuery(true);
}
};
String Context::getDefaultFormat() const
{
......
......@@ -236,6 +236,8 @@ public:
void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id);
void killCurrentQuery();
void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
const std::pair<String, String> & getInsertionTable() const { return insertion_table; }
......
......@@ -26,9 +26,6 @@ namespace ErrorCodes
extern const int CANNOT_KILL;
}
using CancellationCode = ProcessList::CancellationCode;
static const char * cancellationCodeToStatus(CancellationCode code)
{
switch (code)
......
......@@ -325,6 +325,29 @@ bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStream
return true;
}
CancellationCode QueryStatus::cancelQuery(bool kill)
{
/// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
if (streamsAreReleased())
return CancellationCode::CancelSent;
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
if (tryGetQueryStreams(input_stream, output_stream))
{
if (input_stream)
{
input_stream->cancel(kill);
return CancellationCode::CancelSent;
}
return CancellationCode::CancelCannotBeSent;
}
/// Query is not even started
is_killed.store(true);
return CancellationCode::CancelSent;
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
......@@ -356,7 +379,7 @@ QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query
}
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
std::lock_guard lock(mutex);
......@@ -365,25 +388,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
if (!elem)
return CancellationCode::NotFound;
/// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
if (elem->streamsAreReleased())
return CancellationCode::CancelSent;
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
if (elem->tryGetQueryStreams(input_stream, output_stream))
{
if (input_stream)
{
input_stream->cancel(kill);
return CancellationCode::CancelSent;
}
return CancellationCode::CancelCannotBeSent;
}
/// Query is not even started
elem->is_killed.store(true);
return CancellationCode::CancelSent;
return elem->cancelQuery(kill);
}
......
......@@ -70,6 +70,14 @@ struct QueryStatusInfo
std::shared_ptr<Settings> query_settings;
};
enum class CancellationCode
{
NotFound = 0, /// already cancelled
QueryIsNotInitializedYet = 1,
CancelCannotBeSent = 2,
CancelSent = 3,
Unknown
};
/// Query and information about its execution.
class QueryStatus
......@@ -192,6 +200,8 @@ public:
/// Get query in/out pointers from BlockIO
bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; }
};
......@@ -312,15 +322,6 @@ public:
max_size = max_size_;
}
enum class CancellationCode
{
NotFound = 0, /// already cancelled
QueryIsNotInitializedYet = 1,
CancelCannotBeSent = 2,
CancelSent = 3,
Unknown
};
/// Try call cancel() for input and output streams of query with specified id and user
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
};
......
......@@ -299,6 +299,7 @@ struct Settings
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, false, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, false, "Convert CROSS JOIN to INNER JOIN if possible") \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.") \
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
......
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}?query_id=cancel_http_readonly_queries_on_client_close&cancel_http_readonly_queries_on_client_close=1&query=SELECT+count()+FROM+system.numbers" &
REQUEST_CURL_PID=$!
sleep 0.1
# Check query is registered
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query_id='cancel_http_readonly_queries_on_client_close'"
# Kill client (curl process)
kill -SIGTERM $REQUEST_CURL_PID
sleep 0.1
# Check query is killed after client is gone
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query_id='cancel_http_readonly_queries_on_client_close'"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册