提交 5a033eb7 编写于 作者: A Alexey Arno

dbms: Server: fixed issue with query cancellation. [#METR-14410]

上级 3fe14e7b
......@@ -7,9 +7,9 @@
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <atomic>
namespace DB
{
......@@ -96,9 +96,9 @@ public:
/** Требуется ли прервать получение данных.
*/
bool isCancelled()
bool isCancelled() const
{
return is_cancelled;
return is_cancelled.load(std::memory_order_seq_cst);
}
/** Какие ограничения (и квоты) проверяются.
......@@ -151,7 +151,7 @@ public:
protected:
BlockStreamProfileInfo info;
volatile bool is_cancelled = false;
std::atomic<bool> is_cancelled{false};
ProgressCallback progress_callback;
ProcessList::Element * process_list_elem = nullptr;
......
......@@ -57,7 +57,8 @@ public:
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
processor.cancel();
......
......@@ -80,7 +80,8 @@ public:
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
if (hasNoQueryInProgress() || hasThrownException())
......@@ -147,6 +148,9 @@ protected:
while (true)
{
if (isCancelled())
return Block();
Connection::Packet packet = parallel_replicas->receivePacket();
switch (packet.type)
......@@ -178,10 +182,6 @@ protected:
* и квот (например, на количество строчек для чтения).
*/
progressImpl(packet.progress);
if (isQueryInProgress() && isCancelled())
cancel();
break;
case Protocol::Server::ProfileInfo:
......@@ -281,7 +281,7 @@ private:
void tryCancel(const char * reason)
{
bool old_val = false;
if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_seq_cst))
if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
LOG_TRACE(log, "(" << parallel_replicas->dumpAddresses() << ") " << reason);
......
......@@ -76,7 +76,8 @@ public:
*/
void cancel() override
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
//std::cerr << "cancelling\n";
......@@ -164,7 +165,7 @@ protected:
void readSuffix() override
{
//std::cerr << "readSuffix\n";
if (!all_read && !is_cancelled)
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
finalize();
......
......@@ -6,6 +6,7 @@
#include <DB/DataStreams/IRowInputStream.h>
#include <DB/DataStreams/IRowOutputStream.h>
#include <atomic>
namespace DB
{
......@@ -13,7 +14,7 @@ namespace DB
/** Копирует данные из InputStream в OutputStream
* (например, из БД в консоль и т. п.)
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, volatile bool * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IRowInputStream & from, IRowOutputStream & to);
}
......@@ -31,7 +31,7 @@ Block IProfilingBlockInputStream::read()
Block res;
if (is_cancelled)
if (is_cancelled.load(std::memory_order_seq_cst))
return res;
res = readImpl();
......@@ -292,7 +292,8 @@ void IProfilingBlockInputStream::progressImpl(const Progress & value)
void IProfilingBlockInputStream::cancel()
{
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
for (auto & child : children)
......
......@@ -7,20 +7,30 @@
namespace DB
{
void copyData(IBlockInputStream & from, IBlockOutputStream & to, volatile bool * is_cancelled)
namespace
{
bool isAtomicSet(std::atomic<bool> * val)
{
return ((val != nullptr) && val->load(std::memory_order_seq_cst));
}
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (is_cancelled && *is_cancelled)
if (isAtomicSet(is_cancelled))
break;
to.write(block);
}
if (is_cancelled && *is_cancelled)
if (isAtomicSet(is_cancelled))
return;
/// Для вывода дополнительной информации в некоторых форматах.
......@@ -33,7 +43,7 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, volatile bool *
to.setExtremes(input->getExtremes());
}
if (is_cancelled && *is_cancelled)
if (isAtomicSet(is_cancelled))
return;
from.readSuffix();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册