提交 3d706ec2 编写于 作者: C CurtizJ

support 'order by' optimiation with simple monotonic functions

上级 e559c8f9
......@@ -52,6 +52,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Functions/IFunction.h>
#include <Core/Field.h>
#include <Core/Types.h>
#include <Columns/Collator.h>
......@@ -534,13 +535,19 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
}
}
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer->appendSelect(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.before_order_and_select = chain.getLastActions();
res.before_select = chain.getLastActions();
chain.addStep();
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
if (query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage)))
{
res.has_order_by = true;
res.before_order = chain.getLastActions();
chain.addStep();
}
if (query_analyzer->appendLimitBy(chain, dry_run || !res.second_stage))
{
res.has_limit_by = true;
......@@ -646,6 +653,86 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
return 0;
}
static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, const ASTSelectQuery & query,
const Context & context, const NamesAndTypesList & required_columns)
{
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query);
SortDescription prefix_order_descr;
int read_direction = order_descr.at(0).direction;
const auto & sorting_key_columns = merge_tree.getSortingKeyColumns();
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
auto order_by_expr = query.orderBy();
auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, required_columns);
for (size_t i = 0; i < prefix_size; ++i)
{
/// Read in pk order in case of exact match with order key element
/// or in some simple cases when order key element is wrapped into monotonic function.
int current_direction = order_descr[i].direction;
if (order_descr[i].column_name == sorting_key_columns[i] && current_direction == read_direction)
prefix_order_descr.push_back(order_descr[i]);
else
{
const auto & ast = query.orderBy()->children[0];
auto actions = ExpressionAnalyzer(ast->children.at(0), syntax_result, context).getActions(false);
const auto & input_columns = actions->getRequiredColumnsWithTypes();
if (input_columns.size() != 1 || input_columns.front().name != sorting_key_columns[i])
break;
bool first = true;
for (const auto & action : actions->getActions())
{
if (action.type != ExpressionAction::APPLY_FUNCTION)
continue;
if (!first)
{
current_direction = 0;
break;
}
else
first = false;
const auto & func = *action.function_base;
if (!func.hasInformationAboutMonotonicity())
{
current_direction = 0;
break;
}
auto monotonicity = func.getMonotonicityForRange(*input_columns.front().type, {}, {});
if (!monotonicity.is_monotonic)
{
current_direction = 0;
break;
}
else if (!monotonicity.is_positive)
current_direction *= -1;
}
if (!current_direction || (i > 0 && current_direction != read_direction))
break;
if (i == 0)
read_direction = current_direction;
prefix_order_descr.push_back(order_descr[i]);
}
}
if (prefix_order_descr.empty())
return {};
return std::make_shared<SortingInfo>(std::move(prefix_order_descr), read_direction);
}
template <typename TPipeline>
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run)
{
......@@ -704,38 +791,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
}
SortingInfoPtr sorting_info;
if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
{
auto optimize_pk_order = [&](const auto & merge_tree) -> SortingInfoPtr
{
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query);
const auto & sorting_key_columns = merge_tree.getSortingKeyColumns();
SortDescription prefix_order_descr;
int direction = order_descr.at(0).direction;
size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i)
{
if (order_descr[i].column_name != sorting_key_columns[i]
|| order_descr[i].direction != direction)
break;
prefix_order_descr.push_back(order_descr[i]);
}
if (prefix_order_descr.empty())
return {};
return std::make_shared<SortingInfo>(std::move(prefix_order_descr), direction);
};
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
sorting_info = optimize_pk_order(*merge_tree_data);
}
if (dry_run)
{
if constexpr (pipeline_with_processors)
......@@ -748,6 +803,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
{
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
sorting_info = optimizeSortingWithPK(*merge_tree_data, query,context, expressions.before_order->getSampleBlock().getNamesAndTypesList());
if (sorting_info)
sorting_info->setActions(expressions.before_order);
}
if (expressions.prewhere_info)
{
if constexpr (pipeline_with_processors)
......@@ -784,6 +847,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
{
if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
sorting_info = optimizeSortingWithPK(*merge_tree_data, query, context, expressions.before_order->getSampleBlock().getNamesAndTypesList());
if (sorting_info)
sorting_info->setActions(expressions.before_order);
}
/** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
......@@ -891,7 +962,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
else
{
executeExpression(pipeline, expressions.before_order_and_select);
executeExpression(pipeline, expressions.before_select);
executeDistinct(pipeline, true, expressions.selected_columns);
}
......@@ -903,7 +974,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
{
if (expressions.has_order_by)
{
if (!query_info.sorting_info) // Otherwise we have executed expressions while reading
executeExpression(pipeline, expressions.before_order);
executeOrder(pipeline, query_info.sorting_info);
}
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
......@@ -957,7 +1032,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
else if (expressions.has_having)
executeHaving(pipeline, expressions.before_having);
executeExpression(pipeline, expressions.before_order_and_select);
executeExpression(pipeline, expressions.before_select);
executeDistinct(pipeline, true, expressions.selected_columns);
need_second_distinct_pass = query.distinct && pipeline.hasMixedStreams();
......@@ -994,6 +1069,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
* but there is no aggregation, then on the remote servers ORDER BY was made
* - therefore, we merge the sorted streams from remote servers.
*/
if (!query_info.sorting_info) // Otherwise we have executed them while reading
executeExpression(pipeline, expressions.before_order);
if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
executeMergeSorted(pipeline);
else /// Otherwise, just sort.
......
......@@ -152,7 +152,8 @@ private:
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr before_order;
ExpressionActionsPtr before_select;
ExpressionActionsPtr before_limit_by;
ExpressionActionsPtr final_projection;
......
......@@ -973,6 +973,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrd
streams_per_thread.push_back(source_stream);
}
if (sorting_info->actions && !sorting_info->actions->getActions().empty())
for (auto & stream : streams_per_thread)
stream = std::make_shared<ExpressionBlockInputStream>(stream, sorting_info->actions);
if (streams_per_thread.size() > 1)
streams.push_back(std::make_shared<MergingSortedBlockInputStream>(
streams_per_thread, sorting_info->prefix_order_descr, max_block_size));
......
......@@ -38,9 +38,12 @@ struct SortingInfo
{
SortDescription prefix_order_descr;
int direction;
ExpressionActionsPtr actions;
SortingInfo(const SortDescription prefix_order_descr_, int direction_)
SortingInfo(const SortDescription & prefix_order_descr_, int direction_)
: prefix_order_descr(prefix_order_descr_), direction(direction_) {}
void setActions(const ExpressionActionsPtr & actions_) { actions = actions_; }
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册