提交 76e9a8ed 编写于 作者: D Dmitry

At least something...

上级 45497b4e
......@@ -361,6 +361,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.", 0) \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.", 0) \
M(SettingBool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.", 0) \
M(SettingBool, optimize_aggregation_in_order, true, "Enable GROUP BY optimization for aggregating data in corresponding order in MergeTree tables.", 0) \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.", 0) \
M(SettingBool, cancel_http_readonly_queries_on_client_close, false, "Cancel HTTP readonly queries when a client closes the connection without waiting for response.", 0) \
M(SettingBool, external_table_functions_use_nulls, true, "If it is set to true, external table functions will implicitly use Nullable type if needed. Otherwise NULLs will be substituted with default values. Currently supported only by 'mysql' and 'odbc' table functions.", 0) \
......
......@@ -791,13 +791,18 @@ InterpreterSelectQuery::analyzeExpressions(
}
}
bool has_stream_with_non_joned_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
bool has_stream_with_non_joined_rows = (res.before_join && res.before_join->getTableJoinAlgo()->hasStreamWithNonJoinedRows());
res.optimize_read_in_order =
context.getSettingsRef().optimize_read_in_order
&& storage && query.orderBy()
&& !query_analyzer.hasAggregation()
&& !query.final()
&& !has_stream_with_non_joned_rows;
&& !has_stream_with_non_joined_rows;
/// TODO correct conditions
res.optimize_aggregation_in_order =
context.getSettingsRef().optimize_aggregation_in_order
&& storage && query.groupBy();
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
......@@ -929,6 +934,19 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Co
return order_descr;
}
static Names getGroupByDescription(const ASTSelectQuery & query, const Context & /*context*/)
{
Names group_by_descr;
group_by_descr.reserve(query.groupBy()->children.size());
for (const auto & elem : query.groupBy()->children)
{
String name = elem->getColumnName();
group_by_descr.push_back(name);
}
return group_by_descr;
}
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
......@@ -1165,7 +1183,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeWhere(pipeline, expressions.before_where, expressions.remove_where_filter);
if (expressions.need_aggregate)
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.group_by_info);
else
{
executeExpression(pipeline, expressions.before_order_and_select);
......@@ -1648,6 +1666,15 @@ void InterpreterSelectQuery::executeFetchColumns(
query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage);
}
if (analysis_result.optimize_aggregation_in_order)
{
query_info.group_by_optimizer = std::make_shared<AggregateInOrderOptimizer>(
getGroupByDescription(query, *context),
query_info.syntax_analyzer_result);
query_info.group_by_info = query_info.group_by_optimizer->getGroupByCommonPrefix(storage);
}
BlockInputStreams streams;
Pipes pipes;
......@@ -1861,7 +1888,7 @@ void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const Expres
});
}
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info)
{
pipeline.transform([&](auto & stream)
{
......@@ -1883,6 +1910,15 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
const Settings & settings = context->getSettingsRef();
if (group_by_info) {
/// TODO optimization :)
// for (const auto & elem : group_by_info->order_key_prefix_descr) {
// std::cerr << elem << " ";
// }
// std::cerr << "\n";
}
/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
......@@ -1927,7 +1963,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
}
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr /*group_by_info*/)
{
pipeline.addSimpleTransform([&](const Block & header)
{
......
......@@ -163,6 +163,7 @@ private:
bool remove_where_filter = false;
bool optimize_read_in_order = false;
bool optimize_aggregation_in_order = false;
ExpressionActionsPtr before_join; /// including JOIN
ExpressionActionsPtr before_where;
......@@ -217,7 +218,7 @@ private:
QueryPipeline & save_context_and_storage);
void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info);
void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
......@@ -236,7 +237,7 @@ private:
void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit);
void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_fiter);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final, GroupByInfoPtr group_by_info);
void executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final);
void executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
......
......@@ -31,7 +31,7 @@ ReadInOrderOptimizer::ReadInOrderOptimizer(
InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & storage) const
{
const MergeTreeData * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
if (!merge_tree || !merge_tree->hasSortingKey())
return {};
......@@ -110,4 +110,45 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora
return std::make_shared<InputSortingInfo>(std::move(order_key_prefix_descr), read_direction);
}
AggregateInOrderOptimizer::AggregateInOrderOptimizer(
const Names & group_by_description_,
const SyntaxAnalyzerResultPtr & syntax_result)
: group_by_description(group_by_description_)
{
/// Not sure yet but let it be
for (const auto & elem : syntax_result->array_join_result_to_source)
forbidden_columns.insert(elem.first);
}
GroupByInfoPtr AggregateInOrderOptimizer::getGroupByCommonPrefix(const StoragePtr &storage) const
{
const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage.get());
if (!merge_tree || !merge_tree->hasSortingKey())
return {};
Names group_by_common_prefix;
const auto & sorting_key_columns = merge_tree->getSortingKeyColumns();
size_t prefix_size = std::min(group_by_description.size(), sorting_key_columns.size());
for (size_t i = 0; i < prefix_size; ++i)
{
if (forbidden_columns.count(group_by_description[i]))
break;
if (group_by_description[i] == sorting_key_columns[i]) {
group_by_common_prefix.push_back(group_by_description[i]);
}
else {
/// TODO injective functions
break;
}
}
if (group_by_common_prefix.empty())
return {};
return std::make_shared<GroupByInfo>(std::move(group_by_common_prefix));
}
}
......@@ -23,10 +23,31 @@ public:
InputSortingInfoPtr getInputOrder(const StoragePtr & storage) const;
private:
/// Actions for every element of order expression to analyze functions for monotonicicy
/// Actions for every element of order expression to analyze functions for monotonicity
ManyExpressionActions elements_actions;
NameSet forbidden_columns;
SortDescription required_sort_description;
};
}
/** Helper class, that can analyze MergeTree order key
* and required group by description to get their
* common prefix, which is needed for
* performing reading in order of PK.
*/
class AggregateInOrderOptimizer
{
public:
AggregateInOrderOptimizer(
const Names & group_by_description,
const SyntaxAnalyzerResultPtr & syntax_result);
GroupByInfoPtr getGroupByCommonPrefix(const StoragePtr & storage) const;
private:
/// Actions for every element of order expression to analyze functions for monotonicity
NameSet forbidden_columns;
Names group_by_description;
};
}
\ No newline at end of file
......@@ -2,6 +2,7 @@
#include <Interpreters/PreparedSets.h>
#include <Core/SortDescription.h>
#include <Core/Names.h>
#include <memory>
namespace DB
......@@ -51,9 +52,18 @@ struct InputSortingInfo
bool operator !=(const InputSortingInfo & other) const { return !(*this == other); }
};
struct GroupByInfo
{
Names order_key_prefix_descr;
GroupByInfo(const Names & order_key_prefix_descr_)
: order_key_prefix_descr(order_key_prefix_descr_) {}
};
using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;
using FilterInfoPtr = std::shared_ptr<FilterInfo>;
using InputSortingInfoPtr = std::shared_ptr<const InputSortingInfo>;
using GroupByInfoPtr = std::shared_ptr<GroupByInfo>;
struct SyntaxAnalyzerResult;
using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
......@@ -61,6 +71,9 @@ using SyntaxAnalyzerResultPtr = std::shared_ptr<const SyntaxAnalyzerResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class AggregateInOrderOptimizer;
using AggregateInOrderOptimizerPtr = std::shared_ptr<const AggregateInOrderOptimizer>;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
......@@ -74,9 +87,13 @@ struct SelectQueryInfo
PrewhereInfoPtr prewhere_info;
ReadInOrderOptimizerPtr order_by_optimizer;
AggregateInOrderOptimizerPtr group_by_optimizer;
/// We can modify it while reading from storage
mutable InputSortingInfoPtr input_sorting_info;
GroupByInfoPtr group_by_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册