提交 4e3a0f0a 编写于 作者: A Alexey Milovidov

Fixed error [#CLICKHOUSE-2]

上级 aad0c62e
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <DataStreams/IProfilingBlockInputStream.h> #include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h> #include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h> #include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h> #include <IO/CompressedWriteBuffer.h>
...@@ -130,7 +131,7 @@ Block Aggregator::getHeader(bool final) const ...@@ -130,7 +131,7 @@ Block Aggregator::getHeader(bool final) const
} }
} }
return res; return materializeBlock(res);
} }
......
...@@ -64,6 +64,7 @@ namespace ErrorCodes ...@@ -64,6 +64,7 @@ namespace ErrorCodes
extern const int ILLEGAL_FINAL; extern const int ILLEGAL_FINAL;
extern const int ILLEGAL_PREWHERE; extern const int ILLEGAL_PREWHERE;
extern const int TOO_MUCH_COLUMNS; extern const int TOO_MUCH_COLUMNS;
extern const int LOGICAL_ERROR;
} }
...@@ -329,7 +330,8 @@ Block InterpreterSelectQuery::getSampleBlock() ...@@ -329,7 +330,8 @@ Block InterpreterSelectQuery::getSampleBlock()
{ {
Pipeline pipeline; Pipeline pipeline;
executeWithoutUnionImpl(pipeline, std::make_shared<OneBlockInputStream>(source_header)); executeWithoutUnionImpl(pipeline, std::make_shared<OneBlockInputStream>(source_header));
return pipeline.firstStream()->getHeader(); auto res = pipeline.firstStream()->getHeader();
return res;
} }
...@@ -785,9 +787,20 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline ...@@ -785,9 +787,20 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
query_analyzer->makeSetsForIndex(); query_analyzer->makeSetsForIndex();
/// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery? /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
if (!interpreter_subquery) if (!pipeline.streams.empty())
{
/// Prepared input.
}
else if (interpreter_subquery)
{ {
/// Subquery.
interpreter_subquery->executeWithoutUnionImpl(pipeline, {});
}
else if (storage)
{
/// Table.
if (max_streams == 0) if (max_streams == 0)
throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR);
...@@ -834,39 +847,37 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline ...@@ -834,39 +847,37 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
{ {
stream->addTableLock(table_lock); stream->addTableLock(table_lock);
}); });
}
else
{
interpreter_subquery->executeWithoutUnionImpl(pipeline, input);
}
/** Set the limits and quota for reading data, the speed and time of the query. /** Set the limits and quota for reading data, the speed and time of the query.
* Such restrictions are checked on the initiating server of the request, and not on remote servers. * Such restrictions are checked on the initiating server of the request, and not on remote servers.
* Because the initiating server has a summary of the execution of the request on all servers. * Because the initiating server has a summary of the execution of the request on all servers.
*/ */
if (storage && to_stage == QueryProcessingStage::Complete) if (to_stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_read;
limits.max_bytes_to_read = settings.limits.max_bytes_to_read;
limits.read_overflow_mode = settings.limits.read_overflow_mode;
limits.max_execution_time = settings.limits.max_execution_time;
limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode;
limits.min_execution_speed = settings.limits.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
pipeline.transform([&](auto & stream)
{ {
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get())) IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.max_rows_to_read = settings.limits.max_rows_to_read;
limits.max_bytes_to_read = settings.limits.max_bytes_to_read;
limits.read_overflow_mode = settings.limits.read_overflow_mode;
limits.max_execution_time = settings.limits.max_execution_time;
limits.timeout_overflow_mode = settings.limits.timeout_overflow_mode;
limits.min_execution_speed = settings.limits.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.limits.timeout_before_checking_execution_speed;
QuotaForIntervals & quota = context.getQuota();
pipeline.transform([&](auto & stream)
{ {
p_stream->setLimits(limits); if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
p_stream->setQuota(quota); {
} p_stream->setLimits(limits);
}); p_stream->setQuota(quota);
}
});
}
} }
else
throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR);
return from_stage; return from_stage;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册