提交 6def419e 编写于 作者: A Alexey Milovidov

dbms: development [#CONV-2944].

上级 d7eb7a1c
......@@ -42,6 +42,7 @@ public:
virtual Block read() = 0;
/** Прочитать что-нибудь перед началом всех данных или после конца всех данных.
* В функции readSuffix можно реализовать финализацию, которая может привести к исключению.
*/
virtual void readPrefix() {}
virtual void readSuffix() {}
......
......@@ -74,6 +74,9 @@ public:
Block read();
/// Реализация по-умолчанию вызывает рекурсивно readSuffix() у всех детей, а затем readSuffixImpl() у себя.
void readSuffix();
/// Получить информацию о скорости выполнения.
const BlockStreamProfileInfo & getInfo() const;
......@@ -185,6 +188,8 @@ protected:
/// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0;
/// Здесь необходимо делать финализацию, которая может привести к исключению.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
bool checkLimits();
......
......@@ -24,8 +24,6 @@ public:
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
void readSuffix();
String getName() const { return "MergingSortedBlockInputStream"; }
String getID() const
......@@ -52,6 +50,7 @@ public:
protected:
Block readImpl();
void readSuffixImpl();
/// Инициализирует очередь и следующий блок результата.
void init(Block & merged_block, ColumnPlainPtrs & merged_columns);
......
......@@ -82,62 +82,10 @@ public:
~RemoteBlockInputStream()
{
bool uncaught_exception = std::uncaught_exception();
/** В случае эксепшена, закрываем соединение, чтобы оно не осталось висеть в рассихронизированном состоянии.
*/
if (uncaught_exception)
if (std::uncaught_exception())
connection.disconnect();
/** Если одно из:
* - ничего не начинали делать;
* - получили все пакеты до EndOfStream;
* - получили с сервера эксепшен;
* - объект уничтожается из-за эксепшена;
* - то больше читать ничего не нужно.
*/
if (!sent_query || finished || got_exception_from_server || uncaught_exception)
return;
/** Если ещё прочитали не все данные, но они больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT).
*/
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
if (!was_cancelled)
{
LOG_TRACE(log, "Cancelling query because enough data has been read");
was_cancelled = true;
connection.sendCancel();
}
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
while (true)
{
Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
return;
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
protected:
......@@ -197,6 +145,60 @@ protected:
}
}
void readSuffixImpl()
{
/** Если одно из:
* - ничего не начинали делать;
* - получили все пакеты до EndOfStream;
* - получили с сервера эксепшен;
* - то больше читать ничего не нужно.
*/
if (!sent_query || finished || got_exception_from_server)
return;
finished = true;
/** Если ещё прочитали не все данные, но они больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT).
*/
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
if (!was_cancelled)
{
LOG_TRACE(log, "Cancelling query because enough data has been read");
was_cancelled = true;
connection.sendCancel();
}
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
while (true)
{
Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
return;
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
private:
/// Используется, если нужно владеть соединением из пула
ConnectionPool::Entry pool_entry;
......
......@@ -37,7 +37,7 @@ class UnionBlockInputStream : public IProfilingBlockInputStream
public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false),
output_queue(max_threads), exhausted_inputs(0), finish(false), all_read(false), finalized(false),
log(&Logger::get("UnionBlockInputStream"))
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
......@@ -71,37 +71,10 @@ public:
return res.str();
}
~UnionBlockInputStream()
{
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
ExceptionPtr exception;
/// Вынем всё, что есть в очереди готовых данных.
OutputData res;
while (output_queue.tryPop(res))
if (res.exception && !exception)
exception = res.exception;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
/// Может быть, нам под конец положили эксепшен.
while (output_queue.tryPop(res))
if (res.exception && !exception)
exception = res.exception;
if (exception && !std::uncaught_exception())
exception->rethrow();
LOG_TRACE(log, "Waited for threads to finish");
readSuffixImpl();
}
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
......@@ -179,6 +152,44 @@ protected:
return res.block;
}
void readSuffixImpl()
{
if (finalized)
return;
finalized = true;
LOG_TRACE(log, "Waiting for threads to finish");
finish = true;
cancel();
ExceptionPtr exception;
/// Вынем всё, что есть в очереди готовых данных.
OutputData res;
while (output_queue.tryPop(res))
if (res.exception && !exception)
exception = res.exception;
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер?
*/
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it)
it->thread->join();
/// Может быть, нам под конец положили эксепшен.
while (output_queue.tryPop(res))
if (res.exception && !exception)
exception = res.exception;
if (exception && !std::uncaught_exception())
exception->rethrow();
LOG_TRACE(log, "Waited for threads to finish");
}
private:
/// Данные отдельного источника
struct InputData
......@@ -338,6 +349,9 @@ private:
volatile bool finish;
bool all_read;
/// Была вызвана функция readSuffixImpl.
bool finalized;
Logger * log;
};
......
......@@ -823,6 +823,9 @@ private:
void onEndOfStream()
{
if (block_std_in)
block_std_in->readSuffix();
if (block_std_out)
block_std_out->writeSuffix();
......
......@@ -204,6 +204,15 @@ Block IProfilingBlockInputStream::read()
}
void IProfilingBlockInputStream::readSuffix()
{
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->readSuffix();
readSuffixImpl();
}
void IProfilingBlockInputStream::updateExtremes(Block & block)
{
size_t columns = block.columns();
......
......@@ -171,7 +171,7 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
}
void MergingSortedBlockInputStream::readSuffix()
void MergingSortedBlockInputStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getInfo();
double seconds = profile_info.work_stopwatch.elapsedSeconds();
......
......@@ -213,6 +213,8 @@ void TCPHandler::processOrdinaryQuery()
/// Отправим блок-заголовок, чтобы клиент мог подготовить формат вывода
if (state.io.in_sample && client_revision >= DBMS_MIN_REVISION_WITH_HEADER_BLOCK)
sendData(state.io.in_sample);
state.io.in->readPrefix();
AsynchronousBlockInputStream async_in(state.io.in);
......@@ -254,6 +256,8 @@ void TCPHandler::processOrdinaryQuery()
break;
}
state.io.in->readSuffix();
watch.stop();
logProfileInfo(watch, *state.io.in);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册