提交 82a361d0 编写于 作者: P proller 提交者: alexey-milovidov

Show error to client if query was killed (#1989)

* Show error to client if query was killed

* Kill exception v2

* Use kill

* fix

* wip

* fix

* fxi

* try fix

* Revert "try fix"

This reverts commit eb76e4c0401ee02148204b814ff04b31bc470292.

* QUERY_WASCANCELLED

* Fxi all cancel()

* fix
上级 6282c8e4
......@@ -369,6 +369,7 @@ namespace ErrorCodes
extern const int EXTERNAL_LIBRARY_ERROR = 391;
extern const int QUERY_IS_PROHIBITED = 392;
extern const int THERE_IS_NO_QUERY = 393;
extern const int QUERY_WAS_CANCELLED = 394;
extern const int KEEPER_EXCEPTION = 999;
......
......@@ -68,7 +68,7 @@ Block AggregatingBlockInputStream::readImpl()
}
}
if (isCancelled() || !impl)
if (isCancelledOrThrowIfKilled() || !impl)
return {};
return impl->read();
......
......@@ -22,7 +22,7 @@ Block CreatingSetsBlockInputStream::readImpl()
createAll();
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return res;
return children.back()->read();
......@@ -54,7 +54,7 @@ void CreatingSetsBlockInputStream::createAll()
{
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
{
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return;
createOne(elem.second);
......@@ -139,7 +139,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (done_with_set && done_with_join && done_with_table)
{
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
profiling_in->cancel();
profiling_in->cancel(false);
break;
}
......
......@@ -41,7 +41,7 @@ Block IProfilingBlockInputStream::read()
Block res;
if (is_cancelled.load(std::memory_order_seq_cst))
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimits())
......@@ -71,7 +71,7 @@ Block IProfilingBlockInputStream::read()
* but children sources are still working,
* herewith they can work in separate threads or even remotely.
*/
cancel();
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
......@@ -264,7 +264,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel();
cancel(false);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
......@@ -302,7 +302,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
if ((limits.max_rows_to_read && rows_processed > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read))
{
cancel();
cancel(false);
}
break;
......@@ -350,15 +350,18 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
}
void IProfilingBlockInputStream::cancel()
void IProfilingBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachProfilingChild([] (IProfilingBlockInputStream & child)
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.cancel();
child.cancel(kill);
return false;
});
}
......
......@@ -12,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
}
class QuotaForIntervals;
struct ProcessListElement;
class IProfilingBlockInputStream;
......@@ -106,8 +111,11 @@ public:
/** Ask to abort the receipt of data as soon as possible.
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* Have two modes:
* with kill = false only is_cancelled is set - streams will stop silently with returning some processed data.
* with kill = true also is_killed set - queries will stop with exception.
*/
virtual void cancel();
virtual void cancel(bool kill);
/** Do you want to abort the receipt of data.
*/
......@@ -116,6 +124,15 @@ public:
return is_cancelled.load(std::memory_order_seq_cst);
}
bool isCancelledOrThrowIfKilled() const
{
if (!isCancelled())
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
/** What limitations and quotas should be checked.
* LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check).
* Currently it is used in root streams to check max_result_{rows,bytes} limits.
......@@ -173,6 +190,7 @@ public:
protected:
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
bool is_killed{false};
ProgressCallback progress_callback;
ProcessListElement * process_list_elem = nullptr;
......
......@@ -59,7 +59,7 @@ protected:
addChild(input);
if (isCancelled() && p_input)
p_input->cancel();
p_input->cancel(is_killed);
}
}
......
......@@ -116,7 +116,7 @@ Block MergeSortingBlockInputStream::readImpl()
}
}
if ((blocks.empty() && temporary_files.empty()) || isCancelled())
if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled())
return Block();
if (temporary_files.empty())
......
......@@ -28,7 +28,7 @@ Block MergingAggregatedBlockInputStream::readImpl()
}
Block res;
if (isCancelled() || it == blocks.end())
if (isCancelledOrThrowIfKilled() || it == blocks.end())
return res;
res = std::move(*it);
......
......@@ -104,7 +104,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
{
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
if (!all_read && !isCancelled())
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
finalize();
......@@ -114,8 +114,11 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
}
void MergingAggregatedMemoryEfficientBlockInputStream::cancel()
void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true))
return;
......@@ -136,7 +139,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel()
{
try
{
child->cancel();
child->cancel(kill);
}
catch (...)
{
......@@ -265,7 +268,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEffici
try
{
if (!all_read)
cancel();
cancel(false);
finalize();
}
......
......@@ -76,7 +76,7 @@ public:
/** Different from the default implementation by trying to stop all sources,
* skipping failed by execution.
*/
void cancel() override;
void cancel(bool kill) override;
Block getHeader() const override;
......
......@@ -176,7 +176,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
if (limit && total_merged_rows == limit)
{
// std::cerr << "Limit reached\n";
cancel();
cancel(false);
finished = true;
return true;
}
......@@ -236,7 +236,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
column = column->cut(0, merged_rows);
}
cancel();
cancel(false);
finished = true;
}
......@@ -304,7 +304,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
return;
}
cancel();
cancel(false);
finished = true;
}
......
......@@ -35,14 +35,16 @@ Block ParallelAggregatingBlockInputStream::getHeader() const
}
void ParallelAggregatingBlockInputStream::cancel()
void ParallelAggregatingBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
if (!executed)
processor.cancel();
processor.cancel(kill);
}
......@@ -55,7 +57,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
execute();
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return {};
if (!aggregator.hasTemporaryFiles())
......@@ -92,7 +94,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
}
Block res;
if (isCancelled() || !impl)
if (isCancelledOrThrowIfKilled() || !impl)
return res;
return impl->read();
......@@ -150,7 +152,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish()
void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num)
{
parent.exceptions[thread_num] = exception;
parent.cancel();
parent.cancel(false);
}
......@@ -174,7 +176,7 @@ void ParallelAggregatingBlockInputStream::execute()
rethrowFirstException(exceptions);
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return;
double elapsed_seconds = watch.elapsedSeconds();
......
......@@ -27,7 +27,7 @@ public:
String getName() const override { return "ParallelAggregating"; }
void cancel() override;
void cancel(bool kill) override;
Block getHeader() const override;
......
......@@ -110,7 +110,7 @@ public:
}
/// Ask all sources to stop earlier than they run out.
void cancel()
void cancel(bool kill)
{
finish = true;
......@@ -120,7 +120,7 @@ public:
{
try
{
child->cancel();
child->cancel(kill);
}
catch (...)
{
......
......@@ -97,8 +97,11 @@ void RemoteBlockInputStream::readPrefix()
sendQuery();
}
void RemoteBlockInputStream::cancel()
void RemoteBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
......@@ -110,7 +113,7 @@ void RemoteBlockInputStream::cancel()
for (auto & vec : external_tables_data)
for (auto & elem : vec)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
stream->cancel();
stream->cancel(kill);
}
if (!isQueryPending() || hasThrownException())
......@@ -180,7 +183,7 @@ Block RemoteBlockInputStream::readImpl()
while (true)
{
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return Block();
Connection::Packet packet = multiplexed_connections->receivePacket();
......
......@@ -62,7 +62,7 @@ public:
*/
void progress(const Progress & /*value*/) override {}
void cancel() override;
void cancel(bool kill) override;
String getName() const override { return "Remote"; }
......
......@@ -99,7 +99,7 @@ public:
try
{
if (!all_read)
cancel();
cancel(false);
finalize();
}
......@@ -112,14 +112,17 @@ public:
/** Different from the default implementation by trying to stop all sources,
* skipping failed by execution.
*/
void cancel() override
void cancel(bool kill) override
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
//std::cerr << "cancelling\n";
processor.cancel();
processor.cancel(kill);
}
BlockExtraInfo getBlockExtraInfo() const override
......@@ -217,7 +220,7 @@ protected:
void readSuffix() override
{
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
if (!all_read && !isCancelled())
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
finalize();
......@@ -281,7 +284,7 @@ private:
/// and the exception is lost.
parent.output_queue.push(exception);
parent.cancel(); /// Does not throw exceptions.
parent.cancel(false); /// Does not throw exceptions.
}
Self & parent;
......
......@@ -978,7 +978,7 @@ public:
while(res.rows() == 0)
{
if (is_cancelled)
if (isCancelled())
return res;
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
......
......@@ -126,7 +126,7 @@ BlockIO InterpreterCheckQuery::execute()
while (true)
{
if (stream.isCancelled())
if (stream.isCancelledOrThrowIfKilled())
{
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(result);
......
......@@ -135,7 +135,7 @@ public:
if (curr_process.processed)
continue;
auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user);
auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true);
if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
{
......@@ -148,7 +148,7 @@ public:
/// KILL QUERY could be killed also
/// Probably interpreting KILL QUERIES as complete (not internal) queries is extra functionality
if (is_cancelled)
if (isCancelled())
break;
/// Sleep if there are unprocessed queries
......@@ -190,7 +190,7 @@ BlockIO InterpreterKillQueryQuery::execute()
for (const auto & query_desc : queries_to_stop)
{
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user);
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
}
......
......@@ -230,7 +230,7 @@ ProcessListElement * ProcessList::tryGetProcessListElement(const String & curren
}
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user)
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
std::lock_guard<std::mutex> lock(mutex);
......@@ -251,7 +251,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
IProfilingBlockInputStream * input_stream_casted;
if (input_stream && (input_stream_casted = dynamic_cast<IProfilingBlockInputStream *>(input_stream.get())))
{
input_stream_casted->cancel();
input_stream_casted->cancel(kill);
return CancellationCode::CancelSent;
}
return CancellationCode::CancelCannotBeSent;
......
......@@ -286,7 +286,7 @@ public:
};
/// Try call cancel() for input and output streams of query with specified id and user
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user);
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
};
}
......@@ -1118,14 +1118,14 @@ private:
if (stop_conditions.areFulfilled())
{
statistics.last_query_was_cancelled = true;
stream.cancel();
stream.cancel(false);
}
if (interrupt_listener.check())
{
gotSIGINT = true;
statistics.last_query_was_cancelled = true;
stream.cancel();
stream.cancel(false);
}
}
......
......@@ -321,7 +321,7 @@ void TCPHandler::processOrdinaryQuery()
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
async_in.cancel();
async_in.cancel(false);
break;
}
else
......
......@@ -163,7 +163,7 @@ public:
Block readImpl() override
{
if (isCancelled())
if (isCancelledOrThrowIfKilled())
return {};
return reader->read();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册