提交 00546498 编写于 作者: N Nikolai Kochetov

Fix StorageBuffer/

上级 9eb97e6d
......@@ -155,7 +155,8 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
return QueryProcessingStage::FetchColumns;
}
Pipes StorageBuffer::readWithProcessors(
static Pipes readAsPipes(
const StoragePtr & storage,
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
......@@ -163,26 +164,26 @@ Pipes StorageBuffer::readWithProcessors(
size_t max_block_size,
unsigned num_streams)
{
auto read_as_pipes = [](const StoragePtr & storage,
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
if (storage->supportProcessorsPipeline())
return storage->readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams);
if (storage->supportProcessorsPipeline())
return storage->readWithProcessors(column_names, query_info, context, processed_stage, max_block_size, num_streams);
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
Pipes pipes;
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(stream));
Pipes pipes;
for (auto & stream : streams)
pipes.emplace_back(std::make_shared<SourceFromInputStream>(stream));
return pipes;
};
return pipes;
};
Pipes StorageBuffer::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
Pipes pipes_from_dst;
if (!no_destination)
......@@ -206,7 +207,7 @@ Pipes StorageBuffer::readWithProcessors(
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
pipes_from_dst = read_as_pipes(destination, column_names, query_info, context, processed_stage, max_block_size, num_streams));
pipes_from_dst = readAsPipes(destination, column_names, query_info, context, processed_stage, max_block_size, num_streams));
}
else
{
......@@ -241,7 +242,7 @@ Pipes StorageBuffer::readWithProcessors(
}
else
{
pipes_from_dst = read_as_pipes(destination, columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
pipes_from_dst = readAsPipes(destination, columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & pipe : pipes_from_dst)
{
pipe.addSimpleTransform(std::make_shared<AddingMissedTransform>(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册