提交 57d727d0 编写于 作者: N Nikolai Kochetov

Fix result_rows and result_bytes metrics for selects.

上级 4d01fb3c
......@@ -31,8 +31,8 @@ struct BlockIO
QueryPipeline pipeline;
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
std::function<void(IBlockInputStream *, IBlockOutputStream *, QueryPipeline *)> finish_callback;
std::function<void()> exception_callback;
/// When it is true, don't bother sending any non-empty blocks to the out stream
bool null_format = false;
......@@ -41,7 +41,13 @@ struct BlockIO
void onFinish()
{
if (finish_callback)
finish_callback(in.get(), out.get());
{
QueryPipeline * pipeline_ptr = nullptr;
if (pipeline.initialized())
pipeline_ptr = &pipeline;
finish_callback(in.get(), out.get(), pipeline_ptr);
}
}
void onException()
......
......@@ -479,7 +479,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
/// Also make possible for caller to log successful query finish and exception during execution.
auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
auto finish_callback = [elem, &context, log_queries, log_queries_min_type = settings.log_queries_min_type]
(IBlockInputStream * stream_in, IBlockOutputStream * stream_out, QueryPipeline * query_pipeline) mutable
{
QueryStatus * process_list_elem = context.getProcessListElement();
......@@ -528,6 +529,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.result_bytes = counting_stream->getProgress().read_bytes;
}
}
else if (query_pipeline)
{
if (const auto * output_format = query_pipeline->getOutputFormat())
{
elem.result_rows = output_format->getResultRows();
elem.result_bytes = output_format->getResultBytes();
}
}
if (elem.read_rows != 0)
{
......
......@@ -59,6 +59,8 @@ void IOutputFormat::work()
switch (current_block_kind)
{
case Main:
result_rows += current_chunk.getNumRows();
result_bytes += current_chunk.allocatedBytes();
consume(std::move(current_chunk));
break;
case Totals:
......
......@@ -79,6 +79,14 @@ public:
void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); }
void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); }
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_rows; }
private:
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;
};
}
......@@ -110,6 +110,8 @@ public:
void addCreatingSetsTransform(ProcessorPtr transform);
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Get current OutputFormat.
IOutputFormat * getOutputFormat() const { return output_format; }
/// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation.
void setSinks(const ProcessorGetterWithStreamKind & getter);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册