Commit 2cca4d5f authored by Nikolai Kochetov

Refactor Pipe [part 2].

Parent e411916b
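In short, this part of the refactor makes IStorage::read, alterPartition and related helpers return a single Pipe instead of a Pipes vector, collapses per-stream sources with Pipe::unitePipes, and replaces loops over individual pipes with Pipe-level addSimpleTransform calls. Below is a minimal sketch of the call-site pattern the hunks repeat; StorageExample and ExampleSource are hypothetical names, not part of this commit:

    /// Hypothetical storage following the new convention: build one source per
    /// stream, then collapse the collection into a single multi-port Pipe.
    Pipe StorageExample::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        const SelectQueryInfo & /*query_info*/,
        const Context & context,
        QueryProcessingStage::Enum /*processed_stage*/,
        size_t max_block_size,
        unsigned num_streams)
    {
        Pipes pipes;
        pipes.reserve(num_streams);

        for (unsigned i = 0; i < num_streams; ++i)
            pipes.emplace_back(std::make_shared<ExampleSource>(
                column_names, metadata_snapshot, context, max_block_size));

        /// unitePipes merges the per-stream pipes into one Pipe with many output ports.
        return Pipe::unitePipes(std::move(pipes));
    }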
......@@ -106,7 +106,7 @@ Pipe executeQuery(
for (const auto & shard_info : cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, query_info, res);
- return res;
+ return Pipe::unitePipes(std::move(res));
}
}
......
......@@ -363,7 +363,7 @@ void Pipe::addTransform(ProcessorPtr transform)
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
- void Pipe::addSimpleTransform(const ProcessorGetter & getter)
+ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception("Cannot add simple transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
......@@ -415,6 +415,11 @@ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
header = std::move(new_header);
}
+ void Pipe::addSimpleTransform(const ProcessorGetter & getter)
+ {
+ addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); });
+ }
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
......
......@@ -64,10 +64,12 @@ public:
Extremes, /// Stream for extremes. No more then one.
};
- using ProcessorGetter = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
+ using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
+ using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
/// Add transform with single input and single output for each port.
void addSimpleTransform(const ProcessorGetter & port);
+ void addSimpleTransform(const ProcessorGetterWithStreamKind & port);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
......@@ -77,6 +79,12 @@ public:
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);
+ /// Do not allow to change the table while the processors of pipe are alive.
+ void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
+ /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
+ void addInterpreterContext(std::shared_ptr<Context> context) { interpreter_context.emplace_back(std::move(context)); }
+ void addStorageHolder(StoragePtr storage) { storage_holders.emplace_back(std::move(storage)); }
private:
Processors processors;
......
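The Pipe.h hunk above splits the old getter into two overloads: ProcessorGetter for callers that only need the stream header, and ProcessorGetterWithStreamKind for callers that must distinguish the main stream from totals/extremes. A hedged sketch of both call shapes follows; the expression variable is a placeholder, StreamType::Main is assumed from the enum shown above, and the convention that returning nullptr leaves a stream untouched is an assumption, not stated in this diff:

    /// Common case: the same transform for every output port.
    pipe.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<ExpressionTransform>(header, expression);
    });

    /// Stream-aware case: skip totals/extremes ports. Returning nullptr is
    /// assumed to mean "add no transform for this stream".
    pipe.addSimpleTransform([&](const Block & header, Pipe::StreamType stream_type) -> ProcessorPtr
    {
        if (stream_type != Pipe::StreamType::Main)
            return nullptr;
        return std::make_shared<ExpressionTransform>(header, expression);
    });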
......@@ -721,7 +721,7 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
if (use_sampling)
{
- res.addSimpleTransform([&filter_expression, &filter_function](const Block & header, Pipe::StreamType)
+ res.addSimpleTransform([&filter_expression, &filter_function](const Block & header)
{
return std::make_shared<FilterTransform>(
header, filter_expression, filter_function->getColumnName(), false);
......@@ -730,7 +730,7 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
if (result_projection)
{
- res.addSimpleTransform([&result_projection](const Block & header, Pipe::StreamType)
+ res.addSimpleTransform([&result_projection](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, result_projection);
});
......@@ -739,7 +739,7 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
{
- res.addSimpleTransform([used_sample_factor](const Block & header, Pipe::StreamType)
+ res.addSimpleTransform([used_sample_factor](const Block & header)
{
return std::make_shared<AddingConstColumnTransform<Float64>>(
header, std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor");
......@@ -748,7 +748,7 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
- res.addSimpleTransform([&query_info](const Block & header, Pipe::StreamType)
+ res.addSimpleTransform([&query_info](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->remove_columns_actions);
});
......@@ -1114,7 +1114,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
if (input_order_info->direction == 1)
{
- pipe.addSimpleTransform([](const Block & header, Pipe::StreamType)
+ pipe.addSimpleTransform([](const Block & header)
{
return std::make_shared<ReverseTransform>(header);
});
......@@ -1129,10 +1129,10 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
out_projection = createProjection(pipe, data);
- pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header, Pipe::StreamType)
- {
- return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
- });
+ pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header)
+ {
+ return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
+ });
pipe.addTransform(std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
......@@ -1207,7 +1207,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (!out_projection)
out_projection = createProjection(pipe, data);
- pipe.addSimpleTransform([&metadata_snapshot](const Block & header, Pipe::StreamType)
+ pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
......@@ -1280,7 +1280,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
key_columns.emplace_back(desc.column_number);
}
- pipe.addSimpleTransform([&](const Block & header, Pipe::StreamType)
+ pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingSelectorTransform>(header, num_streams, key_columns);
});
......
......@@ -135,7 +135,7 @@ std::string PartitionCommand::typeToString() const
__builtin_unreachable();
}
- Pipes convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result)
+ Pipe convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result)
{
Block header {
ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "command_type"),
......@@ -180,11 +180,7 @@ Pipes convertCommandsResultToSource(const PartitionCommandsResultInfo & commands
}
Chunk chunk(std::move(res_columns), commands_result.size());
- Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk)));
- Pipes result;
- result.emplace_back(std::move(pipe));
- return result;
+ return Pipe(std::make_shared<SourceFromSingleChunk>(std::move(header), std::move(chunk)));
}
}
......@@ -101,6 +101,6 @@ using PartitionCommandsResultInfo = std::vector<PartitionCommandResultInfo>;
/// used to print info to the user. Tries to create narrowest table for given
/// results. For example, if all commands were FREEZE commands, than
/// old_part_name column will be absent.
- Pipes convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result);
+ Pipe convertCommandsResultToSource(const PartitionCommandsResultInfo & commands_result);
}
......@@ -146,7 +146,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
}
- Pipes StorageBuffer::read(
+ Pipe StorageBuffer::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -155,7 +155,7 @@ Pipes StorageBuffer::read(
size_t max_block_size,
unsigned num_streams)
{
- Pipes pipes_from_dst;
+ Pipe pipe_from_dst;
if (destination_id)
{
......@@ -182,7 +182,7 @@ Pipes StorageBuffer::read(
query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination, destination_metadata_snapshot);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
- pipes_from_dst = destination->read(
+ pipe_from_dst = destination->read(
column_names, destination_metadata_snapshot, query_info,
context, processed_stage, max_block_size, num_streams);
}
......@@ -217,66 +217,75 @@ Pipes StorageBuffer::read(
}
else
{
- pipes_from_dst = destination->read(
+ pipe_from_dst = destination->read(
columns_intersection, destination_metadata_snapshot, query_info,
context, processed_stage, max_block_size, num_streams);
- for (auto & pipe : pipes_from_dst)
+ pipe_from_dst.addSimpleTransform([&](const Block & header)
{
- pipe.addSimpleTransform(std::make_shared<AddingMissedTransform>(
- pipe.getHeader(), header_after_adding_defaults, metadata_snapshot->getColumns().getDefaults(), context));
+ return std::make_shared<AddingMissedTransform>(
+ header, header_after_adding_defaults, metadata_snapshot->getColumns().getDefaults(), context);
+ });
- pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
- pipe.getHeader(), header, ConvertingTransform::MatchColumnsMode::Name));
- }
+ pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
+ {
+ return std::make_shared<ConvertingTransform>(
+ stream_header, header, ConvertingTransform::MatchColumnsMode::Name);
+ });
}
}
- for (auto & pipe : pipes_from_dst)
- pipe.addTableLock(destination_lock);
+ pipe_from_dst.addTableLock(destination_lock);
}
- Pipes pipes_from_buffers;
- pipes_from_buffers.reserve(num_shards);
- for (auto & buf : buffers)
- pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this, metadata_snapshot));
+ Pipe pipe_from_buffers;
+ {
+ Pipes pipes_from_buffers;
+ pipes_from_buffers.reserve(num_shards);
+ for (auto & buf : buffers)
+ pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this, metadata_snapshot));
+ pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
+ }
/// Convert pipes from table to structure from buffer.
- if (!pipes_from_buffers.empty() && !pipes_from_dst.empty()
- && !blocksHaveEqualStructure(pipes_from_buffers.front().getHeader(), pipes_from_dst.front().getHeader()))
+ if (!pipe_from_buffers.empty() && !pipe_from_dst.empty()
+ && !blocksHaveEqualStructure(pipe_from_buffers.getHeader(), pipe_from_dst.getHeader()))
{
- for (auto & pipe : pipes_from_dst)
- pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(
- pipe.getHeader(),
- pipes_from_buffers.front().getHeader(),
- ConvertingTransform::MatchColumnsMode::Name));
+ pipe_from_buffers.addSimpleTransform([&](const Block & header)
+ {
+ return std::make_shared<ConvertingTransform>(
+ header,
+ pipe_from_buffers.getHeader(),
+ ConvertingTransform::MatchColumnsMode::Name);
+ });
}
/** If the sources from the table were processed before some non-initial stage of query execution,
* then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
*/
if (processed_stage > QueryProcessingStage::FetchColumns)
- for (auto & pipe : pipes_from_buffers)
- pipe = InterpreterSelectQuery(query_info.query, context, std::move(pipe), SelectQueryOptions(processed_stage)).execute().pipeline.getPipe();
+ pipe_from_buffers = InterpreterSelectQuery(query_info.query, context, std::move(pipe_from_buffers), SelectQueryOptions(processed_stage)).execute().pipeline.getPipe();
if (query_info.prewhere_info)
{
- for (auto & pipe : pipes_from_buffers)
- pipe.addSimpleTransform(std::make_shared<FilterTransform>(pipe.getHeader(), query_info.prewhere_info->prewhere_actions,
- query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column));
+ pipe_from_buffers.addSimpleTransform([&](const Block & header)
+ {
+ return std::make_shared<FilterTransform>(
+ header, query_info.prewhere_info->prewhere_actions,
+ query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
+ });
if (query_info.prewhere_info->alias_actions)
{
- for (auto & pipe : pipes_from_buffers)
- pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), query_info.prewhere_info->alias_actions));
+ pipe_from_buffers.addSimpleTransform([&](const Block & header)
+ {
+ return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
+ });
}
}
- for (auto & pipe : pipes_from_buffers)
- pipes_from_dst.emplace_back(std::move(pipe));
- return pipes_from_dst;
+ return Pipe::unitePipes({std::move(pipe_from_dst), std::move(pipe_from_buffers)});
}
......
......@@ -56,7 +56,7 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -123,7 +123,7 @@ void StorageDictionary::checkTableCanBeDropped() const
throw Exception("Cannot detach table " + getStorageID().getFullTableName() + " from a database with DICTIONARY engine", ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE);
}
- Pipes StorageDictionary::read(
+ Pipe StorageDictionary::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
......@@ -134,11 +134,8 @@ Pipes StorageDictionary::read(
{
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name);
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
- auto source = std::make_shared<SourceFromInputStream>(stream);
/// TODO: update dictionary interface for processors.
- Pipes pipes;
- pipes.emplace_back(std::move(source));
- return pipes;
+ return Pipe(std::make_shared<SourceFromInputStream>(stream));
}
......
......@@ -16,7 +16,7 @@ public:
void checkTableCanBeDropped() const override;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -467,7 +467,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
return getQueryProcessingStageImpl(context, to_stage, cluster);
}
- Pipes StorageDistributed::read(
+ Pipe StorageDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
......@@ -70,7 +70,7 @@ public:
bool canForceGroupByNoMerge(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -378,7 +378,7 @@ private:
};
- Pipes StorageFile::read(
+ Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -419,7 +419,7 @@ Pipes StorageFile::read(
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, metadata_snapshot->getColumns().getDefaults()));
- return pipes;
+ return Pipe::unitePipes(pipes);
}
......
......@@ -24,7 +24,7 @@ class StorageFile final : public ext::shared_ptr_helper<StorageFile>, public ISt
public:
std::string getName() const override { return "File"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -438,7 +438,7 @@ void registerStorageGenerateRandom(StorageFactory & factory)
});
}
- Pipes StorageGenerateRandom::read(
+ Pipe StorageGenerateRandom::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -467,7 +467,7 @@ Pipes StorageGenerateRandom::read(
for (UInt64 i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<GenerateSource>(max_block_size, max_array_length, max_string_length, generate(), block_header, context));
- return pipes;
+ return Pipe::unitePipes(pipes);
}
}
......@@ -15,7 +15,7 @@ class StorageGenerateRandom final : public ext::shared_ptr_helper<StorageGenerat
public:
std::string getName() const override { return "GenerateRandom"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -262,7 +262,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
}
- Pipes StorageHDFS::read(
+ Pipe StorageHDFS::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -298,7 +298,7 @@ Pipes StorageHDFS::read(
pipes.emplace_back(std::make_shared<HDFSSource>(
sources_info, uri_without_path, format_name, compression_method, metadata_snapshot->getSampleBlock(), context_, max_block_size));
- return pipes;
+ return Pipe::unitePipes(pipes);
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
......
......@@ -19,7 +19,7 @@ class StorageHDFS final : public ext::shared_ptr_helper<StorageHDFS>, public ISt
public:
String getName() const override { return "HDFS"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -58,7 +58,7 @@ void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
}
- Pipes StorageInput::read(
+ Pipe StorageInput::read(
const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -74,15 +74,13 @@ Pipes StorageInput::read(
{
/// Send structure to the client.
query_context.initializeInput(shared_from_this());
- pipes.emplace_back(std::make_shared<StorageInputSource>(query_context, metadata_snapshot->getSampleBlock()));
- return pipes;
+ return Pipe(std::make_shared<StorageInputSource>(query_context, metadata_snapshot->getSampleBlock()));
}
if (!input_stream)
throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query", ErrorCodes::INVALID_USAGE_OF_INPUT);
- pipes.emplace_back(std::make_shared<SourceFromInputStream>(input_stream));
- return pipes;
+ return Pipe(std::make_shared<SourceFromInputStream>(input_stream));
}
}
......@@ -17,7 +17,7 @@ public:
/// A table will read from this stream.
void setInputStream(BlockInputStreamPtr input_stream_);
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -436,7 +436,7 @@ private:
// TODO: multiple stream read and index read
- Pipes StorageJoin::read(
+ Pipe StorageJoin::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -447,10 +447,7 @@ Pipes StorageJoin::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
- Pipes pipes;
- pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
- return pipes;
+ return Pipe(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
}
}
......@@ -36,7 +36,7 @@ public:
/// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -604,7 +604,7 @@ const StorageLog::Marks & StorageLog::getMarksWithRealRowCount(const StorageMeta
return it->second.marks;
}
- Pipes StorageLog::read(
+ Pipe StorageLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -647,7 +647,7 @@ Pipes StorageLog::read(
max_read_buffer_size));
}
- return pipes;
+ return Pipe::unitePipes(pipes);
}
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
......
......@@ -24,7 +24,7 @@ class StorageLog final : public ext::shared_ptr_helper<StorageLog>, public IStor
public:
String getName() const override { return "Log"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
......@@ -106,7 +106,7 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(cons
return getTargetTable()->getQueryProcessingStage(context, to_stage, query_ptr);
}
- Pipes StorageMaterializedView::read(
+ Pipe StorageMaterializedView::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......@@ -122,12 +122,10 @@ Pipes StorageMaterializedView::read(
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage, metadata_snapshot);
- Pipes pipes = storage->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+ Pipe pipe = storage->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+ pipe.addTableLock(lock);
- for (auto & pipe : pipes)
- pipe.addTableLock(lock);
- return pipes;
+ return pipe;
}
BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
......@@ -250,7 +248,7 @@ void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & command
}
}
- Pipes StorageMaterializedView::alterPartition(
+ Pipe StorageMaterializedView::alterPartition(
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context)
{
checkStatementCanBeForwarded();
......
......@@ -51,7 +51,7 @@ public:
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override;
- Pipes alterPartition(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context) override;
+ Pipe alterPartition(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context) override;
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;
......@@ -71,7 +71,7 @@ public:
ActionLock getActionLock(StorageActionBlockType type) override;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -101,7 +101,7 @@ StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription col
}
- Pipes StorageMemory::read(
+ Pipe StorageMemory::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -132,7 +132,7 @@ Pipes StorageMemory::read(
pipes.emplace_back(std::make_shared<MemorySource>(column_names, begin, end, *this, metadata_snapshot));
}
- return pipes;
+ return Pipe::unitePipes(pipes);
}
......
......@@ -28,7 +28,7 @@ public:
size_t getSize() const { return data.size(); }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -19,7 +19,6 @@
#include <ext/range.h>
#include <algorithm>
#include <Parsers/queryToString.h>
- #include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
......@@ -128,7 +127,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
}
- Pipes StorageMerge::read(
+ Pipe StorageMerge::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -137,7 +136,7 @@ Pipes StorageMerge::read(
const size_t max_block_size,
unsigned num_streams)
{
- Pipes res;
+ Pipe pipe;
bool has_table_virtual_column = false;
Names real_column_names;
......@@ -211,22 +210,19 @@ Pipes StorageMerge::read(
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
- auto source_pipes = createSources(
+ pipe = createSources(
storage_metadata_snapshot, query_info, processed_stage,
max_block_size, header, table, real_column_names, modified_context,
current_streams, has_table_virtual_column);
- for (auto & pipe : source_pipes)
- res.emplace_back(std::move(pipe));
}
- if (res.empty())
- return res;
+ if (!pipe.empty())
+ narrowPipe(pipe, num_streams);
- return narrowPipes(std::move(res), num_streams);
+ return pipe;
}
- Pipes StorageMerge::createSources(
+ Pipe StorageMerge::createSources(
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
......@@ -245,18 +241,17 @@ Pipes StorageMerge::createSources(
VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name);
- Pipes pipes;
+ Pipe pipe;
if (!storage)
{
- auto pipe = InterpreterSelectQuery(
+ pipe = InterpreterSelectQuery(
modified_query_info.query, *modified_context,
std::make_shared<OneBlockInputStream>(header),
SelectQueryOptions(processed_stage).analyze()).execute().pipeline.getPipe();
pipe.addInterpreterContext(modified_context);
- pipes.emplace_back(std::move(pipe));
- return pipes;
+ return pipe;
}
auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, query_info.query);
......@@ -267,7 +262,7 @@ Pipes StorageMerge::createSources(
real_column_names.push_back(ExpressionActions::getSmallestColumn(metadata_snapshot->getColumns().getAllPhysical()));
- pipes = storage->read(real_column_names, metadata_snapshot, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
+ pipe = storage->read(real_column_names, metadata_snapshot, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
else if (processed_stage > storage_stage)
{
......@@ -279,46 +274,39 @@ Pipes StorageMerge::createSources(
InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};
- {
- Pipe pipe = interpreter.execute().pipeline.getPipe();
- pipes.emplace_back(std::move(pipe));
- }
+ pipe = interpreter.execute().pipeline.getPipe();
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
- pipes.back().addSimpleTransform(std::make_shared<MaterializingTransform>(pipes.back().getHeader()));
+ pipe.addSimpleTransform([](const Block & header) { return std::make_shared<MaterializingTransform>(header); });
}
- if (!pipes.empty())
+ if (!pipe.empty())
{
- if (concat_streams && pipes.size() > 1)
- {
- auto concat = std::make_shared<ConcatProcessor>(pipes.at(0).getHeader(), pipes.size());
- Pipe pipe(std::move(pipes), std::move(concat));
- pipes = Pipes();
- pipes.emplace_back(std::move(pipe));
- }
+ if (concat_streams && pipe.numOutputPorts() > 1)
+ pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
- for (auto & pipe : pipes)
+ if (has_table_virtual_column)
{
- if (has_table_virtual_column)
- pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<String>>(
- pipe.getHeader(), std::make_shared<DataTypeString>(), table_name, "_table"));
- /// Subordinary tables could have different but convertible types, like numeric types of different width.
- /// We must return streams with structure equals to structure of Merge table.
- convertingSourceStream(header, metadata_snapshot, *modified_context, modified_query_info.query, pipe, processed_stage);
+ pipe.addSimpleTransform([name = table_name](const Block & header)
+ {
+ return std::make_shared<AddingConstColumnTransform<String>>(
+ header, std::make_shared<DataTypeString>(), name, "_table");
+ });
+ }
- pipe.addTableLock(struct_lock);
- pipe.addInterpreterContext(modified_context);
+ /// Subordinary tables could have different but convertible types, like numeric types of different width.
+ /// We must return streams with structure equals to structure of Merge table.
+ convertingSourceStream(header, metadata_snapshot, *modified_context, modified_query_info.query, pipe, processed_stage);
- }
+ pipe.addTableLock(struct_lock);
+ pipe.addInterpreterContext(modified_context);
}
- return pipes;
+ return pipe;
}
......@@ -452,7 +440,10 @@ void StorageMerge::convertingSourceStream(
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = pipe.getHeader();
- pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(before_block_header, header, ConvertingTransform::MatchColumnsMode::Name));
+ pipe.addSimpleTransform([&before_block_header](const Block & header)
+ {
+ return std::make_shared<ConvertingTransform>(before_block_header, header, ConvertingTransform::MatchColumnsMode::Name);
+ });
auto where_expression = query->as<ASTSelectQuery>()->where();
......
......@@ -29,7 +29,7 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -172,7 +172,7 @@ StorageMergeTree::~StorageMergeTree()
shutdown();
}
- Pipes StorageMergeTree::read(
+ Pipe StorageMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -1052,7 +1052,7 @@ bool StorageMergeTree::optimize(
return true;
}
- Pipes StorageMergeTree::alterPartition(
+ Pipe StorageMergeTree::alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
......@@ -1135,7 +1135,7 @@ Pipes StorageMergeTree::alterPartition(
if (query_context.getSettingsRef().alter_partition_verbose_result)
return convertCommandsResultToSource(result);
- return { };
+ return {};
}
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, const Context & context)
......
......@@ -37,7 +37,7 @@ public:
bool supportsIndexForIn() const override { return true; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......@@ -61,7 +61,7 @@ public:
bool deduplicate,
const Context & context) override;
- Pipes alterPartition(
+ Pipe alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & /* metadata_snapshot */,
const PartitionCommands & commands,
......
......@@ -53,7 +53,7 @@ StorageMongoDB::StorageMongoDB(
}
- Pipes StorageMongoDB::read(
+ Pipe StorageMongoDB::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -79,11 +79,8 @@ Pipes StorageMongoDB::read(
sample_block.insert({ column_data.type, column_data.name });
}
- Pipes pipes;
- pipes.emplace_back(std::make_shared<SourceFromInputStream>(
+ return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MongoDBBlockInputStream>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true)));
- return pipes;
}
void registerStorageMongoDB(StorageFactory & factory)
......
......@@ -34,7 +34,7 @@ public:
std::string getName() const override { return "MongoDB"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
......@@ -64,7 +64,7 @@ StorageMySQL::StorageMySQL(
}
- Pipes StorageMySQL::read(
+ Pipe StorageMySQL::read(
const Names & column_names_,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info_,
......@@ -94,12 +94,9 @@ Pipes StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name });
}
- Pipes pipes;
/// TODO: rewrite MySQLBlockInputStream
- pipes.emplace_back(std::make_shared<SourceFromInputStream>(
+ return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size_)));
- return pipes;
}
......
......@@ -37,7 +37,7 @@ public:
std::string getName() const override { return "MySQL"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -22,7 +22,7 @@ class StorageNull final : public ext::shared_ptr_helper<StorageNull>, public ISt
public:
std::string getName() const override { return "Null"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo &,
......@@ -31,10 +31,8 @@ public:
size_t,
unsigned) override
{
- Pipes pipes;
- pipes.emplace_back(
+ return Pipe(
std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
- return pipes;
}
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &) override
......
......@@ -3418,7 +3418,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
return max_added_blocks;
}
- Pipes StorageReplicatedMergeTree::read(
+ Pipe StorageReplicatedMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -3864,7 +3864,7 @@ void StorageReplicatedMergeTree::alter(
}
}
- Pipes StorageReplicatedMergeTree::alterPartition(
+ Pipe StorageReplicatedMergeTree::alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
......
......@@ -87,7 +87,7 @@ public:
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......@@ -111,7 +111,7 @@ public:
void alter(const AlterCommands & params, const Context & query_context, TableLockHolder & table_lock_holder) override;
- Pipes alterPartition(
+ Pipe alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
......
......@@ -284,7 +284,7 @@ Strings listFilesWithRegexpMatching(Aws::S3::S3Client & client, const S3::URI &
}
- Pipes StorageS3::read(
+ Pipe StorageS3::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -319,7 +319,9 @@ Pipes StorageS3::read(
uri.bucket,
key));
- return narrowPipes(std::move(pipes), num_streams);
+ auto pipe = Pipe::unitePipes(std::move(pipes));
+ narrowPipe(pipe, num_streams);
+ return pipe;
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
......
......@@ -41,7 +41,7 @@ public:
return name;
}
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -290,7 +290,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
}
- Pipes StorageStripeLog::read(
+ Pipe StorageStripeLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -310,8 +310,7 @@ Pipes StorageStripeLog::read(
String index_file = table_path + "index.mrk";
if (!disk->exists(index_file))
{
- pipes.emplace_back(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
- return pipes;
+ return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
}
CompressedReadBufferFromFile index_in(disk->readFile(index_file, INDEX_BUFFER_SIZE));
......@@ -335,7 +334,7 @@ Pipes StorageStripeLog::read(
/// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.
- return pipes;
+ return Pipe::unitePipes(pipes);
}
......
......@@ -25,7 +25,7 @@ class StorageStripeLog final : public ext::shared_ptr_helper<StorageStripeLog>,
public:
String getName() const override { return "StripeLog"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -420,7 +420,7 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
}
- Pipes StorageTinyLog::read(
+ Pipe StorageTinyLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -431,14 +431,10 @@ Pipes StorageTinyLog::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
- Pipes pipes;
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
- pipes.emplace_back(std::make_shared<TinyLogSource>(
+ return Pipe(std::make_shared<TinyLogSource>(
max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
- return pipes;
}
......
......@@ -24,7 +24,7 @@ class StorageTinyLog final : public ext::shared_ptr_helper<StorageTinyLog>, publ
public:
String getName() const override { return "TinyLog"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -13,7 +13,6 @@
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IBlockInputStream.h>
- #include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
......@@ -178,7 +177,7 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
}
- Pipes IStorageURLBase::read(
+ Pipe IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -192,8 +191,7 @@ Pipes IStorageURLBase::read(
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
- Pipes pipes;
- pipes.emplace_back(std::make_shared<StorageURLSource>(
+ return Pipe(std::make_shared<StorageURLSource>(
request_uri,
getReadMethod(),
getReadPOSTDataCallback(
......@@ -207,8 +205,6 @@ Pipes IStorageURLBase::read(
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method)));
- return pipes;
}
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
......
......@@ -19,7 +19,7 @@ namespace DB
class IStorageURLBase : public IStorage
{
public:
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -21,7 +21,7 @@ StorageValues::StorageValues(
setInMemoryMetadata(storage_metadata);
}
- Pipes StorageValues::read(
+ Pipe StorageValues::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
......@@ -32,12 +32,8 @@ Pipes StorageValues::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
- Pipes pipes;
Chunk chunk(res_block.getColumns(), res_block.rows());
- pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
- return pipes;
+ return Pipe(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
}
}
......@@ -15,7 +15,7 @@ class StorageValues final : public ext::shared_ptr_helper<StorageValues>, public
public:
std::string getName() const override { return "Values"; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -6,7 +6,6 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
- #include <Parsers/queryToString.h>
#include <Storages/StorageView.h>
#include <Storages/StorageFactory.h>
......@@ -15,12 +14,8 @@
#include <Common/typeid_cast.h>
#include <Processors/Pipe.h>
- #include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
- #include <DataStreams/MaterializingBlockInputStream.h>
- #include <DataStreams/ConvertingBlockInputStream.h>
namespace DB
{
......@@ -52,7 +47,7 @@ StorageView::StorageView(
}
- Pipes StorageView::read(
+ Pipe StorageView::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -91,9 +86,7 @@ Pipes StorageView::read(
column_names, getVirtuals(), getStorageID()), ConvertingTransform::MatchColumnsMode::Name);
});
- pipes = std::move(pipeline).getPipes();
- return pipes;
+ return std::move(pipeline).getPipe();
}
static ASTTableExpression * getFirstTableExpression(ASTSelectQuery & select_query)
......
......@@ -21,7 +21,7 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -9,7 +9,6 @@
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
- #include <Common/ShellCommand.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Pipe.h>
......@@ -85,7 +84,7 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
return [query](std::ostream & os) { os << "query=" << query; };
}
- Pipes StorageXDBC::read(
+ Pipe StorageXDBC::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
......@@ -15,7 +15,7 @@ namespace DB
class StorageXDBC : public IStorageURLBase
{
public:
- Pipes read(
+ Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......