Unverified · Commit 90b5e5d2 authored by alexey-milovidov, committed by GitHub

Merge pull request #11688 from ClickHouse/query-plan

Query plan
@@ -159,6 +159,7 @@ add_object_library(clickhouse_processors_transforms Processors/Transforms)
add_object_library(clickhouse_processors_sources Processors/Sources)
add_object_library(clickhouse_processors_merges Processors/Merges)
add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms)
add_object_library(clickhouse_processors_queryplan Processors/QueryPlan)
if (MAKE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
......
@@ -96,39 +96,48 @@ void AggregatedDataVariants::convertToTwoLevel()
}
}
Block Aggregator::getHeader(bool final) const
{
return params.getHeader(final);
}
Block Aggregator::Params::getHeader(
const Block & src_header,
const Block & intermediate_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
bool final)
{
Block res;
if (params.src_header)
if (src_header)
{
for (size_t i = 0; i < params.keys_size; ++i)
res.insert(params.src_header.safeGetByPosition(params.keys[i]).cloneEmpty());
for (const auto & key : keys)
res.insert(src_header.safeGetByPosition(key).cloneEmpty());
for (size_t i = 0; i < params.aggregates_size; ++i)
for (const auto & aggregate : aggregates)
{
size_t arguments_size = params.aggregates[i].arguments.size();
size_t arguments_size = aggregate.arguments.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = params.src_header.safeGetByPosition(params.aggregates[i].arguments[j]).type;
argument_types[j] = src_header.safeGetByPosition(aggregate.arguments[j]).type;
DataTypePtr type;
if (final)
type = params.aggregates[i].function->getReturnType();
type = aggregate.function->getReturnType();
else
type = std::make_shared<DataTypeAggregateFunction>(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
type = std::make_shared<DataTypeAggregateFunction>(aggregate.function, argument_types, aggregate.parameters);
res.insert({ type, params.aggregates[i].column_name });
res.insert({ type, aggregate.column_name });
}
}
else if (params.intermediate_header)
else if (intermediate_header)
{
res = params.intermediate_header.cloneEmpty();
res = intermediate_header.cloneEmpty();
if (final)
{
for (const auto & aggregate : params.aggregates)
for (const auto & aggregate : aggregates)
{
auto & elem = res.getByName(aggregate.column_name);
......
@@ -869,8 +869,8 @@ public:
* two-level aggregation begins to be used. It is enough to reach at least one of the thresholds.
* 0 - the corresponding threshold is not specified.
*/
const size_t group_by_two_level_threshold;
const size_t group_by_two_level_threshold_bytes;
size_t group_by_two_level_threshold;
size_t group_by_two_level_threshold_bytes;
/// Settings to flush temporary data to the filesystem (external aggregation).
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
@@ -911,6 +911,18 @@ public:
{
intermediate_header = intermediate_header_;
}
static Block getHeader(
const Block & src_header,
const Block & intermediate_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
bool final);
Block getHeader(bool final) const
{
return getHeader(src_header, intermediate_header, keys, aggregates, final);
}
};
Aggregator(const Params & params_);
......
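
The point of making Params::getHeader() static is that the aggregation output header can now be computed from Params alone, before any Aggregator exists. A minimal sketch of the new API (the wrapper function is illustrative, not part of the diff):

Block headerForAggregation(const Aggregator::Params & params, bool final)
{
    /// Delegates to the static overload via the non-static convenience method;
    /// no Aggregator instance is required. AggregatingStep below does exactly
    /// this in its constructor: params_.getHeader(final_).
    return params.getHeader(final);
}
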
@@ -691,6 +691,15 @@ void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed, si
}
}
bool ExpressionActions::hasJoinOrArrayJoin() const
{
for (const auto & action : actions)
if (action.type == ExpressionAction::JOIN || action.type == ExpressionAction::ARRAY_JOIN)
return true;
return false;
}
bool ExpressionActions::hasTotalsInJoin() const
{
for (const auto & action : actions)
......
@@ -214,6 +214,8 @@ public:
/// Execute the expression on the block with continuation.
void execute(Block & block, ExtraBlockPtr & not_processed, size_t & start_action) const;
bool hasJoinOrArrayJoin() const;
/// Check if joined subquery has totals.
bool hasTotalsInJoin() const;
......
@@ -77,6 +77,7 @@ namespace ErrorCodes
extern const int UNKNOWN_IDENTIFIER;
extern const int ILLEGAL_PREWHERE;
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
@@ -103,14 +104,20 @@ bool allowEarlyConstantFolding(const ExpressionActions & actions, const Settings
}
bool sanitizeBlock(Block & block)
bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column)
{
for (auto & col : block)
{
if (!col.column)
{
if (isNotCreatable(col.type->getTypeId()))
{
if (throw_if_cannot_create_column)
throw Exception("Cannot create column of type " + col.type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return false;
}
col.column = col.type->createColumn();
}
else if (!col.column->empty())
......
@@ -33,7 +33,7 @@ class ASTSelectQuery;
struct ASTTablesInSelectQueryElement;
/// Create columns in block or return false if not possible
bool sanitizeBlock(Block & block);
bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false);
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately.
struct ExpressionAnalyzerData
......
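
For context: the new flag only changes the failure mode for non-creatable column types; creatable columns are still materialized in place. A hedged usage sketch (the wrapper name is illustrative):

void materializeColumnsOrThrow(Block & block)
{
    /// A column of a non-creatable type (e.g. a function type) now throws
    /// ILLEGAL_TYPE_OF_ARGUMENT instead of making sanitizeBlock() return false.
    sanitizeBlock(block, /* throw_if_cannot_create_column = */ true);
}
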
@@ -3,18 +3,15 @@
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/CheckConstraintsBlockOutputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/CountingBlockOutputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Access/AccessFlags.h>
@@ -24,7 +21,6 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
@@ -32,7 +28,6 @@
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/ConcatProcessor.h>
namespace DB
@@ -162,7 +157,7 @@ BlockIO InterpreterInsertQuery::execute()
const auto & cluster = storage_src->getCluster();
const auto & shards_info = cluster->getShardsInfo();
std::vector<QueryPipeline> pipelines;
std::vector<std::unique_ptr<QueryPipeline>> pipelines;
String new_query_str = queryToString(new_query);
for (size_t shard_index : ext::range(0, shards_info.size()))
@@ -171,7 +166,7 @@ BlockIO InterpreterInsertQuery::execute()
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, context);
pipelines.emplace_back(interpreter.execute().pipeline);
pipelines.emplace_back(std::make_unique<QueryPipeline>(interpreter.execute().pipeline));
}
else
{
@@ -183,9 +178,9 @@ BlockIO InterpreterInsertQuery::execute()
/// INSERT SELECT query returns empty block
auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, context);
pipelines.emplace_back();
pipelines.back().init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back().setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
pipelines.emplace_back(std::make_unique<QueryPipeline>());
pipelines.back()->init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back()->setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(header);
});
......
@@ -14,7 +14,6 @@
#include <Storages/ReadInOrderOptimizer.h>
#include <Interpreters/StorageID.h>
#include <Processors/QueryPipeline.h>
#include <Columns/FilterDescription.h>
namespace Poco { class Logger; }
@@ -25,6 +24,7 @@ namespace DB
struct SubqueryForSet;
class InterpreterSelectWithUnionQuery;
class Context;
class QueryPlan;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
@@ -77,6 +77,9 @@ public:
/// Execute a query. Get the stream of blocks to read.
BlockIO execute() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
bool ignoreLimits() const override { return options.ignore_limits; }
bool ignoreQuota() const override { return options.ignore_quota; }
@@ -104,35 +107,35 @@ private:
Block getSampleBlockImpl();
void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
void executeImpl(QueryPlan & query_plan, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);
/// Different stages of query execution.
void executeFetchColumns(
QueryProcessingStage::Enum processing_stage,
QueryPipeline & pipeline,
QueryPlan & query_plan,
const PrewhereInfoPtr & prewhere_info,
const Names & columns_to_remove_after_prewhere);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset);
void executeLimitBy(QueryPipeline & pipeline);
void executeLimit(QueryPipeline & pipeline);
void executeOffset(QueryPipeline & pipeline);
static void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns);
void executeExtremes(QueryPipeline & pipeline);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(QueryPipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPlan & query_plan, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPlan & query_plan, const ExpressionActionsPtr & expression);
static void executeExpression(QueryPlan & query_plan, const ExpressionActionsPtr & expression, const std::string & description);
void executeOrder(QueryPlan & query_plan, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPlan & query_plan, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPlan & query_plan);
void executeMergeSorted(QueryPlan & query_plan, const std::string & description);
void executePreLimit(QueryPlan & query_plan, bool do_not_skip_offset);
void executeLimitBy(QueryPlan & query_plan);
void executeLimit(QueryPlan & query_plan);
void executeOffset(QueryPlan & query_plan);
static void executeProjection(QueryPlan & query_plan, const ExpressionActionsPtr & expression);
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
void executeExtremes(QueryPlan & query_plan);
void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(QueryPlan & query_plan, const SortDescription & sort_description, UInt64 limit, const std::string & description);
String generateFilterActions(
ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const;
@@ -143,7 +146,7 @@ private:
CUBE = 1
};
void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator);
void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*
......
@@ -6,10 +6,9 @@
#include <Columns/getLeastSuperColumn.h>
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
namespace DB
@@ -173,47 +172,35 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock(
return cache[key] = InterpreterSelectWithUnionQuery(query_ptr, context, SelectQueryOptions().analyze()).getSampleBlock();
}
BlockIO InterpreterSelectWithUnionQuery::execute()
void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
{
BlockIO res;
QueryPipeline & main_pipeline = res.pipeline;
std::vector<QueryPipeline> pipelines;
bool has_main_pipeline = false;
Blocks headers;
headers.reserve(nested_interpreters.size());
size_t num_plans = nested_interpreters.size();
std::vector<QueryPlan> plans(num_plans);
DataStreams data_streams(num_plans);
for (auto & interpreter : nested_interpreters)
for (size_t i = 0; i < num_plans; ++i)
{
if (!has_main_pipeline)
{
has_main_pipeline = true;
main_pipeline = interpreter->execute().pipeline;
headers.emplace_back(main_pipeline.getHeader());
}
else
{
pipelines.emplace_back(interpreter->execute().pipeline);
headers.emplace_back(pipelines.back().getHeader());
}
nested_interpreters[i]->buildQueryPlan(plans[i]);
data_streams[i] = plans[i].getCurrentDataStream();
}
if (!has_main_pipeline)
main_pipeline.init(Pipe(std::make_shared<NullSource>(getSampleBlock())));
auto max_threads = context->getSettingsRef().max_threads;
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
if (!pipelines.empty())
{
auto common_header = getCommonHeaderForUnion(headers);
main_pipeline.unitePipelines(std::move(pipelines), common_header);
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
// nested queries can force 1 thread (due to simplicity)
// but in case of union this cannot be done.
UInt64 max_threads = context->getSettingsRef().max_threads;
main_pipeline.setMaxThreads(std::min<UInt64>(nested_interpreters.size(), max_threads));
}
BlockIO InterpreterSelectWithUnionQuery::execute()
{
BlockIO res;
QueryPlan query_plan;
buildQueryPlan(query_plan);
auto pipeline = query_plan.buildQueryPipeline();
main_pipeline.addInterpreterContext(context);
res.pipeline = std::move(*pipeline);
res.pipeline.addInterpreterContext(context);
return res;
}
......
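
execute() is now a thin wrapper over the plan. The same two-phase pattern is available to any caller that needs the plan without running it; a sketch using only calls visible in this diff (`interpreter` and `context` are assumed to exist):

QueryPlan query_plan;
interpreter.buildQueryPlan(query_plan);           /// phase 1: build a declarative plan
auto pipeline = query_plan.buildQueryPipeline();  /// phase 2: materialize processors
pipeline->addInterpreterContext(context);         /// keep the context alive with the pipeline
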
@@ -5,14 +5,12 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
class Context;
class InterpreterSelectQuery;
class QueryPlan;
/** Interprets one or multiple SELECT queries inside UNION ALL chain.
*/
@@ -27,6 +25,9 @@ public:
~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query.
void buildQueryPlan(QueryPlan & query_plan);
BlockIO execute() override;
bool ignoreLimits() const override { return options.ignore_limits; }
......
@@ -563,7 +563,7 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
}
void QueryPipeline::unitePipelines(
std::vector<QueryPipeline> && pipelines, const Block & common_header)
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header)
{
if (initialized())
{
@@ -583,8 +583,9 @@ void QueryPipeline::unitePipelines(
if (totals_having_port)
totals.push_back(totals_having_port);
for (auto & pipeline : pipelines)
for (auto & pipeline_ptr : pipelines)
{
auto & pipeline = *pipeline_ptr;
pipeline.checkInitialized();
if (!pipeline.isCompleted())
@@ -642,6 +643,8 @@ void QueryPipeline::unitePipelines(
else
totals_having_port = uniteTotals(totals, current_header, processors);
}
current_header = common_header;
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
......
@@ -135,7 +135,7 @@ public:
void enableQuotaForCurrentStreams();
void unitePipelines(std::vector<QueryPipeline> && pipelines, const Block & common_header);
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header);
PipelineExecutorPtr execute();
......
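
unitePipelines() now takes ownership through unique_ptr, matching how IQueryPlanStep::updatePipeline() passes pipelines around. A sketch of the updated calling convention, mirroring InterpreterInsertQuery above (`interpreter`, `main_pipeline` and `common_header` are assumed to exist):

std::vector<std::unique_ptr<QueryPipeline>> pipelines;
pipelines.emplace_back(std::make_unique<QueryPipeline>(interpreter.execute().pipeline));
main_pipeline.unitePipelines(std::move(pipelines), common_header);
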
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
};
}
AddingDelayedStreamStep::AddingDelayedStreamStep(
const DataStream & input_stream_,
ProcessorPtr source_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, source(std::move(source_))
{
}
void AddingDelayedStreamStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addDelayedStream(source);
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class AddingDelayedStreamStep : public ITransformingStep
{
public:
AddingDelayedStreamStep(
const DataStream & input_stream_,
ProcessorPtr source_);
String getName() const override { return "AddingDelayedStream"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ProcessorPtr source;
};
}
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false /// Actually, we could check whether the distinct names are among the aggregation keys
};
}
AggregatingStep::AggregatingStep(
const DataStream & input_stream_,
Aggregator::Params params_,
bool final_,
size_t max_block_size_,
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_)
: ITransformingStep(input_stream_, params_.getHeader(final_), getTraits())
, params(std::move(params_))
, final(std::move(final_))
, max_block_size(max_block_size_)
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
, storage_has_evenly_distributed_read(storage_has_evenly_distributed_read_)
, group_by_info(std::move(group_by_info_))
, group_by_sort_description(std::move(group_by_sort_description_))
{
}
void AggregatingStep::transformPipeline(QueryPipeline & pipeline)
{
/// Forget about current totals and extremes. They will be calculated again after aggregation if needed.
pipeline.dropTotalsAndExtremes();
bool allow_to_use_two_level_group_by = pipeline.getNumStreams() > 1 || params.max_bytes_before_external_group_by != 0;
if (!allow_to_use_two_level_group_by)
{
params.group_by_two_level_threshold = 0;
params.group_by_two_level_threshold_bytes = 0;
}
/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with spilling temporary data to disk, and it needs to be merged in a memory-efficient way.
*/
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final);
if (group_by_info)
{
bool need_finish_sorting = (group_by_info->order_key_prefix_descr.size() < group_by_sort_description.size());
if (need_finish_sorting)
{
/// TOO SLOW
}
else
{
if (pipeline.getNumStreams() > 1)
{
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_sort_description, max_block_size, many_data, counter++);
});
for (auto & column_description : group_by_sort_description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = pipeline.getHeader().getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
auto transform = std::make_shared<AggregatingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
group_by_sort_description,
max_block_size);
pipeline.addPipe({ std::move(transform) });
}
else
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_sort_description, max_block_size);
});
}
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizingSimpleTransform>(header, transform_params);
});
pipeline.enableQuotaForCurrentStreams();
return;
}
}
/// If there are several sources, then we perform parallel aggregation
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
if (!storage_has_evenly_distributed_read)
pipeline.resize(pipeline.getNumStreams(), true, true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
pipeline.resize(1);
}
else
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params);
});
}
pipeline.enableQuotaForCurrentStreams();
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <Storages/SelectQueryInfo.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class AggregatingStep : public ITransformingStep
{
public:
AggregatingStep(
const DataStream & input_stream_,
Aggregator::Params params_,
bool final_,
size_t max_block_size_,
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_);
String getName() const override { return "Aggregating"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
Aggregator::Params params;
bool final;
size_t max_block_size;
size_t merge_threads;
size_t temporary_data_merge_threads;
bool storage_has_evenly_distributed_read;
InputOrderInfoPtr group_by_info;
SortDescription group_by_sort_description;
};
}
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_)
: ITransformingStep(
input_stream_,
result_header_,
getTraits())
, result_header(std::move(result_header_))
{
/// Some columns may be removed
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
}
void ConvertingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, result_header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ConvertingStep : public ITransformingStep
{
public:
ConvertingStep(const DataStream & input_stream_, Block result_header_);
String getName() const override { return "Converting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
Block result_header;
};
}
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
CreatingSetsStep::CreatingSetsStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, subqueries_for_sets(std::move(subqueries_for_sets_))
, network_transfer_limits(std::move(network_transfer_limits_))
, context(context_)
{
}
void CreatingSetsStep::transformPipeline(QueryPipeline & pipeline)
{
auto creating_sets = std::make_shared<CreatingSetsTransform>(
pipeline.getHeader(), subqueries_for_sets,
network_transfer_limits,
context);
pipeline.addCreatingSetsTransform(std::move(creating_sets));
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <Interpreters/SubqueryForSet.h>
namespace DB
{
class CreatingSetsStep : public ITransformingStep
{
public:
CreatingSetsStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_);
String getName() const override { return "CreatingSets"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SubqueriesForSets subqueries_for_sets;
SizeLimits network_transfer_limits;
const Context & context;
};
}
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
};
}
CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, params_->getHeader(), getTraits())
, params(std::move(params_))
{
/// Aggregation keys are distinct
for (auto key : params->params.keys)
output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
}
void CubeStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<CubeTransform>(header, std::move(params));
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class CubeStep : public ITransformingStep
{
public:
CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
String getName() const override { return "Cube"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
AggregatingTransformParamsPtr params;
};
}
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
DistinctStep::DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, pre_distinct(pre_distinct_)
{
auto & distinct_columns = pre_distinct ? output_stream->local_distinct_columns
: output_stream->distinct_columns;
/// Add more distinct columns.
for (const auto & name : columns)
distinct_columns.insert(name);
}
static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names)
{
bool columns_already_distinct = true;
for (const auto & name : columns)
if (distinct_names.count(name) == 0)
columns_already_distinct = false;
return columns_already_distinct;
}
void DistinctStep::transformPipeline(QueryPipeline & pipeline)
{
if (checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns))
return;
if ((pre_distinct || pipeline.getNumStreams() <= 1)
&& checkColumnsAlreadyDistinct(columns, input_streams.front().local_distinct_columns))
return;
if (!pre_distinct)
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class DistinctStep : public ITransformingStep
{
public:
DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_); /// If enabled, execute DISTINCT for each stream separately. Otherwise, merge the streams first.
String getName() const override { return "Distinct"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SizeLimits set_size_limits;
UInt64 limit_hint;
Names columns;
bool pre_distinct;
};
}
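
pre_distinct splits DISTINCT into two plan steps: a per-stream pass that keeps parallelism and fills local_distinct_columns, and a final pass that resizes the pipeline to one stream and deduplicates globally; either pass skips its transform entirely when checkColumnsAlreadyDistinct() already holds. A hedged construction sketch (`input_stream`, `limits`, `limit_hint` and `columns` are assumed to exist):

/// Per-stream pass: marks `columns` as locally distinct, keeps all streams.
DistinctStep pre(input_stream, limits, limit_hint, columns, /* pre_distinct_ = */ true);
/// Final pass: merges the streams and deduplicates globally; it becomes a
/// no-op if the incoming stream already reports these columns as distinct.
DistinctStep final_pass(pre.getOutputStream(), limits, limit_hint, columns, /* pre_distinct_ = */ false);
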
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr & expression)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin()
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
: ITransformingStep(
input_stream_,
ExpressionTransform::transformHeader(input_stream_.header, expression_),
getTraits(expression_))
, expression(std::move(expression_))
, default_totals(default_totals_)
{
/// Some columns may be removed by expression.
/// TODO: also check aliases, functions and some types of join
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
}
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
{
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool add_default_totals = false;
if (default_totals && !pipeline.hasTotals())
{
pipeline.addDefaultTotals();
add_default_totals = true;
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<ExpressionTransform>(header, expression, on_totals, add_default_totals);
});
}
InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
: ITransformingStep(
input_stream_,
ExpressionTransform::transformHeader(input_stream_.header, expression_),
getTraits(expression_))
, expression(std::move(expression_))
, default_totals(default_totals_)
{
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
}
void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)
{
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool add_default_totals = false;
if (default_totals && !pipeline.hasTotals())
{
pipeline.addDefaultTotals();
add_default_totals = true;
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<InflatingExpressionTransform>(header, expression, on_totals, add_default_totals);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ExpressionStep : public ITransformingStep
{
public:
explicit ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false);
String getName() const override { return "Expression"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ExpressionActionsPtr expression;
bool default_totals; /// See ExpressionTransform
};
/// TODO: add separate step for join.
class InflatingExpressionStep : public ITransformingStep
{
public:
explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false);
String getName() const override { return "Expression"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ExpressionActionsPtr expression;
bool default_totals; /// See ExpressionTransform
};
}
#include <Processors/QueryPlan/ExtremesStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
ExtremesStep::ExtremesStep(const DataStream & input_stream_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
{
}
void ExtremesStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addExtremesTransform();
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ExtremesStep : public ITransformingStep
{
public:
ExtremesStep(const DataStream & input_stream_);
String getName() const override { return "Extremes"; }
void transformPipeline(QueryPipeline & pipeline) override;
};
}
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/Transforms/FillingTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false /// TODO: it seems to actually be true. Check it later.
};
}
FillingStep::FillingStep(const DataStream & input_stream_, SortDescription sort_description_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, sort_description(std::move(sort_description_))
{
}
void FillingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FillingTransform>(header, sort_description);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
namespace DB
{
class FillingStep : public ITransformingStep
{
public:
FillingStep(const DataStream & input_stream_, SortDescription sort_description_);
String getName() const override { return "Filling"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription sort_description;
};
}
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPipeline.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr & expression)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin() /// I suppose this never actually happens here
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
FilterStep::FilterStep(
const DataStream & input_stream_,
ExpressionActionsPtr expression_,
String filter_column_name_,
bool remove_filter_column_)
: ITransformingStep(
input_stream_,
FilterTransform::transformHeader(input_stream_.header, expression_, filter_column_name_, remove_filter_column_),
getTraits(expression_))
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
, remove_filter_column(remove_filter_column_)
{
/// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name.
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
}
void FilterStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class FilterStep : public ITransformingStep
{
public:
FilterStep(
const DataStream & input_stream_,
ExpressionActionsPtr expression_,
String filter_column_name_,
bool remove_filter_column_);
String getName() const override { return "Filter"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ExpressionActionsPtr expression;
String filter_column_name;
bool remove_filter_column;
};
}
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
FinishSortingStep::FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// Streams are merged together; only globally distinct keys remain distinct.
/// Note: we could avoid clearing it if we knew that there will be only one stream in the pipeline. Should we add info about this?
output_stream->local_distinct_columns.clear();
}
void FinishSortingStep::transformPipeline(QueryPipeline & pipeline)
{
bool need_finish_sorting = (prefix_description.size() < result_description.size());
if (pipeline.getNumStreams() > 1)
{
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
prefix_description,
max_block_size, limit_for_merging);
pipeline.addPipe({ std::move(transform) });
}
pipeline.enableQuotaForCurrentStreams();
if (need_finish_sorting)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, result_description, limit);
});
/// NOTE limits are not applied to the size of temporary sets in FinishSortingTransform
pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr
{
return std::make_shared<FinishSortingTransform>(
header, prefix_description, result_description, max_block_size, limit);
});
}
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
namespace DB
{
class FinishSortingStep : public ITransformingStep
{
public:
FinishSortingStep(
const DataStream & input_stream_,
SortDescription prefix_description_,
SortDescription result_description_,
size_t max_block_size,
UInt64 limit);
String getName() const override { return "FinishSorting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription prefix_description;
SortDescription result_description;
size_t max_block_size;
UInt64 limit;
};
}
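
The prefix/result split above is the whole trick: when the input is already sorted by a prefix of the target description, streams are merged by that prefix and only the remainder is sorted. A worked sketch (column names and the block size are illustrative; SortColumnDescription is taken as {name, direction, nulls_direction}):

/// Input streams are already sorted by (a); the target order is (a, b).
SortDescription prefix_description{{"a", 1, 1}};
SortDescription result_description{{"a", 1, 1}, {"b", 1, 1}};
/// prefix_description.size() < result_description.size(), so after merging by (a)
/// the step adds PartialSorting + FinishSorting transforms to complete the
/// (a, b) order within each run of equal prefix values.
FinishSortingStep step(input_stream, prefix_description, result_description,
                       /* max_block_size_ = */ 65536, /* limit_ = */ 0);
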
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
const DataStream & IQueryPlanStep::getOutputStream() const
{
if (!hasOutputStream())
throw Exception("QueryPlanStep " + getName() + " does not have output stream.", ErrorCodes::LOGICAL_ERROR);
return *output_stream;
}
}
#pragma once
#include <Core/Block.h>
namespace DB
{
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
using QueryPipelines = std::vector<QueryPipelinePtr>;
/// Description of data stream.
class DataStream
{
public:
Block header;
NameSet distinct_columns = {};
NameSet local_distinct_columns = {}; /// These columns are distinct within each separate thread, but not in general.
/// Things which may be added:
/// * sort description
/// * limit
/// * estimated rows number
/// * memory allocation context
};
using DataStreams = std::vector<DataStream>;
/// Single step of query plan.
class IQueryPlanStep
{
public:
virtual ~IQueryPlanStep() = default;
virtual String getName() const = 0;
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
/// * pipelines.size() == getInputStreams().size()
/// * the header of each pipeline is the same as the header of the corresponding entry in input_streams
/// The result pipeline must contain any number of streams with a compatible output header if hasOutputStream(),
/// or the pipeline must be completed otherwise.
virtual QueryPipelinePtr updatePipeline(QueryPipelines pipelines) = 0;
const DataStreams & getInputStreams() const { return input_streams; }
bool hasOutputStream() const { return output_stream.has_value(); }
const DataStream & getOutputStream() const;
/// Methods to describe what this step is needed for.
const std::string & getStepDescription() const { return step_description; }
void setStepDescription(std::string description) { step_description = std::move(description); }
protected:
DataStreams input_streams;
std::optional<DataStream> output_stream;
/// Text description about what current step does.
std::string step_description;
};
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
}
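
Tying this contract back to InterpreterSelectWithUnionQuery earlier in the diff: each nested plan exposes its current DataStream, a step with N inputs is united with exactly N plans, and processors are materialized once at the end. The pattern in one place (all names are from this diff):

std::vector<QueryPlan> plans(num_plans);
DataStreams data_streams(num_plans);
for (size_t i = 0; i < num_plans; ++i)
{
    nested_interpreters[i]->buildQueryPlan(plans[i]);
    data_streams[i] = plans[i].getCurrentDataStream();
}
auto union_step = std::make_unique<UnionStep>(std::move(data_streams), result_header, max_threads);
query_plan.unitePlans(std::move(union_step), std::move(plans));
auto pipeline = query_plan.buildQueryPipeline();
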
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
ISourceStep::ISourceStep(DataStream output_stream_)
{
output_stream = std::move(output_stream_);
}
QueryPipelinePtr ISourceStep::updatePipeline(QueryPipelines)
{
auto pipeline = std::make_unique<QueryPipeline>();
initializePipeline(*pipeline);
return pipeline;
}
}
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class ISourceStep : public IQueryPlanStep
{
public:
explicit ISourceStep(DataStream output_stream_);
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
virtual void initializePipeline(QueryPipeline & pipeline) = 0;
};
}
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits)
{
input_streams.emplace_back(std::move(input_stream));
output_stream = DataStream{.header = std::move(output_header)};
if (traits.preserves_distinct_columns)
{
output_stream->distinct_columns = input_streams.front().distinct_columns;
output_stream->local_distinct_columns = input_streams.front().local_distinct_columns;
}
}
QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
{
transformPipeline(*pipelines.front());
return std::move(pipelines.front());
}
}
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class ITransformingStep : public IQueryPlanStep
{
public:
struct DataStreamTraits
{
bool preserves_distinct_columns;
};
ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits);
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
virtual void transformPipeline(QueryPipeline & pipeline) = 0;
};
}
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/Transforms/LimitByTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
LimitByStep::LimitByStep(
const DataStream & input_stream_,
size_t group_length_, size_t group_offset_, const Names & columns_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, group_length(group_length_)
, group_offset(group_offset_)
, columns(columns_)
{
}
void LimitByStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<LimitByTransform>(header, group_length, group_offset, columns);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class LimitByStep : public ITransformingStep
{
public:
explicit LimitByStep(
const DataStream & input_stream_,
size_t group_length_, size_t group_offset_, const Names & columns_);
String getName() const override { return "LimitBy"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
size_t group_length;
size_t group_offset;
Names columns;
};
}
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/LimitTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
LimitStep::LimitStep(
const DataStream & input_stream_,
size_t limit_, size_t offset_,
bool always_read_till_end_,
bool with_ties_,
SortDescription description_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, limit(limit_), offset(offset_)
, always_read_till_end(always_read_till_end_)
, with_ties(with_ties_), description(std::move(description_))
{
}
void LimitStep::transformPipeline(QueryPipeline & pipeline)
{
auto transform = std::make_shared<LimitTransform>(
pipeline.getHeader(), limit, offset, pipeline.getNumStreams(), always_read_till_end, with_ties, description);
pipeline.addPipe({std::move(transform)});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <Core/SortDescription.h>
namespace DB
{
class LimitStep : public ITransformingStep
{
public:
LimitStep(
const DataStream & input_stream_,
size_t limit_, size_t offset_,
bool always_read_till_end_ = false,
bool with_ties_ = false,
SortDescription description_ = {});
String getName() const override { return "Limit"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
size_t limit;
size_t offset;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
};
}
#include <Processors/QueryPlan/MergeSortingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/MergeSortingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
MergeSortingStep::MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
, description(description_)
, max_merged_block_size(max_merged_block_size_)
, limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_)
{
}
void MergeSortingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<MergeSortingTransform>(
header, description, max_merged_block_size, limit,
max_bytes_before_remerge / pipeline.getNumStreams(),
max_bytes_before_external_sort,
tmp_volume,
min_free_disk_space);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
class MergeSortingStep : public ITransformingStep
{
public:
explicit MergeSortingStep(
const DataStream & input_stream,
const SortDescription & description_,
size_t max_merged_block_size_,
UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription description;
size_t max_merged_block_size;
UInt64 limit;
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
VolumePtr tmp_volume;
size_t min_free_disk_space;
};
}
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
};
}
MergingAggregatedStep::MergingAggregatedStep(
const DataStream & input_stream_,
AggregatingTransformParamsPtr params_,
bool memory_efficient_aggregation_,
size_t max_threads_,
size_t memory_efficient_merge_threads_)
: ITransformingStep(input_stream_, params_->getHeader(), getTraits())
, params(params_)
, memory_efficient_aggregation(memory_efficient_aggregation_)
, max_threads(max_threads_)
, memory_efficient_merge_threads(memory_efficient_merge_threads_)
{
/// Aggregation keys are distinct
for (auto key : params->params.keys)
output_stream->distinct_columns.insert(params->params.intermediate_header.getByPosition(key).name);
}
void MergingAggregatedStep::transformPipeline(QueryPipeline & pipeline)
{
if (!memory_efficient_aggregation)
{
/// We union several sources into one, parallelizing the work.
pipeline.resize(1);
/// Now merge the aggregated blocks
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<MergingAggregatedTransform>(header, params, max_threads);
});
}
else
{
auto num_merge_threads = memory_efficient_merge_threads
? static_cast<size_t>(memory_efficient_merge_threads)
: static_cast<size_t>(max_threads);
auto pipe = createMergingAggregatedMemoryEfficientPipe(
pipeline.getHeader(),
params,
pipeline.getNumStreams(),
num_merge_threads);
pipeline.addPipe(std::move(pipe));
}
pipeline.enableQuotaForCurrentStreams();
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class MergingAggregatedStep : public ITransformingStep
{
public:
MergingAggregatedStep(
const DataStream & input_stream_,
AggregatingTransformParamsPtr params_,
bool memory_efficient_aggregation_,
size_t max_threads_,
size_t memory_efficient_merge_threads_);
String getName() const override { return "MergingAggregated"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
AggregatingTransformParamsPtr params;
bool memory_efficient_aggregation;
size_t max_threads;
size_t memory_efficient_merge_threads;
};
}
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Merges/MergingSortedTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
MergingSortedStep::MergingSortedStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
, sort_description(std::move(sort_description_))
, max_block_size(max_block_size_)
, limit(limit_)
{
/// Streams are merged together; only globally distinct keys remain distinct.
/// Note: we could avoid clearing it if we knew that there will be only one stream in the pipeline. Should we add info about this?
output_stream->local_distinct_columns.clear();
}
void MergingSortedStep::transformPipeline(QueryPipeline & pipeline)
{
/// If there are several streams, then we merge them into one
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
max_block_size, limit);
pipeline.addPipe({ std::move(transform) });
pipeline.enableQuotaForCurrentStreams();
}
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
#include <Disks/IVolume.h>
namespace DB
{
class MergingSortedStep : public ITransformingStep
{
public:
explicit MergingSortedStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
String getName() const override { return "MergingSorted"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription sort_description;
size_t max_block_size;
UInt64 limit;
};
}
#include <Processors/QueryPlan/OffsetsStep.h>
#include <Processors/OffsetTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
OffsetsStep::OffsetsStep(const DataStream & input_stream_, size_t offset_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, offset(offset_)
{
}
void OffsetsStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<OffsetTransform>(header, offset, 1);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class OffsetsStep : public ITransformingStep
{
public:
OffsetsStep(const DataStream & input_stream_, size_t offset_);
String getName() const override { return "Offsets"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
size_t offset;
};
}
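// --- Illustrative usage sketch (not part of this patch) ---
// OffsetsStep implements the OFFSET clause: the OffsetTransform it adds skips
// the first `offset` rows of the main streams. The value 10 is a placeholder.
#include <Processors/QueryPlan/OffsetsStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <memory>

namespace DB
{
static void addOffsetExample(QueryPlan & plan)
{
    plan.addStep(std::make_unique<OffsetsStep>(plan.getCurrentDataStream(), /* offset_ = */ 10));
}
}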
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
PartialSortingStep::PartialSortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
UInt64 limit_,
SizeLimits size_limits_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
, sort_description(std::move(sort_description_))
, limit(limit_)
, size_limits(size_limits_)
{
}
void PartialSortingStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<PartialSortingTransform>(header, sort_description, limit);
});
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_CURRENT;
limits.size_limits = size_limits;
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
return transform;
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class PartialSortingStep : public ITransformingStep
{
public:
explicit PartialSortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
UInt64 limit_,
SizeLimits size_limits_);
String getName() const override { return "PartialSorting"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SortDescription sort_description;
UInt64 limit;
SizeLimits size_limits;
};
}
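// --- Illustrative usage sketch (not part of this patch) ---
// PartialSortingStep sorts each block independently (the usual first phase
// before MergeSorting/MergingSorted) and attaches LIMITS_CURRENT checks.
// The sort column, limit and default-constructed SizeLimits are placeholder assumptions.
#include <Processors/QueryPlan/PartialSortingStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <memory>

namespace DB
{
static void addPartialSortingExample(QueryPlan & plan)
{
    SortDescription description;
    description.emplace_back("x", /* direction = */ 1, /* nulls_direction = */ 1);

    plan.addStep(std::make_unique<PartialSortingStep>(
        plan.getCurrentDataStream(), std::move(description),
        /* limit_ = */ 0, SizeLimits{}));
}
}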
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPipeline.h>
#include <stack>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
QueryPlan::~QueryPlan() = default;
void QueryPlan::checkInitialized() const
{
if (!isInitialized())
throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR);
}
void QueryPlan::checkNotCompleted() const
{
if (isCompleted())
throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR);
}
bool QueryPlan::isCompleted() const
{
return isInitialized() && !root->step->hasOutputStream();
}
const DataStream & QueryPlan::getCurrentDataStream() const
{
checkInitialized();
checkNotCompleted();
return root->step->getOutputStream();
}
void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans)
{
if (isInitialized())
throw Exception("Cannot unite plans because current QueryPlan is already initialized",
ErrorCodes::LOGICAL_ERROR);
const auto & inputs = step->getInputStreams();
size_t num_inputs = step->getInputStreams().size();
if (num_inputs != plans.size())
{
throw Exception("Cannot unite QueryPlans using " + step->getName() +
" because step has different number of inputs. "
"Has " + std::to_string(plans.size()) + " plans "
"and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR);
}
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & step_header = inputs[i].header;
const auto & plan_header = plans[i].getCurrentDataStream().header;
if (!blocksHaveEqualStructure(step_header, plan_header))
throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
"it has incompatible header with plan " + root->step->getName() + " "
"plan header: " + plan_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & plan : plans)
nodes.splice(nodes.end(), std::move(plan.nodes));
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
for (auto & plan : plans)
root->children.emplace_back(plan.root);
for (auto & plan : plans)
{
max_threads = std::max(max_threads, plan.max_threads);
interpreter_context.insert(interpreter_context.end(),
plan.interpreter_context.begin(), plan.interpreter_context.end());
}
}
void QueryPlan::addStep(QueryPlanStepPtr step)
{
checkNotCompleted();
size_t num_input_streams = step->getInputStreams().size();
if (num_input_streams == 0)
{
if (isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR);
nodes.emplace_back(Node{.step = std::move(step)});
root = &nodes.back();
return;
}
if (num_input_streams == 1)
{
if (!isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has input, but QueryPlan is not initialised", ErrorCodes::LOGICAL_ERROR);
const auto & root_header = root->step->getOutputStream().header;
const auto & step_header = step->getInputStreams().front().header;
if (!blocksHaveEqualStructure(root_header, step_header))
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"it has incompatible header with root step " + root->step->getName() + " "
"root header: " + root_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
root = &nodes.back();
return;
}
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
" input expected", ErrorCodes::LOGICAL_ERROR);
}
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
checkInitialized();
struct Frame
{
Node * node;
QueryPipelines pipelines = {};
};
QueryPipelinePtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr;
}
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
if (limit_max_threads)
last_pipeline->setMaxThreads(max_threads);
stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}
for (auto & context : interpreter_context)
last_pipeline->addInterpreterContext(std::move(context));
return last_pipeline;
}
void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)
{
interpreter_context.emplace_back(std::move(context));
}
}
#pragma once
#include <memory>
#include <list>
#include <vector>
namespace DB
{
class DataStream;
class IQueryPlanStep;
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
class Context;
/// A tree of query steps.
class QueryPlan
{
public:
~QueryPlan();
void unitePlans(QueryPlanStepPtr step, std::vector<QueryPlan> plans);
void addStep(QueryPlanStepPtr step);
bool isInitialized() const { return root != nullptr; } /// Tree is not empty
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
QueryPipelinePtr buildQueryPipeline();
/// Set the upper limit for the recommended number of threads. Will be applied to the newly-created pipelines.
/// TODO: make it in a better way.
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
void addInterpreterContext(std::shared_ptr<Context> context);
private:
struct Node
{
QueryPlanStepPtr step;
std::vector<Node *> children = {};
};
using Nodes = std::list<Node>;
Nodes nodes;
Node * root = nullptr;
void checkInitialized() const;
void checkNotCompleted() const;
size_t max_threads = 0;
std::vector<std::shared_ptr<Context>> interpreter_context;
};
}
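// --- Illustrative end-to-end sketch (not part of this patch) ---
// A minimal plan: one source step, then the plan is turned into a pipeline.
// ReadNothingStep (added later in this patch) yields zero rows of `header`.
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <memory>

namespace DB
{
static QueryPipelinePtr buildTrivialPipelineExample(Block header)
{
    QueryPlan plan;
    plan.addStep(std::make_unique<ReadNothingStep>(std::move(header)));
    plan.setMaxThreads(1);
    return plan.buildQueryPipeline();
}
}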
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_)
: ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_))
, context(std::move(context_))
{
}
void ReadFromPreparedSource::initializePipeline(QueryPipeline & pipeline)
{
pipeline.init(std::move(pipe));
pipeline.addInterpreterContext(std::move(context));
}
}
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/Pipe.h>
namespace DB
{
class ReadFromPreparedSource : public ISourceStep
{
public:
explicit ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_);
String getName() const override { return "ReadNothing"; }
void initializePipeline(QueryPipeline & pipeline) override;
private:
Pipe pipe;
std::shared_ptr<Context> context;
};
}
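// --- Illustrative usage sketch (not part of this patch) ---
// Wrapping an already-built Pipe into a source step; the step keeps the query
// Context alive inside the resulting pipeline. `pipe` and `context` are assumed inputs.
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <memory>

namespace DB
{
static void initFromPreparedPipeExample(QueryPlan & plan, Pipe pipe, std::shared_ptr<Context> context)
{
    plan.addStep(std::make_unique<ReadFromPreparedSource>(std::move(pipe), std::move(context)));
}
}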
#include <Processors/QueryPlan/ReadFromStorageStep.h>
#include <Interpreters/Context.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPipeline.h>
#include <Storages/IStorage.h>
#include <Processors/Transforms/ConvertingTransform.h>
namespace DB
{
ReadFromStorageStep::ReadFromStorageStep(
TableStructureReadLockHolder table_lock_,
SelectQueryOptions options_,
StoragePtr storage_,
const Names & required_columns_,
const SelectQueryInfo & query_info_,
std::shared_ptr<Context> context_,
QueryProcessingStage::Enum processing_stage_,
size_t max_block_size_,
size_t max_streams_)
: table_lock(std::move(table_lock_))
, options(std::move(options_))
, storage(std::move(storage_))
, required_columns(required_columns_)
, query_info(query_info_)
, context(std::move(context_))
, processing_stage(processing_stage_)
, max_block_size(max_block_size_)
, max_streams(max_streams_)
{
/// Note: we read from the storage in the constructor of the step because we don't know the real header before reading.
/// It will be fixed when the storage returns a QueryPlanStep itself.
Pipes pipes = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams);
if (pipes.empty())
{
Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
if (query_info.prewhere_info)
{
if (query_info.prewhere_info->alias_actions)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), query_info.prewhere_info->alias_actions));
pipe.addSimpleTransform(std::make_shared<FilterTransform>(
pipe.getHeader(),
query_info.prewhere_info->prewhere_actions,
query_info.prewhere_info->prewhere_column_name,
query_info.prewhere_info->remove_prewhere_column));
// To remove additional columns.
// In some cases we did not read any marks, so pipeline.streams is empty,
// and some columns from PREWHERE are not removed as expected.
// This leads to a mismatched header in a distributed table.
if (query_info.prewhere_info->remove_columns_actions)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), query_info.prewhere_info->remove_columns_actions));
}
pipes.emplace_back(std::move(pipe));
}
pipeline = std::make_unique<QueryPipeline>();
/// Table lock is stored inside pipeline here.
pipeline->addTableLock(table_lock);
/// Set the limits and quota for reading data, the speed and time of the query.
{
const Settings & settings = context->getSettingsRef();
IBlockInputStream::LocalLimits limits;
limits.mode = IBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
limits.speed_limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers.
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*
* The limits that throttle the maximum execution speed are also checked on all servers.
*/
if (options.to_stage == QueryProcessingStage::Complete)
{
limits.speed_limits.min_execution_rps = settings.min_execution_speed;
limits.speed_limits.min_execution_bps = settings.min_execution_speed_bytes;
}
limits.speed_limits.max_execution_rps = settings.max_execution_speed;
limits.speed_limits.max_execution_bps = settings.max_execution_speed_bytes;
limits.speed_limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;
auto quota = context->getQuota();
for (auto & pipe : pipes)
{
if (!options.ignore_limits)
pipe.setLimits(limits);
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
pipe.setQuota(quota);
}
}
if (pipes.size() == 1)
pipeline->setMaxThreads(1);
for (auto & pipe : pipes)
pipe.enableQuota();
pipeline->init(std::move(pipes));
pipeline->addInterpreterContext(std::move(context));
pipeline->addStorageHolder(std::move(storage));
output_stream = DataStream{.header = pipeline->getHeader()};
}
ReadFromStorageStep::~ReadFromStorageStep() = default;
QueryPipelinePtr ReadFromStorageStep::updatePipeline(QueryPipelines)
{
return std::move(pipeline);
}
}
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Core/QueryProcessingStage.h>
#include <Storages/TableStructureLockHolder.h>
#include <Interpreters/SelectQueryOptions.h>
namespace DB
{
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
struct SelectQueryInfo;
struct PrewhereInfo;
/// Reads from storage.
class ReadFromStorageStep : public IQueryPlanStep
{
public:
ReadFromStorageStep(
TableStructureReadLockHolder table_lock,
SelectQueryOptions options,
StoragePtr storage,
const Names & required_columns,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum processing_stage,
size_t max_block_size,
size_t max_streams);
~ReadFromStorageStep() override;
String getName() const override { return "ReadFromStorage"; }
QueryPipelinePtr updatePipeline(QueryPipelines) override;
private:
TableStructureReadLockHolder table_lock;
SelectQueryOptions options;
StoragePtr storage;
const Names & required_columns;
const SelectQueryInfo & query_info;
std::shared_ptr<Context> context;
QueryProcessingStage::Enum processing_stage;
size_t max_block_size;
size_t max_streams;
QueryPipelinePtr pipeline;
};
}
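// --- Illustrative usage sketch (not part of this patch) ---
// Starting a plan directly from a table. All arguments are assumed to be
// prepared by the interpreter beforehand (lock, options, query info, context);
// the processing stage, block size and stream count below are placeholders.
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromStorageStep.h>
#include <memory>

namespace DB
{
static void initFromStorageExample(
    QueryPlan & plan,
    TableStructureReadLockHolder lock,
    SelectQueryOptions options,
    StoragePtr storage,
    const Names & columns,
    const SelectQueryInfo & query_info,
    std::shared_ptr<Context> context)
{
    plan.addStep(std::make_unique<ReadFromStorageStep>(
        std::move(lock), std::move(options), std::move(storage), columns, query_info,
        std::move(context), QueryProcessingStage::FetchColumns,
        /* max_block_size = */ 65536, /* max_streams = */ 8));
}
}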
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
ReadNothingStep::ReadNothingStep(Block output_header)
: ISourceStep(DataStream{.header = std::move(output_header)})
{
}
void ReadNothingStep::initializePipeline(QueryPipeline & pipeline)
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
}
}
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
namespace DB
{
class ReadNothingStep : public ISourceStep
{
public:
explicit ReadNothingStep(Block output_header);
String getName() const override { return "ReadNothing"; }
void initializePipeline(QueryPipeline & pipeline) override;
};
}
#include <Processors/QueryPlan/RollupStep.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
};
}
RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, params_->getHeader(), getTraits())
, params(std::move(params_))
{
/// Aggregation keys are distinct
for (auto key : params->params.keys)
output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
}
void RollupStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipeline::StreamType::Totals)
return nullptr;
return std::make_shared<RollupTransform>(header, std::move(params));
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class RollupStep : public ITransformingStep
{
public:
RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
String getName() const override { return "Rollup"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
AggregatingTransformParamsPtr params;
};
}
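// --- Illustrative usage sketch (not part of this patch) ---
// WITH ROLLUP: re-aggregates the (single) stream with progressively fewer keys
// to produce subtotal rows. `params` is an assumed AggregatingTransformParamsPtr
// taken from the preceding aggregation step.
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/RollupStep.h>
#include <memory>

namespace DB
{
static void addRollupExample(QueryPlan & plan, AggregatingTransformParamsPtr params)
{
    plan.addStep(std::make_unique<RollupStep>(plan.getCurrentDataStream(), std::move(params)));
}
}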
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
TotalsHavingStep::TotalsHavingStep(
const DataStream & input_stream_,
bool overflow_row_,
const ExpressionActionsPtr & expression_,
const std::string & filter_column_,
TotalsMode totals_mode_,
double auto_include_threshold_,
bool final_)
: ITransformingStep(
input_stream_,
TotalsHavingTransform::transformHeader(input_stream_.header, expression_, final_),
getTraits())
, overflow_row(overflow_row_)
, expression(expression_)
, filter_column_name(filter_column_)
, totals_mode(totals_mode_)
, auto_include_threshold(auto_include_threshold_)
, final(final_)
{
}
void TotalsHavingStep::transformPipeline(QueryPipeline & pipeline)
{
auto totals_having = std::make_shared<TotalsHavingTransform>(
pipeline.getHeader(), overflow_row, expression,
filter_column_name, totals_mode, auto_include_threshold, final);
pipeline.addTotalsHavingTransform(std::move(totals_having));
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
enum class TotalsMode;
class TotalsHavingStep : public ITransformingStep
{
public:
TotalsHavingStep(
const DataStream & input_stream_,
bool overflow_row_,
const ExpressionActionsPtr & expression_,
const std::string & filter_column_,
TotalsMode totals_mode_,
double auto_include_threshold_,
bool final_);
String getName() const override { return "TotalsHaving"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
bool overflow_row;
ExpressionActionsPtr expression;
String filter_column_name;
TotalsMode totals_mode;
double auto_include_threshold;
bool final;
};
}
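// --- Illustrative usage sketch (not part of this patch) ---
// WITH TOTALS + HAVING: filter by the given column while keeping the totals row
// consistent. `having_actions`, `mode` and the filter column name are assumed
// inputs; the threshold is a placeholder used only by the "auto" totals modes.
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <memory>

namespace DB
{
static void addTotalsHavingExample(QueryPlan & plan, ExpressionActionsPtr having_actions, TotalsMode mode)
{
    plan.addStep(std::make_unique<TotalsHavingStep>(
        plan.getCurrentDataStream(),
        /* overflow_row_ = */ false,
        having_actions,
        /* filter_column_ = */ "cond",
        mode,
        /* auto_include_threshold_ = */ 0.5,
        /* final_ = */ true));
}
}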
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sources/NullSource.h>
#include <Interpreters/Context.h>
namespace DB
{
UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_)
: header(std::move(result_header))
, max_threads(max_threads_)
{
input_streams = std::move(input_streams_);
/// TODO: update traits
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)
{
auto pipeline = std::make_unique<QueryPipeline>();
if (pipelines.empty())
{
pipeline->init(Pipe(std::make_shared<NullSource>(output_stream->header)));
return pipeline;
}
size_t num_pipelines = pipelines.size();
pipeline->unitePipelines(std::move(pipelines), output_stream->header);
if (num_pipelines > 1)
{
// Nested queries can force using a single thread (for simplicity),
// but in case of a union this cannot be done.
pipeline->setMaxThreads(std::min<UInt64>(num_pipelines, max_threads));
}
return pipeline;
}
}
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class UnionStep : public IQueryPlanStep
{
public:
/// max_threads is used to limit the number of threads for result pipeline.
UnionStep(DataStreams input_streams_, Block result_header, size_t max_threads_);
String getName() const override { return "Union"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
private:
Block header;
size_t max_threads;
};
}
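// --- Illustrative usage sketch (not part of this patch) ---
// Uniting several sub-plans: collect their current output streams, build the
// UnionStep, and let QueryPlan::unitePlans attach the children and their nodes.
// `result_header` must be structurally compatible with every input (see the
// checks in unitePlans above).
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <memory>
#include <vector>

namespace DB
{
static void unitePlansExample(QueryPlan & result, std::vector<QueryPlan> plans, Block result_header, size_t max_threads)
{
    DataStreams input_streams;
    input_streams.reserve(plans.size());
    for (auto & plan : plans)
        input_streams.push_back(plan.getCurrentDataStream());

    auto union_step = std::make_unique<UnionStep>(std::move(input_streams), std::move(result_header), max_threads);
    result.unitePlans(std::move(union_step), std::move(plans));
}
}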
......@@ -23,13 +23,13 @@ namespace ErrorCodes
CreatingSetsTransform::CreatingSetsTransform(
Block out_header_,
const SubqueriesForSets & subqueries_for_sets_,
const SizeLimits & network_transfer_limits_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_)
: IProcessor({}, {std::move(out_header_)})
, subqueries_for_sets(subqueries_for_sets_)
, subqueries_for_sets(std::move(subqueries_for_sets_))
, cur_subquery(subqueries_for_sets.begin())
, network_transfer_limits(network_transfer_limits_)
, network_transfer_limits(std::move(network_transfer_limits_))
, context(context_)
{
}
......
......@@ -21,8 +21,8 @@ class CreatingSetsTransform : public IProcessor
public:
CreatingSetsTransform(
Block out_header_,
const SubqueriesForSets & subqueries_for_sets_,
const SizeLimits & network_transfer_limits_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_);
String getName() const override { return "CreatingSetsTransform"; }
......
......@@ -5,7 +5,7 @@
namespace DB
{
static Block transformHeader(Block header, const ExpressionActionsPtr & expression)
Block ExpressionTransform::transformHeader(Block header, const ExpressionActionsPtr & expression)
{
expression->execute(header, true);
return header;
......
......@@ -18,6 +18,8 @@ public:
String getName() const override { return "ExpressionTransform"; }
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
protected:
void transform(Chunk & chunk) override;
......
......@@ -27,7 +27,7 @@ static void replaceFilterToConstant(Block & block, const String & filter_column_
}
}
static Block transformHeader(
Block FilterTransform::transformHeader(
Block header,
const ExpressionActionsPtr & expression,
const String & filter_column_name,
......
......@@ -19,6 +19,12 @@ public:
const Block & header_, ExpressionActionsPtr expression_, String filter_column_name_,
bool remove_filter_column_, bool on_totals_ = false);
static Block transformHeader(
Block header,
const ExpressionActionsPtr & expression,
const String & filter_column_name,
bool remove_filter_column);
String getName() const override { return "FilterTransform"; }
Status prepare() override;
......
......@@ -32,7 +32,7 @@ void finalizeChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows);
}
static Block createOutputHeader(Block block, const ExpressionActionsPtr & expression, bool final)
Block TotalsHavingTransform::transformHeader(Block block, const ExpressionActionsPtr & expression, bool final)
{
if (final)
finalizeBlock(block);
......@@ -51,7 +51,7 @@ TotalsHavingTransform::TotalsHavingTransform(
TotalsMode totals_mode_,
double auto_include_threshold_,
bool final_)
: ISimpleTransform(header, createOutputHeader(header, expression_, final_), true)
: ISimpleTransform(header, transformHeader(header, expression_, final_), true)
, overflow_row(overflow_row_)
, expression(expression_)
, filter_column_name(filter_column_)
......
......@@ -37,6 +37,8 @@ public:
Status prepare() override;
void work() override;
static Block transformHeader(Block block, const ExpressionActionsPtr & expression, bool final);
protected:
void transform(Chunk & chunk) override;
......
......@@ -137,6 +137,34 @@ SRCS(
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ConvertingStep.cpp
QueryPlan/CreatingSetsStep.cpp
QueryPlan/CubeStep.cpp
QueryPlan/DistinctStep.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/ExtremesStep.cpp
QueryPlan/FillingStep.cpp
QueryPlan/FilterStep.cpp
QueryPlan/FinishSortingStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/IQueryPlanStep.cpp
QueryPlan/LimitByStep.cpp
QueryPlan/LimitStep.cpp
QueryPlan/MergeSortingStep.cpp
QueryPlan/MergingAggregatedStep.cpp
QueryPlan/MergingSortedStep.cpp
QueryPlan/OffsetsStep.cpp
QueryPlan/PartialSortingStep.cpp
QueryPlan/UnionStep.cpp
QueryPlan/ReadFromPreparedSource.cpp
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp
QueryPlan/RollupStep.cpp
QueryPlan/TotalsHavingStep.cpp
QueryPlan/QueryPlan.cpp
)
END()