From 82a361d0e963d1f4a31c2fc1b0367c1afc351333 Mon Sep 17 00:00:00 2001 From: proller Date: Tue, 6 Mar 2018 00:09:39 +0300 Subject: [PATCH] Show error to client if query was killed (#1989) * Show error to client if query was killed * Kill exception v2 * Use kill * fix * wip * fix * fxi * try fix * Revert "try fix" This reverts commit eb76e4c0401ee02148204b814ff04b31bc470292. * QUERY_WASCANCELLED * Fxi all cancel() * fix --- dbms/src/Common/ErrorCodes.cpp | 1 + .../AggregatingBlockInputStream.cpp | 2 +- .../CreatingSetsBlockInputStream.cpp | 6 +++--- .../IProfilingBlockInputStream.cpp | 17 +++++++++------- .../DataStreams/IProfilingBlockInputStream.h | 20 ++++++++++++++++++- dbms/src/DataStreams/LazyBlockInputStream.h | 2 +- .../MergeSortingBlockInputStream.cpp | 2 +- .../MergingAggregatedBlockInputStream.cpp | 2 +- ...regatedMemoryEfficientBlockInputStream.cpp | 11 ++++++---- ...ggregatedMemoryEfficientBlockInputStream.h | 2 +- .../MergingSortedBlockInputStream.cpp | 6 +++--- .../ParallelAggregatingBlockInputStream.cpp | 14 +++++++------ .../ParallelAggregatingBlockInputStream.h | 2 +- .../src/DataStreams/ParallelInputsProcessor.h | 4 ++-- .../DataStreams/RemoteBlockInputStream.cpp | 9 ++++++--- dbms/src/DataStreams/RemoteBlockInputStream.h | 2 +- dbms/src/DataStreams/UnionBlockInputStream.h | 13 +++++++----- dbms/src/Interpreters/DDLWorker.cpp | 2 +- .../Interpreters/InterpreterCheckQuery.cpp | 2 +- .../InterpreterKillQueryQuery.cpp | 6 +++--- dbms/src/Interpreters/ProcessList.cpp | 4 ++-- dbms/src/Interpreters/ProcessList.h | 2 +- dbms/src/Server/PerformanceTest.cpp | 4 ++-- dbms/src/Server/TCPHandler.cpp | 2 +- dbms/src/Storages/StorageKafka.cpp | 2 +- 25 files changed, 86 insertions(+), 53 deletions(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index d68ba4091c..4a253fc189 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -369,6 +369,7 @@ namespace ErrorCodes extern const int EXTERNAL_LIBRARY_ERROR = 391; extern const int QUERY_IS_PROHIBITED = 392; extern const int THERE_IS_NO_QUERY = 393; + extern const int QUERY_WAS_CANCELLED = 394; extern const int KEEPER_EXCEPTION = 999; diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp index ce91333bfe..e8acc63932 100644 --- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp @@ -68,7 +68,7 @@ Block AggregatingBlockInputStream::readImpl() } } - if (isCancelled() || !impl) + if (isCancelledOrThrowIfKilled() || !impl) return {}; return impl->read(); diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index ee88c1160c..83c3db4850 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -22,7 +22,7 @@ Block CreatingSetsBlockInputStream::readImpl() createAll(); - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return res; return children.back()->read(); @@ -54,7 +54,7 @@ void CreatingSetsBlockInputStream::createAll() { if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them. { - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return; createOne(elem.second); @@ -139,7 +139,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) if (done_with_set && done_with_join && done_with_table) { if (IProfilingBlockInputStream * profiling_in = dynamic_cast(&*subquery.source)) - profiling_in->cancel(); + profiling_in->cancel(false); break; } diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 70d287b864..4be7789748 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -41,7 +41,7 @@ Block IProfilingBlockInputStream::read() Block res; - if (is_cancelled.load(std::memory_order_seq_cst)) + if (isCancelledOrThrowIfKilled()) return res; if (!checkTimeLimits()) @@ -71,7 +71,7 @@ Block IProfilingBlockInputStream::read() * but children sources are still working, * herewith they can work in separate threads or even remotely. */ - cancel(); + cancel(false); } progress(Progress(res.rows(), res.bytes())); @@ -264,7 +264,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value) if (process_list_elem) { if (!process_list_elem->updateProgressIn(value)) - cancel(); + cancel(false); /// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers. @@ -302,7 +302,7 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value) if ((limits.max_rows_to_read && rows_processed > limits.max_rows_to_read) || (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read)) { - cancel(); + cancel(false); } break; @@ -350,15 +350,18 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value) } -void IProfilingBlockInputStream::cancel() +void IProfilingBlockInputStream::cancel(bool kill) { + if (kill) + is_killed = true; + bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; - forEachProfilingChild([] (IProfilingBlockInputStream & child) + forEachProfilingChild([&] (IProfilingBlockInputStream & child) { - child.cancel(); + child.cancel(kill); return false; }); } diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.h b/dbms/src/DataStreams/IProfilingBlockInputStream.h index e11df9a256..254b4e3101 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.h @@ -12,6 +12,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int QUERY_WAS_CANCELLED; +} + class QuotaForIntervals; struct ProcessListElement; class IProfilingBlockInputStream; @@ -106,8 +111,11 @@ public: /** Ask to abort the receipt of data as soon as possible. * By default - just sets the flag is_cancelled and asks that all children be interrupted. * This function can be called several times, including simultaneously from different threads. + * Have two modes: + * with kill = false only is_cancelled is set - streams will stop silently with returning some processed data. + * with kill = true also is_killed set - queries will stop with exception. */ - virtual void cancel(); + virtual void cancel(bool kill); /** Do you want to abort the receipt of data. */ @@ -116,6 +124,15 @@ public: return is_cancelled.load(std::memory_order_seq_cst); } + bool isCancelledOrThrowIfKilled() const + { + if (!isCancelled()) + return false; + if (is_killed) + throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED); + return true; + } + /** What limitations and quotas should be checked. * LIMITS_CURRENT - checks amount of data read by current stream only (BlockStreamProfileInfo is used for check). * Currently it is used in root streams to check max_result_{rows,bytes} limits. @@ -173,6 +190,7 @@ public: protected: BlockStreamProfileInfo info; std::atomic is_cancelled{false}; + bool is_killed{false}; ProgressCallback progress_callback; ProcessListElement * process_list_elem = nullptr; diff --git a/dbms/src/DataStreams/LazyBlockInputStream.h b/dbms/src/DataStreams/LazyBlockInputStream.h index f4faceb392..3bb6e67c3a 100644 --- a/dbms/src/DataStreams/LazyBlockInputStream.h +++ b/dbms/src/DataStreams/LazyBlockInputStream.h @@ -59,7 +59,7 @@ protected: addChild(input); if (isCancelled() && p_input) - p_input->cancel(); + p_input->cancel(is_killed); } } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index ed165fc0e8..46fc738a51 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -116,7 +116,7 @@ Block MergeSortingBlockInputStream::readImpl() } } - if ((blocks.empty() && temporary_files.empty()) || isCancelled()) + if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled()) return Block(); if (temporary_files.empty()) diff --git a/dbms/src/DataStreams/MergingAggregatedBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedBlockInputStream.cpp index 5586033482..6e0b5986e1 100644 --- a/dbms/src/DataStreams/MergingAggregatedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedBlockInputStream.cpp @@ -28,7 +28,7 @@ Block MergingAggregatedBlockInputStream::readImpl() } Block res; - if (isCancelled() || it == blocks.end()) + if (isCancelledOrThrowIfKilled() || it == blocks.end()) return res; res = std::move(*it); diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 3b38dae42a..bf436bb854 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -104,7 +104,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix() void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix() { - if (!all_read && !is_cancelled.load(std::memory_order_seq_cst)) + if (!all_read && !isCancelled()) throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); finalize(); @@ -114,8 +114,11 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix() } -void MergingAggregatedMemoryEfficientBlockInputStream::cancel() +void MergingAggregatedMemoryEfficientBlockInputStream::cancel(bool kill) { + if (kill) + is_killed = true; + bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true)) return; @@ -136,7 +139,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::cancel() { try { - child->cancel(); + child->cancel(kill); } catch (...) { @@ -265,7 +268,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEffici try { if (!all_read) - cancel(); + cancel(false); finalize(); } diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index 69af976c52..837c10869c 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -76,7 +76,7 @@ public: /** Different from the default implementation by trying to stop all sources, * skipping failed by execution. */ - void cancel() override; + void cancel(bool kill) override; Block getHeader() const override; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index c256e49e60..f93197ca12 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -176,7 +176,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: if (limit && total_merged_rows == limit) { // std::cerr << "Limit reached\n"; - cancel(); + cancel(false); finished = true; return true; } @@ -236,7 +236,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: column = column->cut(0, merged_rows); } - cancel(); + cancel(false); finished = true; } @@ -304,7 +304,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std:: return; } - cancel(); + cancel(false); finished = true; } diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 9405cbfd38..9fee2996a7 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -35,14 +35,16 @@ Block ParallelAggregatingBlockInputStream::getHeader() const } -void ParallelAggregatingBlockInputStream::cancel() +void ParallelAggregatingBlockInputStream::cancel(bool kill) { + if (kill) + is_killed = true; bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; if (!executed) - processor.cancel(); + processor.cancel(kill); } @@ -55,7 +57,7 @@ Block ParallelAggregatingBlockInputStream::readImpl() execute(); - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return {}; if (!aggregator.hasTemporaryFiles()) @@ -92,7 +94,7 @@ Block ParallelAggregatingBlockInputStream::readImpl() } Block res; - if (isCancelled() || !impl) + if (isCancelledOrThrowIfKilled() || !impl) return res; return impl->read(); @@ -150,7 +152,7 @@ void ParallelAggregatingBlockInputStream::Handler::onFinish() void ParallelAggregatingBlockInputStream::Handler::onException(std::exception_ptr & exception, size_t thread_num) { parent.exceptions[thread_num] = exception; - parent.cancel(); + parent.cancel(false); } @@ -174,7 +176,7 @@ void ParallelAggregatingBlockInputStream::execute() rethrowFirstException(exceptions); - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return; double elapsed_seconds = watch.elapsedSeconds(); diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index a0dbb8a948..0a74557d44 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -27,7 +27,7 @@ public: String getName() const override { return "ParallelAggregating"; } - void cancel() override; + void cancel(bool kill) override; Block getHeader() const override; diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index d4f9bc6351..115dc6e1a3 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -110,7 +110,7 @@ public: } /// Ask all sources to stop earlier than they run out. - void cancel() + void cancel(bool kill) { finish = true; @@ -120,7 +120,7 @@ public: { try { - child->cancel(); + child->cancel(kill); } catch (...) { diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.cpp b/dbms/src/DataStreams/RemoteBlockInputStream.cpp index 311cda1727..7b63b36cad 100644 --- a/dbms/src/DataStreams/RemoteBlockInputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockInputStream.cpp @@ -97,8 +97,11 @@ void RemoteBlockInputStream::readPrefix() sendQuery(); } -void RemoteBlockInputStream::cancel() +void RemoteBlockInputStream::cancel(bool kill) { + if (kill) + is_killed = true; + bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; @@ -110,7 +113,7 @@ void RemoteBlockInputStream::cancel() for (auto & vec : external_tables_data) for (auto & elem : vec) if (IProfilingBlockInputStream * stream = dynamic_cast(elem.first.get())) - stream->cancel(); + stream->cancel(kill); } if (!isQueryPending() || hasThrownException()) @@ -180,7 +183,7 @@ Block RemoteBlockInputStream::readImpl() while (true) { - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return Block(); Connection::Packet packet = multiplexed_connections->receivePacket(); diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.h b/dbms/src/DataStreams/RemoteBlockInputStream.h index c76b0a03ff..fd50735516 100644 --- a/dbms/src/DataStreams/RemoteBlockInputStream.h +++ b/dbms/src/DataStreams/RemoteBlockInputStream.h @@ -62,7 +62,7 @@ public: */ void progress(const Progress & /*value*/) override {} - void cancel() override; + void cancel(bool kill) override; String getName() const override { return "Remote"; } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index f45ff396cb..71e72672aa 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -99,7 +99,7 @@ public: try { if (!all_read) - cancel(); + cancel(false); finalize(); } @@ -112,14 +112,17 @@ public: /** Different from the default implementation by trying to stop all sources, * skipping failed by execution. */ - void cancel() override + void cancel(bool kill) override { + if (kill) + is_killed = true; + bool old_val = false; if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) return; //std::cerr << "cancelling\n"; - processor.cancel(); + processor.cancel(kill); } BlockExtraInfo getBlockExtraInfo() const override @@ -217,7 +220,7 @@ protected: void readSuffix() override { //std::cerr << "readSuffix\n"; - if (!all_read && !is_cancelled.load(std::memory_order_seq_cst)) + if (!all_read && !isCancelled()) throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR); finalize(); @@ -281,7 +284,7 @@ private: /// and the exception is lost. parent.output_queue.push(exception); - parent.cancel(); /// Does not throw exceptions. + parent.cancel(false); /// Does not throw exceptions. } Self & parent; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 3ae9a1d5a0..7cdcacb007 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -978,7 +978,7 @@ public: while(res.rows() == 0) { - if (is_cancelled) + if (isCancelled()) return res; if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds) diff --git a/dbms/src/Interpreters/InterpreterCheckQuery.cpp b/dbms/src/Interpreters/InterpreterCheckQuery.cpp index 068b789710..2657775919 100644 --- a/dbms/src/Interpreters/InterpreterCheckQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCheckQuery.cpp @@ -126,7 +126,7 @@ BlockIO InterpreterCheckQuery::execute() while (true) { - if (stream.isCancelled()) + if (stream.isCancelledOrThrowIfKilled()) { BlockIO res; res.in = std::make_shared(result); diff --git a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp index 68f12dc578..da37dee5c4 100644 --- a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp +++ b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp @@ -135,7 +135,7 @@ public: if (curr_process.processed) continue; - auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user); + auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true); if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent) { @@ -148,7 +148,7 @@ public: /// KILL QUERY could be killed also /// Probably interpreting KILL QUERIES as complete (not internal) queries is extra functionality - if (is_cancelled) + if (isCancelled()) break; /// Sleep if there are unprocessed queries @@ -190,7 +190,7 @@ BlockIO InterpreterKillQueryQuery::execute() for (const auto & query_desc : queries_to_stop) { - auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user); + auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true); insertResultRow(query_desc.source_num, code, processes_block, header, res_columns); } diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index d886367d0b..95e279b6e1 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -230,7 +230,7 @@ ProcessListElement * ProcessList::tryGetProcessListElement(const String & curren } -ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user) +ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill) { std::lock_guard lock(mutex); @@ -251,7 +251,7 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr IProfilingBlockInputStream * input_stream_casted; if (input_stream && (input_stream_casted = dynamic_cast(input_stream.get()))) { - input_stream_casted->cancel(); + input_stream_casted->cancel(kill); return CancellationCode::CancelSent; } return CancellationCode::CancelCannotBeSent; diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index edf1060f0c..f061e43ba1 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -286,7 +286,7 @@ public: }; /// Try call cancel() for input and output streams of query with specified id and user - CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user); + CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false); }; } diff --git a/dbms/src/Server/PerformanceTest.cpp b/dbms/src/Server/PerformanceTest.cpp index d5239ac0e4..82518df2fe 100644 --- a/dbms/src/Server/PerformanceTest.cpp +++ b/dbms/src/Server/PerformanceTest.cpp @@ -1118,14 +1118,14 @@ private: if (stop_conditions.areFulfilled()) { statistics.last_query_was_cancelled = true; - stream.cancel(); + stream.cancel(false); } if (interrupt_listener.check()) { gotSIGINT = true; statistics.last_query_was_cancelled = true; - stream.cancel(); + stream.cancel(false); } } diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 7dadeef966..c8d249fb20 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -321,7 +321,7 @@ void TCPHandler::processOrdinaryQuery() if (isQueryCancelled()) { /// A packet was received requesting to stop execution of the request. - async_in.cancel(); + async_in.cancel(false); break; } else diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/StorageKafka.cpp index c7ac83d114..c3b14a468c 100644 --- a/dbms/src/Storages/StorageKafka.cpp +++ b/dbms/src/Storages/StorageKafka.cpp @@ -163,7 +163,7 @@ public: Block readImpl() override { - if (isCancelled()) + if (isCancelledOrThrowIfKilled()) return {}; return reader->read(); -- GitLab