未验证 提交 3f607b21 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #6375 from yandex/try-to-remove-dry-run

Try to remove dry run
......@@ -164,7 +164,18 @@ static Block adaptBlockStructure(const Block & block, const Block & header, cons
res.info = block.info;
for (const auto & elem : header)
res.insert({ castColumn(block.getByName(elem.name), elem.type, context), elem.type, elem.name });
{
ColumnPtr column;
if (elem.column && isColumnConst(*elem.column))
/// TODO: check that column from block contains the same value.
/// TODO: serialize const columns.
column = elem.column->cloneResized(block.rows());
else
column = castColumn(block.getByName(elem.name), elem.type, context);
res.insert({column, elem.type, elem.name});
}
return res;
}
......
......@@ -159,6 +159,13 @@ public:
*/
virtual bool isSuitableForConstantFolding() const { return true; }
/** Some functions like ignore(...) or toTypeName(...) always return constant result which doesn't depend on arguments.
* In this case we can calculate result and assume that it's constant in stream header.
* There is no need to implement function if it has zero arguments.
* Must return ColumnConst with single row or nullptr.
*/
virtual ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block & /*block*/, const ColumnNumbers & /*arguments*/) const { return nullptr; }
/** Function is called "injective" if it returns different result for different values of arguments.
* Example: hex, negate, tuple...
*
......@@ -456,6 +463,10 @@ public:
}
bool isSuitableForConstantFolding() const override { return function->isSuitableForConstantFolding(); }
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block & block, const ColumnNumbers & arguments_) const override
{
return function->getResultIfAlwaysReturnsConstantAndHasArguments(block, arguments_);
}
bool isInjective(const Block & sample_block) override { return function->isInjective(sample_block); }
......
......@@ -72,7 +72,7 @@ public:
offsets.push_back(offset);
}
block.getByPosition(result).column = ColumnArray::create(col_value->replicate(offsets), std::move(offsets_col));
block.getByPosition(result).column = ColumnArray::create(col_value->replicate(offsets)->convertToFullColumnIfConst(), std::move(offsets_col));
}
};
......
......@@ -37,6 +37,12 @@ public:
const IDataType & type = *block.getByPosition(arguments[0]).type;
block.getByPosition(result).column = type.createColumnConst(input_rows_count, type.getDefault());
}
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block & block, const ColumnNumbers & arguments) const override
{
const IDataType & type = *block.getByPosition(arguments[0]).type;
return type.createColumnConst(1, type.getDefault());
}
};
......
......@@ -49,11 +49,16 @@ public:
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
block.getByPosition(result).column = getResultIfAlwaysReturnsConstantAndHasArguments(block, arguments)->cloneResized(input_rows_count);
}
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block & block, const ColumnNumbers & arguments) const override
{
if (auto type8 = checkAndGetDataType<DataTypeEnum8>(block.getByPosition(arguments[0]).type.get()))
block.getByPosition(result).column = DataTypeUInt8().createColumnConst(input_rows_count, type8->getValues().size());
return DataTypeUInt8().createColumnConst(1, type8->getValues().size());
else if (auto type16 = checkAndGetDataType<DataTypeEnum16>(block.getByPosition(arguments[0]).type.get()))
block.getByPosition(result).column = DataTypeUInt16().createColumnConst(input_rows_count, type16->getValues().size());
return DataTypeUInt16().createColumnConst(1, type16->getValues().size());
else
throw Exception("The argument for function " + getName() + " must be Enum", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
......
......@@ -42,6 +42,11 @@ public:
{
block.getByPosition(result).column = DataTypeUInt8().createColumnConst(input_rows_count, 0u);
}
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block &, const ColumnNumbers &) const override
{
return DataTypeUInt8().createColumnConst(1, 0u);
}
};
......
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
namespace DB
......@@ -38,7 +39,11 @@ namespace DB
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
{
block.getByPosition(result).column = DataTypeUInt8().createColumnConst(input_rows_count, UInt64(0));
/// This function is mainly used in query analysis instead of "in" functions
/// in the case when only header is needed and set for in is not calculated.
/// Because of that function must return the same column type as "in" function, which is ColumnUInt8.
auto res = ColumnUInt8::create(input_rows_count, 0);
block.getByPosition(result).column = std::move(res);
}
};
......
......@@ -75,6 +75,8 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
{
/// NOTE: after updating this code, check that FunctionIgnoreExceptNull returns the same type of column.
/// Second argument must be ColumnSet.
ColumnPtr column_set_ptr = block.getByPosition(arguments[1]).column;
const ColumnSet * column_set = typeid_cast<const ColumnSet *>(&*column_set_ptr);
......
......@@ -38,6 +38,11 @@ public:
block.getByPosition(result).column
= DataTypeString().createColumnConst(input_rows_count, block.getByPosition(arguments[0]).column->getName());
}
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block & block, const ColumnNumbers & arguments) const override
{
return DataTypeString().createColumnConst(1, block.getByPosition(arguments[0]).type->createColumn()->getName());
}
};
......
......@@ -9,45 +9,83 @@ namespace DB
/** toTypeName(x) - get the type name
* Returns name of IDataType instance (name of data type).
*/
class FunctionToTypeName : public IFunction
class PreparedFunctionToTypeName : public PreparedFunctionImpl
{
public:
static constexpr auto name = "toTypeName";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionToTypeName>();
}
String getName() const override { return name; }
protected:
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
String getName() const override
/// Execute the function on the block.
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{
return name;
block.getByPosition(result).column
= DataTypeString().createColumnConst(input_rows_count, block.getByPosition(arguments[0]).type->getName());
}
};
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
size_t getNumberOfArguments() const override
class BaseFunctionToTypeName : public IFunctionBase
{
public:
BaseFunctionToTypeName(DataTypes argument_types_, DataTypePtr return_type_)
: argument_types(std::move(argument_types_)), return_type(std::move(return_type_)) {}
static constexpr auto name = "toTypeName";
String getName() const override { return name; }
const DataTypes & getArgumentTypes() const override { return argument_types; }
const DataTypePtr & getReturnType() const override { return return_type; }
PreparedFunctionPtr prepare(const Block &, const ColumnNumbers &, size_t) const override
{
return 1;
return std::make_shared<PreparedFunctionToTypeName>();
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
ColumnPtr getResultIfAlwaysReturnsConstantAndHasArguments(const Block &, const ColumnNumbers &) const override
{
return std::make_shared<DataTypeString>();
return DataTypeString().createColumnConst(1, argument_types.at(0)->getName());
}
/// Execute the function on the block.
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
private:
DataTypes argument_types;
DataTypePtr return_type;
};
class FunctionToTypeNameBuilder : public FunctionBuilderImpl
{
public:
static constexpr auto name = "toTypeName";
String getName() const override { return name; }
static FunctionBuilderPtr create(const Context &) { return std::make_shared<FunctionToTypeNameBuilder>(); }
size_t getNumberOfArguments() const override { return 1; }
protected:
DataTypePtr getReturnTypeImpl(const DataTypes &) const override { return std::make_shared<DataTypeString>(); }
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
{
block.getByPosition(result).column
= DataTypeString().createColumnConst(input_rows_count, block.getByPosition(arguments[0]).type->getName());
DataTypes types;
types.reserve(arguments.size());
for (auto & elem : arguments)
types.emplace_back(elem.type);
return std::make_shared<BaseFunctionToTypeName>(types, return_type);
}
bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
};
void registerFunctionToTypeName(FunctionFactory & factory)
{
factory.registerFunction<FunctionToTypeName>();
factory.registerFunction<FunctionToTypeNameBuilder>();
}
}
......@@ -68,7 +68,13 @@ BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context &
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
return std::make_shared<MaterializingBlockInputStream>(stream);
/* Now we don't need to materialize constants, because RemoteBlockInputStream will ignore constant and take it from header.
* So, streams from different threads will always have the same header.
*/
/// return std::make_shared<MaterializingBlockInputStream>(stream);
return stream;
}
}
......
......@@ -12,6 +12,7 @@
#include <Functions/IFunction.h>
#include <set>
#include <optional>
#include <DataTypes/DataTypeNullable.h>
namespace ProfileEvents
......@@ -159,20 +160,24 @@ ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_column
}
ExpressionAction ExpressionAction::ordinaryJoin(
const ASTTableJoin & join_params,
std::shared_ptr<const Join> join_,
const Names & join_key_names_left,
const Names & join_key_names_right,
const NamesAndTypesList & columns_added_by_join_)
{
ExpressionAction a;
a.type = JOIN;
a.join = std::move(join_);
a.join_kind = join_params.kind;
a.join_key_names_left = join_key_names_left;
a.join_key_names_right = join_key_names_right;
a.columns_added_by_join = columns_added_by_join_;
return a;
}
void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
void ExpressionAction::prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding)
{
// std::cerr << "preparing: " << toString() << std::endl;
......@@ -187,6 +192,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
bool all_const = true;
bool all_suitable_for_constant_folding = true;
ColumnNumbers arguments(argument_names.size());
for (size_t i = 0; i < argument_names.size(); ++i)
......@@ -195,6 +201,9 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
ColumnPtr col = sample_block.safeGetByPosition(arguments[i]).column;
if (!col || !isColumnConst(*col))
all_const = false;
if (names_not_for_constant_folding.count(argument_names[i]))
all_suitable_for_constant_folding = false;
}
size_t result_position = sample_block.columns();
......@@ -229,6 +238,22 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
if (col.column->empty())
col.column = col.column->cloneResized(1);
if (!all_suitable_for_constant_folding)
names_not_for_constant_folding.insert(result_name);
}
}
/// Some functions like ignore() or getTypeName() always return constant result even if arguments are not constant.
/// We can't do constant folding, but can specify in sample block that function result is constant to avoid
/// unnecessary materialization.
auto & res = sample_block.getByPosition(result_position);
if (!res.column && function_base->isSuitableForConstantFolding())
{
if (auto col = function_base->getResultIfAlwaysReturnsConstantAndHasArguments(sample_block, arguments))
{
res.column = std::move(col);
names_not_for_constant_folding.insert(result_name);
}
}
......@@ -252,10 +277,50 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
case JOIN:
{
/// TODO join_use_nulls setting
bool is_null_used_as_default = settings.join_use_nulls;
bool right_or_full_join = join_kind == ASTTableJoin::Kind::Right || join_kind == ASTTableJoin::Kind::Full;
bool left_or_full_join = join_kind == ASTTableJoin::Kind::Left || join_kind == ASTTableJoin::Kind::Full;
for (auto & col : sample_block)
{
/// Materialize column.
/// Column is not empty if it is constant, but after Join all constants will be materialized.
/// So, we need remove constants from header.
if (col.column)
col.column = nullptr;
bool make_nullable = is_null_used_as_default && right_or_full_join;
if (make_nullable && !col.type->isNullable())
col.type = std::make_shared<DataTypeNullable>(col.type);
}
for (const auto & col : columns_added_by_join)
sample_block.insert(ColumnWithTypeAndName(nullptr, col.type, col.name));
{
auto res_type = col.type;
bool make_nullable = is_null_used_as_default && left_or_full_join;
if (!make_nullable)
{
/// Keys from right table are usually not stored in Join, but copied from the left one.
/// So, if left key is nullable, let's make right key nullable too.
/// Note: for some join types it's not needed and, probably, may be removed.
/// Note: changing this code, take into account the implementation in Join.cpp.
auto it = std::find(join_key_names_right.begin(), join_key_names_right.end(), col.name);
if (it != join_key_names_right.end())
{
auto pos = it - join_key_names_right.begin();
const auto & left_key_name = join_key_names_left[pos];
make_nullable = sample_block.getByName(left_key_name).type->isNullable();
}
}
if (make_nullable && !res_type->isNullable())
res_type = std::make_shared<DataTypeNullable>(res_type);
sample_block.insert(ColumnWithTypeAndName(nullptr, res_type, col.name));
}
break;
}
......@@ -683,7 +748,7 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
for (const auto & name_with_alias : action.projection)
new_names.emplace_back(name_with_alias.second);
action.prepare(sample_block, settings);
action.prepare(sample_block, settings, names_not_for_constant_folding);
actions.push_back(action);
}
......@@ -915,7 +980,7 @@ void ExpressionActions::finalize(const Names & output_columns)
if (action.type == ExpressionAction::APPLY_FUNCTION && sample_block.has(out))
{
auto & result = sample_block.getByName(out);
if (result.column)
if (result.column && names_not_for_constant_folding.count(result.name) == 0)
{
action.type = ExpressionAction::ADD_COLUMN;
action.result_type = result.type;
......@@ -1262,6 +1327,7 @@ bool ExpressionAction::operator==(const ExpressionAction & other) const
&& array_join_is_left == other.array_join_is_left
&& join == other.join
&& join_key_names_left == other.join_key_names_left
&& join_key_names_right == other.join_key_names_right
&& columns_added_by_join == other.columns_added_by_join
&& projection == other.projection
&& is_function_compiled == other.is_function_compiled;
......
......@@ -10,6 +10,7 @@
#include "config_core.h"
#include <unordered_map>
#include <unordered_set>
#include <Parsers/ASTTablesInSelectQuery.h>
namespace DB
......@@ -104,7 +105,9 @@ public:
/// For JOIN
std::shared_ptr<const Join> join;
ASTTableJoin::Kind join_kind;
Names join_key_names_left;
Names join_key_names_right;
NamesAndTypesList columns_added_by_join;
/// For PROJECT.
......@@ -121,7 +124,8 @@ public:
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context);
static ExpressionAction ordinaryJoin(std::shared_ptr<const Join> join_, const Names & join_key_names_left,
static ExpressionAction ordinaryJoin(const ASTTableJoin & join_params, std::shared_ptr<const Join> join_,
const Names & join_key_names_left, const Names & join_key_names_right,
const NamesAndTypesList & columns_added_by_join_);
/// Which columns necessary to perform this action.
......@@ -139,7 +143,7 @@ public:
private:
friend class ExpressionActions;
void prepare(Block & sample_block, const Settings & settings);
void prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding);
void execute(Block & block, bool dry_run) const;
void executeOnTotals(Block & block) const;
};
......@@ -263,6 +267,8 @@ private:
Actions actions;
/// The example of result (output) block.
Block sample_block;
/// Columns which can't be used for constant folding.
NameSet names_not_for_constant_folding;
Settings settings;
#if USE_EMBEDDED_COMPILER
......
......@@ -140,7 +140,7 @@ void ExpressionAnalyzer::analyzeAggregation()
for (const auto & key_ast : analyzedJoin().key_asts_left)
getRootActions(key_ast, true, temp_actions);
addJoinAction(temp_actions);
addJoinAction(table_join, temp_actions);
}
}
......@@ -424,9 +424,9 @@ static void appendRequiredColumns(
}
/// It's possible to set nullptr as join for only_types mode
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr join) const
void ExpressionAnalyzer::addJoinAction(const ASTTableJoin & join_params, ExpressionActionsPtr & actions, JoinPtr join) const
{
actions->add(ExpressionAction::ordinaryJoin(join, analyzedJoin().key_names_left, columnsAddedByJoin()));
actions->add(ExpressionAction::ordinaryJoin(join_params, std::move(join), analyzedJoin().key_names_left, analyzedJoin().key_names_right, columnsAddedByJoin()));
}
bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
......@@ -443,8 +443,10 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, b
initChain(chain, sourceColumns());
ExpressionActionsChain::Step & step = chain.steps.back();
auto & join_params = ast_join->table_join->as<ASTTableJoin &>();
getRootActions(left_keys_list, only_types, step.actions);
addJoinAction(step.actions, subquery_for_set.join);
addJoinAction(join_params, step.actions, subquery_for_set.join);
return true;
}
......
......@@ -131,7 +131,7 @@ protected:
void addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const;
void addJoinAction(ExpressionActionsPtr & actions, JoinPtr join = {}) const;
void addJoinAction(const ASTTableJoin & join_params, ExpressionActionsPtr & actions, JoinPtr join = {}) const;
void getRootActions(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts = false);
......
......@@ -82,6 +82,8 @@
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/LimitTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataStreams/materializeBlock.h>
namespace DB
......@@ -271,7 +273,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
String database_name;
String table_name;
getDatabaseAndTableNames(database_name, table_name);
getDatabaseAndTableNames(query, database_name, table_name, context);
if (auto view_source = context.getViewSource())
{
......@@ -345,17 +347,20 @@ InterpreterSelectQuery::InterpreterSelectQuery(
source_header = storage->getSampleBlockForColumns(required_columns);
/// Calculate structure of the result.
result_header = getSampleBlockImpl();
for (auto & col : result_header)
{
Pipeline pipeline;
executeImpl(pipeline, nullptr, true);
result_header = pipeline.firstStream()->getHeader();
if (!col.column)
col.column = col.type->createColumn();
else if (isColumnConst(*col.column) && !col.column->empty())
col.column = col.column->cloneEmpty();
}
}
void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name)
void InterpreterSelectQuery::getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context)
{
if (auto db_and_table = getDatabaseAndTable(getSelectQuery(), 0))
if (auto db_and_table = getDatabaseAndTable(query, 0))
{
table_name = db_and_table->table;
database_name = db_and_table->database;
......@@ -381,8 +386,8 @@ Block InterpreterSelectQuery::getSampleBlock()
BlockIO InterpreterSelectQuery::execute()
{
Pipeline pipeline;
executeImpl(pipeline, input, options.only_analyze);
executeUnion(pipeline);
executeImpl(pipeline, input);
executeUnion(pipeline, getSampleBlock());
BlockIO res;
res.in = pipeline.firstStream();
......@@ -392,28 +397,104 @@ BlockIO InterpreterSelectQuery::execute()
BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
{
Pipeline pipeline;
executeImpl(pipeline, input, options.only_analyze);
executeImpl(pipeline, input);
unifyStreams(pipeline, getSampleBlock());
return pipeline.streams;
}
QueryPipeline InterpreterSelectQuery::executeWithProcessors()
{
QueryPipeline query_pipeline;
executeImpl(query_pipeline, input, options.only_analyze);
executeImpl(query_pipeline, input);
return query_pipeline;
}
Block InterpreterSelectQuery::getSampleBlockImpl()
{
FilterInfoPtr filter_info;
/// Need to create sets before analyzeExpressions(). Otherwise some sets for index won't be created.
query_analyzer->makeSetsForIndex(getSelectQuery().where());
query_analyzer->makeSetsForIndex(getSelectQuery().prewhere());
auto analysis_result = analyzeExpressions(
getSelectQuery(),
*query_analyzer,
QueryProcessingStage::Enum::FetchColumns,
options.to_stage,
context,
storage,
true,
filter_info);
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
auto header = source_header;
if (analysis_result.prewhere_info)
{
analysis_result.prewhere_info->prewhere_actions->execute(header);
header = materializeBlock(header);
if (analysis_result.prewhere_info->remove_prewhere_column)
header.erase(analysis_result.prewhere_info->prewhere_column_name);
}
return header;
}
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableState)
{
if (!analysis_result.need_aggregate)
return analysis_result.before_order_and_select->getSampleBlock();
auto header = analysis_result.before_aggregation->getSampleBlock();
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
Block res;
for (auto & key : key_names)
res.insert({nullptr, header.getByName(key).type, key});
for (auto & aggregate : aggregates)
{
size_t arguments_size = aggregate.argument_names.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = header.getByName(aggregate.argument_names[j]).type;
DataTypePtr type = std::make_shared<DataTypeAggregateFunction>(aggregate.function, argument_types, aggregate.parameters);
res.insert({nullptr, type, aggregate.column_name});
}
return res;
}
return analysis_result.final_projection->getSampleBlock();
}
InterpreterSelectQuery::AnalysisResult
InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info)
InterpreterSelectQuery::analyzeExpressions(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info)
{
AnalysisResult res;
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
res.first_stage = from_stage < QueryProcessingStage::WithMergeableState
&& options.to_stage >= QueryProcessingStage::WithMergeableState;
&& to_stage >= QueryProcessingStage::WithMergeableState;
/// Do I need to execute the second part of the pipeline - running on the initiating server during distributed processing.
res.second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& options.to_stage > QueryProcessingStage::WithMergeableState;
&& to_stage > QueryProcessingStage::WithMergeableState;
/** First we compose a chain of actions and remember the necessary steps from it.
* Regardless of from_stage and to_stage, we will compose a complete sequence of actions to perform optimization and
......@@ -468,8 +549,6 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
{
ExpressionActionsChain chain(context);
auto & query = getSelectQuery();
Names additional_required_columns_after_prewhere;
if (storage && query.sample_size())
......@@ -486,14 +565,14 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
columns_for_final.begin(), columns_for_final.end());
}
if (storage && context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
if (storage && filter_info)
{
has_filter = true;
/// XXX: aggregated copy-paste from ExpressionAnalyzer::appendSmth()
if (chain.steps.empty())
{
chain.steps.emplace_back(std::make_shared<ExpressionActions>(source_columns, context));
chain.steps.emplace_back(std::make_shared<ExpressionActions>(NamesAndTypesList(), context));
}
ExpressionActionsChain::Step & step = chain.steps.back();
......@@ -506,7 +585,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep();
}
if (query_analyzer->appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
if (query_analyzer.appendPrewhere(chain, !res.first_stage, additional_required_columns_after_prewhere))
{
has_prewhere = true;
......@@ -516,11 +595,11 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep();
}
res.need_aggregate = query_analyzer->hasAggregation();
res.need_aggregate = query_analyzer.hasAggregation();
query_analyzer->appendArrayJoin(chain, dry_run || !res.first_stage);
query_analyzer.appendArrayJoin(chain, only_types || !res.first_stage);
if (query_analyzer->appendJoin(chain, dry_run || !res.first_stage))
if (query_analyzer.appendJoin(chain, only_types || !res.first_stage))
{
res.before_join = chain.getLastActions();
if (!res.hasJoin())
......@@ -528,7 +607,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
chain.addStep();
}
if (query_analyzer->appendWhere(chain, dry_run || !res.first_stage))
if (query_analyzer.appendWhere(chain, only_types || !res.first_stage))
{
where_step_num = chain.steps.size() - 1;
has_where = res.has_where = true;
......@@ -538,13 +617,13 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
if (res.need_aggregate)
{
query_analyzer->appendGroupBy(chain, dry_run || !res.first_stage);
query_analyzer->appendAggregateFunctionsArguments(chain, dry_run || !res.first_stage);
query_analyzer.appendGroupBy(chain, only_types || !res.first_stage);
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !res.first_stage);
res.before_aggregation = chain.getLastActions();
finalizeChain(chain);
if (query_analyzer->appendHaving(chain, dry_run || !res.second_stage))
if (query_analyzer.appendHaving(chain, only_types || !res.second_stage))
{
res.has_having = true;
res.before_having = chain.getLastActions();
......@@ -553,20 +632,20 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
}
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer->appendSelect(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
query_analyzer.appendSelect(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.selected_columns = chain.getLastStep().required_output;
res.has_order_by = query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.has_order_by = query_analyzer.appendOrderBy(chain, only_types || (res.need_aggregate ? !res.second_stage : !res.first_stage));
res.before_order_and_select = chain.getLastActions();
chain.addStep();
if (query_analyzer->appendLimitBy(chain, dry_run || !res.second_stage))
if (query_analyzer.appendLimitBy(chain, only_types || !res.second_stage))
{
res.has_limit_by = true;
res.before_limit_by = chain.getLastActions();
chain.addStep();
}
query_analyzer->appendProjectResult(chain);
query_analyzer.appendProjectResult(chain);
res.final_projection = chain.getLastActions();
finalizeChain(chain);
......@@ -580,7 +659,7 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
if (res.has_having)
res.before_having->prependProjectInput();
res.subqueries_for_sets = query_analyzer->getSubqueriesForSets();
res.subqueries_for_sets = query_analyzer.getSubqueriesForSets();
/// Check that PREWHERE doesn't contain unusual actions. Unusual actions are that can change number of rows.
if (res.prewhere_info)
......@@ -747,7 +826,7 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons
template <typename TPipeline>
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run)
void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
......@@ -771,7 +850,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/// Turn off, if the table filter is applied.
if (storage && !context.hasUserProperty(storage->getDatabaseName(), storage->getTableName(), "filter"))
{
if (!dry_run)
if (!options.only_analyze)
from_stage = storage->getQueryProcessingStage(context);
query_analyzer->makeSetsForIndex(query.where());
......@@ -811,14 +890,22 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
sorting_info = optimizeReadInOrder(*merge_tree_data, query, context, syntax_analyzer_result);
}
if (dry_run)
if (options.only_analyze)
{
if constexpr (pipeline_with_processors)
pipeline.init({std::make_shared<NullSource>(source_header)});
else
pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(source_header));
expressions = analyzeExpressions(QueryProcessingStage::FetchColumns, true, filter_info);
expressions = analyzeExpressions(
getSelectQuery(),
*query_analyzer,
QueryProcessingStage::FetchColumns,
options.to_stage,
context,
storage,
true,
filter_info);
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
......@@ -850,7 +937,15 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
pipeline.streams.push_back(prepared_input);
}
expressions = analyzeExpressions(from_stage, false, filter_info);
expressions = analyzeExpressions(
getSelectQuery(),
*query_analyzer,
from_stage,
options.to_stage,
context,
storage,
false,
filter_info);
if (from_stage == QueryProcessingStage::WithMergeableState &&
options.to_stage == QueryProcessingStage::WithMergeableState)
......@@ -1097,7 +1192,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if constexpr (pipeline_with_processors)
pipeline.resize(1);
else
executeUnion(pipeline);
executeUnion(pipeline, {});
}
/** If there was more than one stream,
......@@ -1696,7 +1791,7 @@ void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool ov
if (!settings.distributed_aggregation_memory_efficient)
{
/// We union several sources into one, parallelizing the work.
executeUnion(pipeline);
executeUnion(pipeline, {});
/// Now merge the aggregated blocks
pipeline.firstStream() = std::make_shared<MergingAggregatedBlockInputStream>(pipeline.firstStream(), params, final, settings.max_threads);
......@@ -1798,7 +1893,7 @@ void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const Expre
void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final)
{
executeUnion(pipeline);
executeUnion(pipeline, {});
const Settings & settings = context.getSettingsRef();
......@@ -1827,7 +1922,7 @@ void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bo
void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificator modificator)
{
executeUnion(pipeline);
executeUnion(pipeline, {});
Names key_names;
AggregateDescriptions aggregates;
......@@ -1972,7 +2067,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
});
/// If there are several streams, we merge them into one
executeUnion(pipeline);
executeUnion(pipeline, {});
/// Merge the sorted blocks.
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
......@@ -2032,7 +2127,7 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
/// If there are several streams, then we merge them into one
if (pipeline.hasMoreThanOneStream())
{
unifyStreams(pipeline);
unifyStreams(pipeline, pipeline.firstStream()->getHeader());
/** MergingSortedBlockInputStream reads the sources sequentially.
* To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream.
......@@ -2136,12 +2231,15 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo
}
void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
void InterpreterSelectQuery::executeUnion(Pipeline & pipeline, Block header)
{
/// If there are still several streams, then we combine them into one
if (pipeline.hasMoreThanOneStream())
{
unifyStreams(pipeline);
if (!header)
header = pipeline.firstStream()->getHeader();
unifyStreams(pipeline, std::move(header));
pipeline.firstStream() = std::make_shared<UnionBlockInputStream>(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams);
pipeline.stream_with_non_joined_data = nullptr;
......@@ -2351,7 +2449,7 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, SubqueriesForSets & subqueries_for_sets)
{
executeUnion(pipeline);
executeUnion(pipeline, {});
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(), subqueries_for_sets, context);
}
......@@ -2369,20 +2467,22 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip
}
void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline)
void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header)
{
if (pipeline.hasMoreThanOneStream())
/// Unify streams in case they have different headers.
/// TODO: remove previos addition of _dummy column.
if (header.columns() > 1 && header.has("_dummy"))
header.erase("_dummy");
for (size_t i = 0; i < pipeline.streams.size(); ++i)
{
/// Unify streams in case they have different headers.
auto first_header = pipeline.streams.at(0)->getHeader();
for (size_t i = 1; i < pipeline.streams.size(); ++i)
{
auto & stream = pipeline.streams[i];
auto header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(first_header, header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, first_header, mode);
}
auto & stream = pipeline.streams[i];
auto stream_header = stream->getHeader();
auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
if (!blocksHaveEqualStructure(header, stream_header))
stream = std::make_shared<ConvertingBlockInputStream>(context, stream, header, mode);
}
}
......
......@@ -90,6 +90,7 @@ private:
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
Block getSampleBlockImpl();
struct Pipeline
{
......@@ -135,7 +136,7 @@ private:
};
template <typename TPipeline>
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, bool dry_run);
void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input);
struct AnalysisResult
{
......@@ -172,12 +173,19 @@ private:
FilterInfoPtr filter_info;
};
AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, bool dry_run, const FilterInfoPtr & filter_info);
static AnalysisResult analyzeExpressions(
const ASTSelectQuery & query,
SelectQueryExpressionAnalyzer & query_analyzer,
QueryProcessingStage::Enum from_stage,
QueryProcessingStage::Enum to_stage,
const Context & context,
const StoragePtr & storage,
bool only_types,
const FilterInfoPtr & filter_info);
/** From which table to read. With JOIN, the "left" table is returned.
*/
void getDatabaseAndTableNames(String & database_name, String & table_name);
static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context);
/// Different stages of query execution.
......@@ -198,7 +206,7 @@ private:
void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header); /// If header is not empty, convert streams structure to it.
void executeLimitBy(Pipeline & pipeline);
void executeLimit(Pipeline & pipeline);
void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression);
......@@ -222,8 +230,8 @@ private:
void executeExtremes(QueryPipeline & pipeline);
void executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
/// If pipeline has several streams with different headers, add ConvertingBlockInputStream to first header.
void unifyStreams(Pipeline & pipeline);
/// Add ConvertingBlockInputStream to specified header.
void unifyStreams(Pipeline & pipeline, Block header);
enum class Modificator
{
......@@ -246,7 +254,6 @@ private:
const SelectQueryOptions options;
ASTPtr query_ptr;
Context context;
NamesAndTypesList source_columns;
SyntaxAnalyzerResultPtr syntax_analyzer_result;
std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;
SelectQueryInfo query_info;
......
......@@ -126,6 +126,8 @@ public:
bool empty() { return type == Type::EMPTY; }
bool isNullUsedAsDefault() const { return use_nulls; }
/** Set information about structure of right hand of JOIN (joined data).
* You must call this method before subsequent calls to insertFromBlock.
*/
......@@ -168,6 +170,7 @@ public:
size_t getTotalByteCount() const;
ASTTableJoin::Kind getKind() const { return kind; }
ASTTableJoin::Strictness getStrictness() const { return strictness; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
bool anyTakeLastRow() const { return any_take_last_row; }
......
......@@ -461,7 +461,8 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
void MutationsInterpreter::validate(TableStructureReadLockHolder &)
{
prepare(/* dry_run = */ true);
Block first_stage_header = interpreter_select->getSampleBlock();
/// Do not use getSampleBlock in order to check the whole pipeline.
Block first_stage_header = interpreter_select->execute().in->getHeader();
BlockInputStreamPtr in = std::make_shared<NullBlockInputStream>(first_stage_header);
addStreamsForLaterStages(stages, in)->getHeader();
}
......
......@@ -102,8 +102,24 @@ void QueryNormalizer::visit(ASTIdentifier & node, ASTPtr & ast, Data & data)
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
auto it_alias = data.aliases.find(node.name);
if (IdentifierSemantic::canBeAlias(node) && it_alias != data.aliases.end() && current_alias != node.name)
if (it_alias != data.aliases.end() && current_alias != node.name)
{
if (!IdentifierSemantic::canBeAlias(node))
{
/// This means that column had qualified name, which was translated (so, canBeAlias() returns false).
/// But there is an alias with the same name. So, let's use original name for that column.
/// If alias wasn't set, use original column name as alias.
/// That helps to avoid result set with columns which have same names but different values.
if (node.alias.empty())
{
node.name.swap(node.alias);
node.restoreCompoundName();
node.name.swap(node.alias);
}
return;
}
auto & alias_node = it_alias->second;
/// Let's replace it with the corresponding tree node.
......
......@@ -317,8 +317,8 @@ BlockInputStreams StorageDistributed::read(
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
Block header = materializeBlock(
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock());
Block header =
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
? ClusterProxy::SelectStreamFactory(
......
......@@ -2,5 +2,5 @@ SELECT groupArrayInsertAt(toString(number), number * 2) FROM (SELECT * FROM syst
SELECT groupArrayInsertAt('-')(toString(number), number * 2) FROM (SELECT * FROM system.numbers LIMIT 10);
SELECT groupArrayInsertAt([123])(range(number), number * 2) FROM (SELECT * FROM system.numbers LIMIT 10);
SELECT number, groupArrayInsertAt(number, number) FROM (SELECT * FROM system.numbers LIMIT 10) GROUP BY number ORDER BY number;
SELECT k, ignore(groupArrayInsertAt(x, x)) FROM (SELECT dummy AS k, randConstant() % 10 AS x FROM remote('127.0.0.{1,1}', system.one)) GROUP BY k ORDER BY k;
SELECT k, ignore(groupArrayInsertAt(x, x)) FROM (SELECT dummy AS k, (randConstant() * 10) % 10 AS x FROM remote('127.0.0.{1,1}', system.one)) GROUP BY k ORDER BY k;
SELECT k, groupArrayInsertAt('-', 10)(toString(x), x) FROM (SELECT number AS k, number AS x FROM system.numbers LIMIT 11) GROUP BY k ORDER BY k;
a 2018-01-01 00:00:00 0000-00-00 00:00:00
b 2018-01-01 00:00:00 b 2018-01-01 00:00:00
c 2018-01-01 00:00:00 c 2018-01-01 00:00:00
b 2018-01-01 00:00:00 b 2018-01-01 00:00:00
c 2018-01-01 00:00:00 c 2018-01-01 00:00:00
a 2018-01-01 00:00:00 0000-00-00 00:00:00
b 2018-01-01 00:00:00 b b 2018-01-01 00:00:00
c 2018-01-01 00:00:00 c c 2018-01-01 00:00:00
b 2018-01-01 00:00:00 b b 2018-01-01 00:00:00
c 2018-01-01 00:00:00 c c 2018-01-01 00:00:00
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册