Commit e411916b authored by Nikolai Kochetov

Refactor Pipe [part 1].

Parent 2ae94f45
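In short: the multi-output `Pipes` class becomes `Pipe` (the old single-output `Pipe` survives only as commented-out code), `Pipes` is reduced to `std::vector<Pipe>`, and callers now build one `Pipe` with many output ports instead of juggling vectors. A minimal usage sketch based only on the declarations in this diff — `SomeSource` and `SomeTransform` are hypothetical processors, not part of the commit:

    Pipes pipes;    // Pipes is now just std::vector<Pipe>
    pipes.emplace_back(Pipe(std::make_shared<SomeSource>(header)));
    pipes.emplace_back(Pipe(std::make_shared<SomeSource>(header)));

    Pipe pipe = Pipe::unitePipes(std::move(pipes));    // one Pipe, two output ports

    // Attach a 1-in/1-out transform to every output port (and totals/extremes, if any).
    pipe.addSimpleTransform([](const Block & stream_header, Pipe::StreamType)
    {
        return std::make_shared<SomeTransform>(stream_header);
    });

    narrowPipe(pipe, 1);    // glue the outputs back down to a single stream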
......@@ -23,39 +23,40 @@ namespace
}
}
-Pipes narrowPipes(Pipes pipes, size_t width)
+void narrowPipe(Pipe & pipe, size_t width)
{
-size_t size = pipes.size();
+size_t size = pipe.numOutputPorts();
if (size <= width)
-return pipes;
+return;
std::vector<std::vector<OutputPort *>> partitions(width);
auto distribution = getDistribution(size, width);
-for (size_t i = 0; i < size; ++i)
-partitions[distribution[i]].emplace_back(pipes.getOutputPort(i));
-Processors concats;
-concats.reserve(width);
-for (size_t i = 0; i < width; ++i)
-{
-auto concat = std::make_shared<ConcatProcessor>(partitions[i].at(0)->getHeader(), partitions[i].size());
-size_t next_port = 0;
-for (auto & port : concat->getInputs())
-{
-connect(*partitions[i][next_port], port);
-++next_port;
-}
-concats.emplace_back(std::move(concat));
-}
-auto processors = Pipes::detachProcessors(std::move(pipes));
-processors.insert(processors.end(), concats.begin(), concats.end());
-return Pipes(std::move(processors));
+pipe.transform([&](OutputPortRawPtrs ports)
+{
+for (size_t i = 0; i < size; ++i)
+partitions[distribution[i]].emplace_back(ports[i]);
+Processors concats;
+concats.reserve(width);
+for (size_t i = 0; i < width; ++i)
+{
+auto concat = std::make_shared<ConcatProcessor>(partitions[i].at(0)->getHeader(),
+partitions[i].size());
+size_t next_port = 0;
+for (auto & port : concat->getInputs())
+{
+connect(*partitions[i][next_port], port);
+++next_port;
+}
+concats.emplace_back(std::move(concat));
+}
+return concats;
+});
}
}
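The rewritten narrowPipe is the first user of the new `Pipe::transform`: the callback receives the pipe's currently open output ports, must connect each of them to processors it creates, and returns those processors; any output ports of the returned processors that stay unconnected become the pipe's new outputs. A stripped-down sketch of that contract — effectively narrowPipe with width = 1, using the same `ConcatProcessor`:

    pipe.transform([&](OutputPortRawPtrs ports)
    {
        // One ConcatProcessor consumes every current output...
        auto concat = std::make_shared<ConcatProcessor>(ports.front()->getHeader(), ports.size());

        auto in = concat->getInputs().begin();
        for (auto * port : ports)
            connect(*port, *(in++));

        // ...and its single, still-unconnected output becomes the pipe's only output.
        return Processors{concat};
    });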
......@@ -7,7 +7,6 @@ namespace DB
{
class Pipe;
-using Pipes = std::vector<Pipe>;
/** If the number of sources of `inputs` is greater than `width`,
* then glues the sources to each other (using ConcatBlockInputStream),
......@@ -16,6 +15,6 @@ using Pipes = std::vector<Pipe>;
* Trying to glue the sources with each other uniformly randomly.
* (to avoid overweighting if the distribution of the amount of data in different sources is subject to some pattern)
*/
-Pipes narrowPipes(Pipes pipes, size_t width);
+void narrowPipe(Pipe & pipe, size_t width);
}
......@@ -113,7 +113,7 @@ void SelectStreamFactory::createForShard(
const String &, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const SelectQueryInfo &,
-Pipes & res)
+Pipes & pipes)
{
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
......@@ -130,7 +130,7 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
-res.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage).getPipe());
+pipes.emplace_back(createLocalStream(modified_query_ast, header, context, processed_stage).getPipe());
};
String modified_query = formattedAST(modified_query_ast);
......@@ -143,7 +143,7 @@ void SelectStreamFactory::createForShard(
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
-res.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
+pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
};
const auto & settings = context.getSettingsRef();
......@@ -285,7 +285,7 @@ void SelectStreamFactory::createForShard(
}
};
-res.emplace_back(createDelayedPipe(header, lazily_create_stream));
+pipes.emplace_back(createDelayedPipe(header, lazily_create_stream));
}
else
emplace_remote_stream();
......
......@@ -39,7 +39,7 @@ public:
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info,
-Pipes & res) override;
+Pipes & pipes) override;
private:
const Block header;
......
......@@ -74,7 +74,7 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
return new_context;
}
-Pipes executeQuery(
+Pipe executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info)
{
......
......@@ -12,7 +12,6 @@ class Cluster;
struct SelectQueryInfo;
class Pipe;
-using Pipes = std::vector<Pipe>;
namespace ClusterProxy
{
......@@ -26,7 +25,7 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
/// Execute a distributed query, creating a vector of BlockInputStreams, from which the result can be read.
/// `stream_factory` object encapsulates the logic of creating streams for a different type of query
/// (currently SELECT, DESCRIBE).
-Pipes executeQuery(
+Pipe executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster, Poco::Logger * log,
const ASTPtr & query_ast, const Context & context, const Settings & settings, const SelectQueryInfo & query_info);
......
#include <Processors/Pipe.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/LimitTransform.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ExtremesTransform.h>
namespace DB
{
......@@ -54,16 +59,88 @@ static void checkSource(const IProcessor & source)
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
-Pipes::Pipes(ProcessorPtr source)
+static OutputPort * uniteExtremes(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
{
-checkSource(*source);
if (ports.empty())
return nullptr;
if (ports.size() == 1)
return ports.front();
/// Here we calculate extremes for extremes in case we unite several pipelines.
/// Example: select number from numbers(2) union all select number from numbers(3)
/// ->> Resize -> Extremes --(output port)----> Empty
/// --(extremes port)--> ...
auto resize = std::make_shared<ResizeProcessor>(header, ports.size(), 1);
auto extremes = std::make_shared<ExtremesTransform>(header);
auto sink = std::make_shared<EmptySink>(header);
auto * extremes_port = &extremes->getExtremesPort();
auto in = resize->getInputs().begin();
for (const auto & port : ports)
connect(*port, *(in++));
connect(resize->getOutputs().front(), extremes->getInputPort());
connect(extremes->getOutputPort(), sink->getPort());
processors.emplace_back(std::move(resize));
processors.emplace_back(std::move(extremes));
processors.emplace_back(std::move(sink));
return extremes_port;
}
static OutputPort * uniteTotals(const OutputPortRawPtrs & ports, const Block & header, Processors & processors)
{
if (ports.empty())
return nullptr;
if (ports.size() == 1)
return ports.front();
/// Calculate totals for several streams.
/// Take totals from the first source which has any; skip the others.
/// ->> Concat -> Limit
auto concat = std::make_shared<ConcatProcessor>(header, ports.size());
auto limit = std::make_shared<LimitTransform>(header, 1, 0);
auto * totals_port = &limit->getOutputPort();
auto in = concat->getInputs().begin();
for (const auto & port : ports)
connect(*port, *(in++));
connect(concat->getOutputs().front(), limit->getInputPort());
processors.emplace_back(std::move(concat));
processors.emplace_back(std::move(limit));
return totals_port;
}
Pipe::Pipe(ProcessorPtr source)
{
if (auto * source_from_input_stream = typeid_cast<SourceFromInputStream *>(source.get()))
{
/// Special case for SourceFromInputStream. Will remove it later.
totals_port = source_from_input_stream->getTotalsPort();
extremes_port = source_from_input_stream->getExtremesPort();
}
else if (source->getOutputs().size() != 1)
checkSource(*source);
output_ports.push_back(&source->getOutputs().front());
header = output_ports.front()->getHeader();
processors.emplace_back(std::move(source));
max_parallel_streams = 1;
}
-Pipes::Pipes(Processors processors_) : processors(std::move(processors_))
+Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
{
/// Create hash table with processors.
std::unordered_set<const IProcessor *> set;
......@@ -75,12 +152,12 @@ Pipes::Pipes(Processors processors_) : processors(std::move(processors_))
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Cannot create Pipes because processor " + processor->getName() +
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipes because processor " + processor->getName() +
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
......@@ -95,25 +172,160 @@ Pipes::Pipes(Processors processors_) : processors(std::move(processors_))
const auto * connected_processor = &port.getInputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipes because processor " + processor->getName() +
throw Exception("Cannot create Pipe because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
}
if (output_ports.empty())
throw Exception("Cannot create Pipes because processors don't have any not-connected output ports",
throw Exception("Cannot create Pipe because processors don't have any not-connected output ports",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
-assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
+assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
max_parallel_streams = output_ports.size();
}
-void Pipes::addTransform(ProcessorPtr transform)
static Pipes removeEmptyPipes(Pipes pipes)
{
Pipes res;
res.reserve(pipes.size());
for (auto & pipe : pipes)
{
if (!pipe.empty())
res.emplace_back(std::move(pipe));
}
return res;
}
Pipe Pipe::unitePipes(Pipes pipes)
{
pipes = removeEmptyPipes(std::move(pipes));
if (pipes.empty())
return {};
if (pipes.size() == 1)
return std::move(pipes[0]);
Pipe res;
OutputPortRawPtrs totals;
OutputPortRawPtrs extremes;
res.header = pipes.front().header;
for (auto & pipe : pipes)
{
assertBlocksHaveEqualStructure(res.header, pipe.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());
res.table_locks.insert(res.table_locks.end(), pipe.table_locks.begin(), pipe.table_locks.end());
res.storage_holders.insert(res.storage_holders.end(), pipe.storage_holders.begin(), pipe.storage_holders.end());
res.interpreter_context.insert(res.interpreter_context.end(),
pipe.interpreter_context.begin(), pipe.interpreter_context.end());
res.max_parallel_streams += pipe.max_parallel_streams;
if (pipe.totals_port)
totals.emplace_back(pipe.totals_port);
if (pipe.extremes_port)
extremes.emplace_back(pipe.extremes_port);
}
res.totals_port = uniteTotals(totals, res.header, res.processors);
res.extremes_port = uniteExtremes(extremes, res.header, res.processors);
return res;
}
//void Pipe::addPipes(Pipe pipes)
//{
// if (processors.empty())
// {
// *this = std::move(pipes);
// return;
// }
//
// if (pipes.processors.empty())
// return;
//
// assertBlocksHaveEqualStructure(header, pipes.header, "Pipe");
//
// max_parallel_streams += pipes.max_parallel_streams;
// processors.insert(processors.end(), pipes.processors.begin(), pipes.processors.end());
//
// OutputPortRawPtrs totals;
// if (totals_port)
// totals.emplace_back(totals_port);
// if (pipes.totals_port)
// totals.emplace_back(pipes.totals_port);
// if (!totals.empty())
// totals_port = uniteTotals(totals, header, processors);
//
// OutputPortRawPtrs extremes;
// if (extremes_port)
// extremes.emplace_back(extremes_port);
// if (pipes.extremes_port)
// extremes.emplace_back(pipes.extremes_port);
// if (!extremes.empty())
// extremes_port = uniteExtremes(extremes, header, processors);
//}
//void Pipe::addSource(ProcessorPtr source)
//{
// checkSource(*source);
// const auto & source_header = output_ports.front()->getHeader();
//
// assertBlocksHaveEqualStructure(header, source_header, "Pipes"); !!!!
//
// output_ports.push_back(&source->getOutputs().front());
// processors.emplace_back(std::move(source));
//
// max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
//}
void Pipe::addTotalsSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add totals source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (totals_port)
throw Exception("Totals source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
totals_port = &source->getOutputs().front();
processors.emplace_back(std::move(source));
}
void Pipe::addExtremesSource(ProcessorPtr source)
{
if (output_ports.empty())
throw Exception("Cannot add extremes source to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
if (extremes_port)
throw Exception("Extremes source was already added to Pipe.", ErrorCodes::LOGICAL_ERROR);
checkSource(*source);
const auto & source_header = output_ports.front()->getHeader();
assertBlocksHaveEqualStructure(header, source_header, "Pipes");
extremes_port = &source->getOutputs().front();
processors.emplace_back(std::move(source));
}
void Pipe::addTransform(ProcessorPtr transform)
{
if (output_ports.empty())
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
......@@ -147,10 +359,15 @@ void Pipes::addTransform(ProcessorPtr transform)
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
-void Pipes::addSimpleTransform(const ProcessorGetter & getter)
+void Pipe::addSimpleTransform(const ProcessorGetter & getter)
{
if (output_ports.empty())
throw Exception("Cannot add simple transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
Block new_header;
auto add_transform = [&](OutputPort *& port, StreamType stream_type)
......@@ -198,6 +415,77 @@ void Pipes::addSimpleTransform(const ProcessorGetter & getter)
header = std::move(new_header);
}
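For reference, the `ProcessorGetter` given to `addSimpleTransform` is called once per output port and, as the checks in this function enforce, also for the totals and extremes ports, with `Pipe::StreamType` telling them apart. A hedged sketch — `MyTransform` stands in for any 1-in/1-out processor, and the `StreamType::Main` name plus the return-nullptr-to-skip behaviour are assumptions carried over from QueryPipeline's analogous getter:

    pipe.addSimpleTransform([](const Block & stream_header, Pipe::StreamType type) -> ProcessorPtr
    {
        if (type != Pipe::StreamType::Main)
            return nullptr;    // assumed: leave totals/extremes streams untouched

        return std::make_shared<MyTransform>(stream_header);
    });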
void Pipe::transform(const Transformer & transformer)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe.", ErrorCodes::LOGICAL_ERROR);
auto new_processors = transformer(output_ports);
/// Create hash table with new processors.
std::unordered_set<const IProcessor *> set;
for (const auto & processor : new_processors)
set.emplace(processor.get());
for (const auto & port : output_ports)
{
if (!port->isConnected())
throw Exception("Transformation of Pipe is not valid because output port (" +
port->getHeader().dumpStructure() + ") is not connected", ErrorCodes::LOGICAL_ERROR);
set.emplace(&port->getProcessor());
}
OutputPortRawPtrs new_output_ports;
for (const auto & processor : new_processors)
{
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & port : processor->getOutputs())
{
if (!port.isConnected())
{
new_output_ports.push_back(&port);
continue;
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Transformation of Pipe is not valid because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
}
output_ports = std::move(new_output_ports);
if (output_ports.empty())
throw Exception("Transformation of Pipe is not valid because processors don't have any "
"not-connected output ports", ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipe");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
processors.insert(processors.end(), new_processors.begin(), new_processors.end());
}
/*
Pipe::Pipe(ProcessorPtr source)
{
if (auto * source_from_input_stream = typeid_cast<SourceFromInputStream *>(source.get()))
......@@ -295,5 +583,5 @@ void Pipe::enableQuota()
source->enableQuota();
}
}
*/
}
......@@ -6,39 +6,52 @@ namespace DB
{
class Pipe;
using Pipes = std::vector<Pipe>;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using OutputPortRawPtrs = std::vector<OutputPort *>;
/// Pipe is a set of processors which represents a part of a pipeline.
/// Pipe contains a list of output ports, with a dedicated port for totals and another for extremes.
/// All output ports have the same header.
/// All other ports are connected; all connections are inside the processors set.
-class Pipes
+class Pipe
{
public:
/// Default constructor creates an empty pipe. Generally, you cannot do anything with it except check that it is empty().
/// You cannot get an empty pipe in any other way. All transforms check that the resulting pipe is not empty.
Pipe() = default;
/// Create from source. Source must have no input ports and single output.
-explicit Pipes(ProcessorPtr source);
+explicit Pipe(ProcessorPtr source);
/// Create from processors. Use all not-connected output ports as output_ports. Check invariants.
-explicit Pipes(Processors processors_);
+explicit Pipe(Processors processors_);
Pipes(const Pipes & other) = delete;
Pipes(Pipes && other) = default;
Pipes & operator=(const Pipes & other) = delete;
Pipes & operator=(Pipes && other) = default;
Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default;
Pipe & operator=(const Pipe & other) = delete;
Pipe & operator=(Pipe && other) = default;
const Block & getHeader() const { return header; }
bool empty() const { return output_ports.empty(); }
-size_t size() const { return output_ports.size(); }
+size_t numOutputPorts() const { return output_ports.size(); }
OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; }
OutputPort * getTotalsPort() const { return totals_port; }
OutputPort * getExtremesPort() const { return extremes_port; }
/// Add processors from another pipe. It should have the same header.
//void addPipes(Pipes pipes);
/// Add a processor to the list and add its output ports to output_ports.
/// The processor shouldn't have input ports; its output ports shouldn't be connected.
/// Output headers should have the same structure and be compatible with the current header (if not empty()).
/// void addSource(ProcessorPtr source);
/// Add totals and extremes.
void addTotalsSource(ProcessorPtr source);
void addExtremesSource(ProcessorPtr source);
/// Add a processor to the list. It should have numOutputPorts() input ports with a compatible header.
/// Its output ports should have the same headers.
/// If the totals or extremes port is present, the transform shouldn't change the header.
......@@ -56,8 +69,13 @@ public:
/// Add transform with single input and single output for each port.
void addSimpleTransform(const ProcessorGetter & port);
-/// Destroy pipes and get processors.
-static Processors detachProcessors(Pipes pipes) { return std::move(pipes.processors); }
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer);
/// Unite several pipes together. They should have the same header.
static Pipe unitePipes(Pipes pipes);
private:
Processors processors;
......@@ -66,7 +84,7 @@ private:
Block header;
/// Output ports. Totals and extremes are allowed to be empty.
std::vector<OutputPort *> output_ports;
OutputPortRawPtrs output_ports;
OutputPort * totals_port = nullptr;
OutputPort * extremes_port = nullptr;
......@@ -81,9 +99,12 @@ private:
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
-};
/// Destroy pipe and get processors.
static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); }
};
/*
/// Pipe is a set of processors which represents the part of pipeline with single output.
/// All processors in pipe are connected. All ports are connected except the output one.
class Pipe
......@@ -170,5 +191,5 @@ private:
friend class QueryPipeline;
};
*/
}
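Boundary behaviour promised by the header above: a default-constructed pipe and `unitePipes` of an empty vector are both empty(), and a totals or extremes source can be attached exactly once. A small sketch under those declarations, with `SomeSource` again hypothetical:

    Pipe pipe = Pipe::unitePipes({});    // empty vector -> empty pipe
    assert(pipe.empty());

    pipe = Pipe(std::make_shared<SomeSource>(header));
    assert(pipe.numOutputPorts() == 1);

    pipe.addTotalsSource(std::make_shared<SomeSource>(header));
    // A second addTotalsSource(), or calling it on an empty pipe, throws LOGICAL_ERROR.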
......@@ -793,13 +793,7 @@ void QueryPipeline::initRowsBeforeLimit()
Pipe QueryPipeline::getPipe() &&
{
resize(1);
-return std::move(std::move(*this).getPipes()[0]);
-}
-Pipes QueryPipeline::getPipes() &&
-{
-Pipes pipes(processors.detach(), streams.at(0), totals_having_port, extremes_port);
+Pipe pipe(processors.detach(), streams.at(0), totals_having_port, extremes_port);
pipe.max_parallel_streams = streams.maxParallelStreams();
for (auto & lock : table_locks)
......
......@@ -102,7 +102,6 @@ public:
/// All pipes must have same header.
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
bool initialized() { return !processors.empty(); }
bool isCompleted() { return initialized() && streams.empty(); }
......@@ -197,9 +196,8 @@ public:
max_threads = max_threads_;
}
-/// Convert query pipeline to single or several pipes.
+/// Convert query pipeline to pipe.
Pipe getPipe() &&;
Pipes getPipes() &&;
/// Get internal processors.
const Processors & getProcessors() const { return processors.get(); }
......
......@@ -113,18 +113,10 @@ Pipe createRemoteSourcePipe(
Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info));
if (add_totals)
-{
-auto totals_source = std::make_shared<RemoteTotalsSource>(query_executor);
-pipe.setTotalsPort(&totals_source->getPort());
-pipe.addProcessors({std::move(totals_source)});
-}
+pipe.addTotalsSource(std::make_shared<RemoteTotalsSource>(query_executor));
if (add_extremes)
-{
-auto extremes_source = std::make_shared<RemoteExtremesSource>(query_executor);
-pipe.setExtremesPort(&extremes_source->getPort());
-pipe.addProcessors({std::move(extremes_source)});
-}
+pipe.addExtremesSource(std::make_shared<RemoteExtremesSource>(query_executor));
return pipe;
}
......
......@@ -6,6 +6,7 @@
#include <Storages/AlterCommands.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Pipe.h>
#include <Interpreters/Context.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
......@@ -78,6 +79,27 @@ TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, cons
return result;
}
Pipe IStorage::read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Pipe IStorage::alterPartition(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
const PartitionCommands & /* commands */,
const Context & /* context */)
{
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void IStorage::alter(
const AlterCommands & params, const Context & context, TableLockHolder &)
{
......
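With read() and alterPartition() now returning a single Pipe, a storage override shrinks to building one pipe. A minimal sketch of the new signature — `StorageMyTable` and `MySource` are hypothetical, and the sample-block call assumes the metadata-snapshot API of this period (compare StorageLiveView::read below):

    Pipe StorageMyTable::read(
        const Names & column_names,
        const StorageMetadataPtr & metadata_snapshot,
        const SelectQueryInfo & /*query_info*/,
        const Context & /*context*/,
        QueryProcessingStage::Enum /*processed_stage*/,
        size_t /*max_block_size*/,
        unsigned /*num_streams*/)
    {
        Block header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
        return Pipe(std::make_shared<MySource>(header));
    }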
......@@ -50,7 +50,6 @@ using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;
class Pipe;
-using Pipes = std::vector<Pipe>;
class StoragePolicy;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
......@@ -136,8 +135,6 @@ public:
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
virtual ColumnSizeByName getColumnSizes() const { return {}; }
-public:
/// Get mutable version (snapshot) of storage metadata. Metadata object is
/// multiversion, so it can be concurrently chaged, but returned copy can be
/// used without any locks.
......@@ -183,7 +180,7 @@ private:
/// Multiversion storage metadata. Allows to read/write storage metadata
/// without locks.
MultiVersionStorageMetadataPtr metadata;
private:
RWLockImpl::LockHolder tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const SettingSeconds & acquire_timeout) const;
......@@ -276,17 +273,14 @@ public:
* changed during lifetime of the returned pipeline, but the snapshot is
* guaranteed to be immutable.
*/
-virtual Pipes read(
+virtual Pipe read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
-unsigned /*num_streams*/)
-{
-throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
-}
+unsigned /*num_streams*/);
/** Writes the data to a table.
* Receives a description of the query, which can contain information about the data write method.
......@@ -355,10 +349,11 @@ public:
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.
*/
-virtual Pipes alterPartition(const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, const PartitionCommands & /* commands */, const Context & /* context */)
-{
-throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
-}
+virtual Pipe alterPartition(
+const ASTPtr & /* query */,
+const StorageMetadataPtr & /* metadata_snapshot */,
+const PartitionCommands & /* commands */,
+const Context & /* context */);
/// Checks that partition commands can be applied to storage.
virtual void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const;
......
......@@ -198,7 +198,7 @@ String StorageKafka::getDefaultClientId(const StorageID & table_id_)
}
-Pipes StorageKafka::read(
+Pipe StorageKafka::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /* query_info */,
......@@ -227,7 +227,7 @@ Pipes StorageKafka::read(
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
-return pipes;
+return Pipe::unitePipes(std::move(pipes));
}
......
......@@ -37,7 +37,7 @@ public:
void startup() override;
void shutdown() override;
-Pipes read(
+Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -528,7 +528,7 @@ void StorageLiveView::refresh(const Context & context)
}
}
-Pipes StorageLiveView::read(
+Pipe StorageLiveView::read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & /*query_info*/,
......@@ -537,7 +537,6 @@ Pipes StorageLiveView::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
-Pipes pipes;
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
......@@ -545,9 +544,8 @@ Pipes StorageLiveView::read(
if (getNewBlocks())
condition.notify_all();
}
pipes.emplace_back(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
return Pipe(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
}
-return pipes;
}
BlockInputStreams StorageLiveView::watch(
......
......@@ -124,7 +124,7 @@ public:
void refresh(const Context & context);
-Pipes read(
+Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const SelectQueryInfo & query_info,
......
......@@ -147,7 +147,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
}
-Pipes MergeTreeDataSelectExecutor::read(
+Pipe MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -162,7 +162,7 @@ Pipes MergeTreeDataSelectExecutor::read(
max_block_numbers_to_read);
}
-Pipes MergeTreeDataSelectExecutor::readFromParts(
+Pipe MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
......@@ -647,7 +647,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);
-Pipes res;
+Pipe res;
/// Projection, that needed to drop columns, which have appeared by execution
/// of some extra expressions, and to allow execute the same expressions later.
......@@ -721,31 +721,37 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
if (use_sampling)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<FilterTransform>(
pipe.getHeader(), filter_expression, filter_function->getColumnName(), false));
res.addSimpleTransform([&filter_expression, &filter_function](const Block & header, Pipe::StreamType)
{
return std::make_shared<FilterTransform>(
header, filter_expression, filter_function->getColumnName(), false);
});
}
if (result_projection)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), result_projection));
res.addSimpleTransform([&result_projection](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExpressionTransform>(header, result_projection);
});
}
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<AddingConstColumnTransform<Float64>>(
pipe.getHeader(), std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor"));
res.addSimpleTransform([used_sample_factor](const Block & header, Pipe::StreamType)
{
return std::make_shared<AddingConstColumnTransform<Float64>>(
header, std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor");
});
}
if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
res.addSimpleTransform([&query_info](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->remove_columns_actions);
});
}
return res;
......@@ -774,7 +780,7 @@ size_t roundRowsOrBytesToMarks(
}
-Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
+Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......@@ -822,13 +828,13 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
-Pipes res;
if (0 == sum_marks)
-return res;
+return {};
if (num_streams > 1)
{
/// Parallel query execution.
Pipes res;
/// Reduce the number of num_streams if the data is small.
if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
......@@ -867,10 +873,13 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::move(source));
}
return Pipe::unitePipes(std::move(res));
}
else
{
/// Sequential query execution.
Pipes res;
for (const auto & part : parts)
{
......@@ -882,18 +891,15 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(res));
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
if (res.size() > 1)
{
auto concat = std::make_shared<ConcatProcessor>(res.front().getHeader(), res.size());
Pipe pipe(std::move(res), std::move(concat));
res = Pipes();
res.emplace_back(std::move(pipe));
}
}
if (pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
return res;
return pipe;
}
}
static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeData & data)
......@@ -904,7 +910,7 @@ static ExpressionActionsPtr createProjection(const Pipe & pipe, const MergeTreeD
return projection;
}
-Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
+Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......@@ -956,7 +962,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
Pipes res;
if (sum_marks == 0)
-return res;
+return {};
/// Let's split ranges to avoid reading much data.
auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction)
......@@ -1101,40 +1107,45 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
reader_settings,
virt_columns,
part.part_index_in_query));
pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
}
}
if (pipes.size() > 1 && need_preliminary_merge)
auto pipe = Pipe::unitePipes(std::move(pipes));
if (input_order_info->direction == 1)
{
pipe.addSimpleTransform([](const Block & header, Pipe::StreamType)
{
return std::make_shared<ReverseTransform>(header);
});
}
if (pipe.numOutputPorts() > 1 && need_preliminary_merge)
{
SortDescription sort_description;
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j],
input_order_info->direction, 1);
input_order_info->direction, 1);
/// Drop temporary columns, added by 'sorting_key_prefix_expr'
-out_projection = createProjection(pipes.back(), data);
-for (auto & pipe : pipes)
-pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
-auto merging_sorted = std::make_shared<MergingSortedTransform>(
-pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
-res.emplace_back(std::move(pipes), std::move(merging_sorted));
-}
-else
-{
-for (auto && pipe : pipes)
-res.emplace_back(std::move(pipe));
-}
+out_projection = createProjection(pipe, data);
+pipe.addSimpleTransform([sorting_key_prefix_expr](const Block & header, Pipe::StreamType)
+{
+return std::make_shared<ExpressionTransform>(header, sorting_key_prefix_expr);
+});
+pipe.addTransform(std::make_shared<MergingSortedTransform>(
+pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size));
+}
+res.emplace_back(std::move(pipe));
}
-return res;
+return Pipe::unitePipes(std::move(res));
}
-Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
+Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......@@ -1172,25 +1183,35 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
-Pipes pipes;
+Pipe pipe;
-for (const auto & part : parts)
-{
-auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
-data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
-settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
-query_info.prewhere_info, true, reader_settings,
-virt_columns, part.part_index_in_query);
-Pipe pipe(std::move(source_processor));
-/// Drop temporary columns, added by 'sorting_key_expr'
-if (!out_projection)
-out_projection = createProjection(pipe, data);
-pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), metadata_snapshot->getSortingKey().expression));
-pipes.emplace_back(std::move(pipe));
-}
+{
+Pipes pipes;
+for (const auto & part : parts)
+{
+auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
+data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
+settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges,
+use_uncompressed_cache,
+query_info.prewhere_info, true, reader_settings,
+virt_columns, part.part_index_in_query);
+pipes.emplace_back(std::move(source_processor));
+}
+pipe = Pipe::unitePipes(std::move(pipes));
+}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
pipe.addSimpleTransform([&metadata_snapshot](const Block & header, Pipe::StreamType)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
......@@ -1198,7 +1219,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
-Block header = pipes.at(0).getHeader();
+Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
......@@ -1208,28 +1229,28 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipes.size(),
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, pipes.size(),
return std::make_shared<CollapsingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipes.size(),
return std::make_shared<SummingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
return std::make_shared<ReplacingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
return std::make_shared<VersionedCollapsingTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
......@@ -1244,12 +1265,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (num_streams <= 1 || sort_description.empty())
{
-Pipe pipe(std::move(pipes), get_merging_processor());
-pipes = Pipes();
-pipes.emplace_back(std::move(pipe));
-return pipes;
+pipe.addTransform(get_merging_processor());
+return pipe;
}
ColumnNumbers key_columns;
......@@ -1263,63 +1280,47 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
key_columns.emplace_back(desc.column_number);
}
-Processors selectors;
-Processors copiers;
-selectors.reserve(pipes.size());
-for (auto & pipe : pipes)
-{
-auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns);
-auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams);
-connect(pipe.getPort(), selector->getInputPort());
-connect(selector->getOutputPort(), copier->getInputPort());
-selectors.emplace_back(std::move(selector));
-copiers.emplace_back(std::move(copier));
-}
-Processors merges;
-std::vector<InputPorts::iterator> input_ports;
-merges.reserve(num_streams);
-input_ports.reserve(num_streams);
-for (size_t i = 0; i < num_streams; ++i)
-{
-auto merge = get_merging_processor();
-merge->setSelectorPosition(i);
-input_ports.emplace_back(merge->getInputs().begin());
-merges.emplace_back(std::move(merge));
-}
-/// Connect outputs of i-th splitter with i-th input port of every merge.
-for (auto & resize : copiers)
-{
-size_t input_num = 0;
-for (auto & output : resize->getOutputs())
-{
-connect(output, *input_ports[input_num]);
-++input_ports[input_num];
-++input_num;
-}
-}
-Processors processors;
-for (auto & pipe : pipes)
-{
-auto pipe_processors = std::move(pipe).detachProcessors();
-processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
-}
-pipes.clear();
-pipes.reserve(num_streams);
-for (auto & merge : merges)
-pipes.emplace_back(&merge->getOutputs().front());
-pipes.front().addProcessors(processors);
-pipes.front().addProcessors(selectors);
-pipes.front().addProcessors(copiers);
-pipes.front().addProcessors(merges);
-return pipes;
+pipe.addSimpleTransform([&](const Block & header, Pipe::StreamType)
+{
+return std::make_shared<AddingSelectorTransform>(header, num_streams, key_columns);
+});
+pipe.transform([&](OutputPortRawPtrs ports)
+{
+Processors processors;
+std::vector<OutputPorts::iterator> output_ports;
+processors.reserve(ports.size() + num_streams);
+output_ports.reserve(ports.size());
+for (auto & port : ports)
+{
+auto copier = std::make_shared<CopyTransform>(header, num_streams);
+connect(*port, copier->getInputPort());
+output_ports.emplace_back(copier->getOutputs().begin());
+processors.emplace_back(std::move(copier));
+}
+for (size_t i = 0; i < num_streams; ++i)
+{
+auto merge = get_merging_processor();
+merge->setSelectorPosition(i);
+auto input = merge->getInputs().begin();
+/// Connect i-th merge with i-th input port of every copier.
+for (size_t j = 0; j < ports.size(); ++j)
+{
+connect(*output_ports[j], *input);
+++output_ports[j];
+++input;
+}
+processors.emplace_back(std::move(merge));
+}
+return processors;
+});
+return pipe;
}
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.
......
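The transform callback above replaces the hand-wired selector/copier/merge code with the same topology; for two part streams and num_streams = 2 it builds (processor names are the commit's own):

    // stream 0 -> AddingSelectorTransform -> CopyTransform --+--> merge 0 (setSelectorPosition(0))
    //                                                        \--> merge 1 (setSelectorPosition(1))
    // stream 1 -> AddingSelectorTransform -> CopyTransform --+--> merge 0
    //                                                        \--> merge 1
    //
    // AddingSelectorTransform stamps each chunk with a bucket computed from key_columns,
    // every CopyTransform fans its stamped stream out to all merges, and merge i keeps only
    // chunks stamped i, so the FINAL merge runs with num_streams-way parallelism.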
......@@ -24,7 +24,7 @@ public:
*/
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
-Pipes read(
+Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......@@ -33,7 +33,7 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
-Pipes readFromParts(
+Pipe readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
......@@ -48,7 +48,7 @@ private:
Poco::Logger * log;
-Pipes spreadMarkRangesAmongStreams(
+Pipe spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......@@ -61,7 +61,7 @@ private:
const MergeTreeReaderSettings & reader_settings) const;
/// out_projection - save projection only with columns, requested to read
-Pipes spreadMarkRangesAmongStreamsWithOrder(
+Pipe spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......@@ -75,7 +75,7 @@ private:
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const;
-Pipes spreadMarkRangesAmongStreamsFinal(
+Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
......
......@@ -18,7 +18,7 @@ class StorageFromMergeTreeDataPart final : public ext::shared_ptr_helper<Storage
public:
String getName() const override { return "FromMergeTreeDataPart"; }
-Pipes read(
+Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
......@@ -15,7 +15,6 @@ namespace DB
class ASTAlterCommand;
class Pipe;
-using Pipes = std::vector<Pipe>;
struct PartitionCommand
{
......