提交 bbe0245b 编写于 作者: D Dmitry

changes after review #1

上级 259de4aa
......@@ -540,7 +540,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchSinglePlaceFromInterval(inst->offsets[row_begin], inst->offsets[static_cast<ssize_t>(row_end - 1)], res + inst->state_offset, inst->batch_arguments, arena);
inst->batch_that->addBatchSinglePlaceFromInterval(inst->offsets[row_begin], inst->offsets[row_end - 1], res + inst->state_offset, inst->batch_arguments, arena);
else
inst->batch_that->addBatchSinglePlaceFromInterval(row_begin, row_end, res + inst->state_offset, inst->batch_arguments, arena);
}
......
......@@ -831,7 +831,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
{
if (expressions.has_order_by)
executeOrder(pipeline, query_info.input_sorting_info);
executeOrder(pipeline, query_info.input_order_info);
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
......@@ -1025,7 +1025,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
executeOrder(pipeline, query_info.input_sorting_info);
executeOrder(pipeline, query_info.input_order_info);
}
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
......@@ -1424,25 +1424,21 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.prewhere_info = prewhere_info;
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_sorting_info later, e.g. while reading from StorageMerge.
if (analysis_result.optimize_read_in_order)
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if (analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order)
{
query_info.order_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions,
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
}
if (analysis_result.optimize_aggregation_in_order)
{
query_info.group_by_optimizer = std::make_shared<ReadInOrderOptimizer>(
if (analysis_result.optimize_read_in_order)
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.order_by_elements_actions,
getSortDescription(query, *context),
query_info.syntax_analyzer_result);
else
query_info.order_optimizer = std::make_shared<ReadInOrderOptimizer>(
analysis_result.group_by_elements_actions,
getSortDescriptionFromGroupBy(query, *context),
query_info.syntax_analyzer_result);
query_info.group_by_info = query_info.group_by_optimizer->getInputOrder(storage);
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
}
......@@ -1647,7 +1643,7 @@ void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const Expres
});
}
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr /*group_by_info*/)
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr /*group_by_info*/)
{
pipeline.transform([&](auto & stream)
{
......@@ -1711,7 +1707,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
}
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr group_by_info)
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
pipeline.addSimpleTransform([&](const Block & header)
{
......@@ -1801,7 +1797,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizingInOrderTransform>(header, transform_params);
return std::make_shared<FinalizingSimpleTransform>(header, transform_params);
});
}
......@@ -2075,7 +2071,7 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E
});
}
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoPtr input_sorting_info)
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputOrderInfoPtr input_sorting_info)
{
auto & query = getSelectQuery();
SortDescription output_order_descr = getSortDescription(query, *context);
......@@ -2138,7 +2134,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoP
}
}
void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, InputSortingInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info, UInt64 limit, SortDescription & output_order_descr)
{
const Settings & settings = context->getSettingsRef();
......@@ -2176,7 +2172,7 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPipeline & pipeline, Inp
}
}
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr input_sorting_info)
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr input_sorting_info)
{
auto & query = getSelectQuery();
SortDescription output_order_descr = getSortDescription(query, *context);
......@@ -2649,11 +2645,11 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
{
/// Merge streams to one. Use MergeSorting if data was read in sorted order, Union otherwise.
if (query_info.input_sorting_info)
if (query_info.input_order_info)
{
if (pipeline.stream_with_non_joined_data)
throw Exception("Using read in order optimization, but has stream with non-joined data in pipeline", ErrorCodes::LOGICAL_ERROR);
executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
executeMergeSorted(pipeline, query_info.input_order_info->order_key_prefix_descr, 0);
}
else
executeUnion(pipeline, {});
......@@ -2664,8 +2660,8 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets)
{
if (query_info.input_sorting_info)
executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0);
if (query_info.input_order_info)
executeMergeSorted(pipeline, query_info.input_order_info->order_key_prefix_descr, 0);
const Settings & settings = context->getSettingsRef();
......
......@@ -173,12 +173,12 @@ private:
QueryPipeline & save_context_and_storage);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr group_by_info);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info);
void executeOrder(Pipeline & pipeline, InputOrderInfoPtr sorting_info);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
......@@ -191,13 +191,13 @@ private:
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputSortingInfoPtr group_by_info);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
static void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info);
void executeOrderOptimized(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeOrder(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info);
void executeOrderOptimized(QueryPipeline & pipeline, InputOrderInfoPtr sorting_info, UInt64 limit, SortDescription & output_order_descr);
void executeWithFill(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset);
......
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
{
......@@ -250,65 +248,5 @@ void AggregatingInOrderTransform::generate()
need_generate = false;
}
FinalizingInOrderTransform::FinalizingInOrderTransform(Block header, AggregatingTransformParamsPtr params_)
: IProcessor({std::move(header)}, {params_->getHeader(true)})
{
}
FinalizingInOrderTransform::~FinalizingInOrderTransform() = default;
void FinalizingInOrderTransform::consume(Chunk chunk)
{
finalizeChunk(chunk);
current_chunk = std::move(chunk);
}
void FinalizingInOrderTransform::work()
{
consume(std::move(current_chunk));
}
IProcessor::Status FinalizingInOrderTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.back();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (input.isFinished())
{
output.push(std::move(current_chunk));
output.finish();
return Status::Finished;
}
if (!current_chunk.empty())
{
output.push(std::move(current_chunk));
current_chunk.clear();
return Status::Ready;
}
if (!input.hasData())
{
input.setNeeded();
return Status::NeedData;
}
current_chunk = input.pull(true);
return Status::Ready;
}
}
......@@ -2,7 +2,9 @@
#include <Core/SortDescription.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
namespace DB
{
......@@ -58,24 +60,19 @@ private:
};
class FinalizingInOrderTransform : public IProcessor
class FinalizingSimpleTransform : public ISimpleTransform
{
public:
FinalizingInOrderTransform(Block header, AggregatingTransformParamsPtr params);
FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params)
: ISimpleTransform({std::move(header)}, {params->getHeader(true)}, true) {}
~FinalizingInOrderTransform() override;
void transform(Chunk & chunk) override
{
finalizeChunk(chunk);
}
String getName() const override { return "FinalizingInOrderTransform"; }
String getName() const override { return "FinalizingSimpleTransform"; }
};
/// TODO Simplify prepare
Status prepare() override;
void work() override;
void consume(Chunk chunk);
private:
Chunk current_chunk;
Logger * log = &Logger::get("FinalizingInOrderTransform");
};
}
#include <Processors/ISimpleTransform.h>
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Common/Arena.h>
namespace DB
......
......@@ -625,30 +625,9 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
settings,
reader_settings);
}
else if (settings.optimize_read_in_order && query_info.input_sorting_info)
else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && query_info.input_order_info)
{
size_t prefix_size = query_info.input_sorting_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = SyntaxAnalyzer(context).analyze(order_key_prefix_ast, data.getColumns().getAllPhysical());
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActions(false);
res = spreadMarkRangesAmongStreamsWithOrder(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
max_block_size,
settings.use_uncompressed_cache,
query_info,
sorting_key_prefix_expr,
virt_column_names,
settings,
reader_settings);
}
else if (settings.optimize_aggregation_in_order && query_info.group_by_info)
{
size_t prefix_size = query_info.group_by_info->order_key_prefix_descr.size();
size_t prefix_size = query_info.input_order_info->order_key_prefix_descr.size();
auto order_key_prefix_ast = data.sorting_key_expr_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
......@@ -855,8 +834,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
const MergeTreeReaderSettings & reader_settings) const
{
size_t sum_marks = 0;
const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
const InputSortingInfoPtr & group_by_info = query_info.group_by_info;
const InputOrderInfoPtr & input_order_info = query_info.input_order_info;
size_t adaptive_parts = 0;
std::vector<size_t> sum_marks_in_parts(parts.size());
......@@ -1000,13 +978,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
}
parts.emplace_back(part);
}
/// TODO Better code
if (group_by_info)
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, group_by_info->direction);
else
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_sorting_info->direction);
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
if (group_by_info || input_sorting_info->direction == 1)
if (input_order_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
......@@ -1029,17 +1003,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
if (pipes.size() > 1)
{
SortDescription sort_description;
/// TODO Better code
if (group_by_info)
{
for (size_t j = 0; j < group_by_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j], group_by_info->direction, 1);
}
else
{
for (size_t j = 0; j < input_sorting_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j], input_sorting_info->direction, 1);
}
for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j)
sort_description.emplace_back(data.sorting_key_columns[j], input_order_info->direction, 1);
/// Project input columns to drop columns from sorting_key_prefix_expr
/// to allow execute the same expression later.
......
......@@ -30,7 +30,7 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
forbidden_columns.insert(elem.first);
}
InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
{
Names sorting_key_columns;
if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get()))
......@@ -122,7 +122,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
if (order_key_prefix_descr.empty())
return {};
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
return std::make_shared<InputOrderInfo>(std::move(order_key_prefix_descr), read_direction);
}
}
......@@ -20,7 +20,7 @@ public:
const SortDescription & required_sort_description,
const SyntaxAnalyzerResultPtr & syntax_result);
InputSortingInfoPtr getInputOrder(const StoragePtr & storage) const;
InputOrderInfoPtr getInputOrder(const StoragePtr & storage) const;
private:
/// Actions for every element of order expression to analyze functions for monotonicity
......
......@@ -36,25 +36,25 @@ struct FilterInfo
bool do_remove_column = false;
};
struct InputSortingInfo
struct InputOrderInfo
{
SortDescription order_key_prefix_descr;
int direction;
InputSortingInfo(const SortDescription & order_key_prefix_descr_, int direction_)
InputOrderInfo(const SortDescription & order_key_prefix_descr_, int direction_)
: order_key_prefix_descr(order_key_prefix_descr_), direction(direction_) {}
bool operator ==(const InputSortingInfo & other) const
bool operator ==(const InputOrderInfo & other) const
{
return order_key_prefix_descr == other.order_key_prefix_descr && direction == other.direction;
}
bool operator !=(const InputSortingInfo & other) const { return !(*this == other); }
bool operator !=(const InputOrderInfo & other) const { return !(*this == other); }
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>;
using InputOrderInfoPtr = std::shared_ptr<const InputOrderInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
......@@ -75,12 +75,9 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info;
ReadInOrderOptimizerPtr order_by_optimizer;
ReadInOrderOptimizerPtr group_by_optimizer;
ReadInOrderOptimizerPtr order_optimizer;
/// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info;
InputSortingInfoPtr group_by_info;
mutable InputOrderInfoPtr input_order_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
......
......@@ -171,8 +171,8 @@ Pipes StorageBuffer::read(
if (dst_has_same_structure)
{
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination);
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
pipes_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
......
......@@ -180,8 +180,8 @@ Pipes StorageMaterializedView::read(
auto lock = storage->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
if (query_info.order_by_optimizer)
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage);
Pipes pipes = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
......
......@@ -177,12 +177,12 @@ Pipes StorageMerge::read(
num_streams *= num_streams_multiplier;
size_t remaining_streams = num_streams;
InputSortingInfoPtr input_sorting_info;
if (query_info.order_by_optimizer)
InputOrderInfoPtr input_sorting_info;
if (query_info.order_optimizer)
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto current_info = query_info.order_by_optimizer->getInputOrder(std::get<0>(*it));
auto current_info = query_info.order_optimizer->getInputOrder(std::get<0>(*it));
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
......@@ -192,7 +192,7 @@ Pipes StorageMerge::read(
break;
}
query_info.input_sorting_info = input_sorting_info;
query_info.input_order_info = input_sorting_info;
}
for (const auto & table : selected_tables)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册