未验证 提交 190fd88a 编写于 作者: N Nikolai Kochetov 提交者: GitHub

Merge pull request #15785 from amosbird/e1

Code refactor.
......@@ -300,8 +300,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true);
auto stream = block_io.getInputStream();
auto stream = executeQuery(select_query, context.getGlobalContext(), true).getInputStream();
Block res = stream->read();
if (res && stream->read())
......
......@@ -804,8 +804,7 @@ void executeQuery(
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
copyData(in, *streams.out);
}
if (streams.in)
else if (streams.in)
{
/// FIXME: try to prettify this cast using `as<>()`
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
......@@ -847,8 +846,7 @@ void executeQuery(
copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); });
}
if (pipeline.initialized())
else if (pipeline.initialized())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
......
......@@ -218,10 +218,7 @@ void PostgreSQLHandler::cancelRequest()
String query = Poco::format("KILL QUERY WHERE query_id = 'postgres:%d:%d'", msg->process_id, msg->secret_key);
ReadBufferFromString replacement(query);
executeQuery(
replacement, *out, true, connection_context,
[](const String &, const String &, const String &, const String &) {}
);
executeQuery(replacement, *out, true, connection_context, {});
}
inline std::unique_ptr<PostgreSQLProtocol::Messaging::StartupMessage> PostgreSQLHandler::receiveStartupMessage(int payload_size)
......
......@@ -253,27 +253,27 @@ void TCPHandler::runImpl()
/// Processing Query
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
if (state.io.out)
state.need_receive_data_for_insert = true;
after_check_cancelled.restart();
after_send_progress.restart();
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
if (state.io.out)
{
state.need_receive_data_for_insert = true;
processInsertQuery(connection_settings);
else if (state.need_receive_data_for_input)
}
else if (state.need_receive_data_for_input) // It implies pipeline execution
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
state.io.onFinish();
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors();
else
else if (state.io.in)
processOrdinaryQuery();
state.io.onFinish();
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();
......@@ -509,7 +509,6 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
readData(connection_settings);
state.io.out->writeSuffix();
state.io.onFinish();
}
......@@ -571,8 +570,6 @@ void TCPHandler::processOrdinaryQuery()
sendData({});
}
state.io.onFinish();
sendProgress();
}
......@@ -638,8 +635,6 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
sendData({});
}
state.io.onFinish();
sendProgress();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册