提交 870f7667 编写于 作者: A Alexey Milovidov

dbms: JOINs: development [#METR-11370].

上级 3f650bfb
......@@ -137,6 +137,8 @@ private:
std::unordered_map<String, SetPtr> sets_with_subqueries;
Joins joins;
NameSet join_key_names_set;
NamesAndTypesList columns_added_by_join;
typedef std::unordered_map<String, ASTPtr> Aliases;
......@@ -166,12 +168,9 @@ private:
*/
void removeUnusedColumns();
/** Найти обычные (не ARRAY) JOIN-ы, записать их в joins.
* Удалить из множества столбцов required_columns те, которые будут присоединены (JOIN).
* То есть, столбцы, являющиеся результатом подзапроса в секции JOIN,
* но не входящие в ключи JOIN-а (USING).
/** Найти столбцы, получаемые путём JOIN-а.
*/
void findJoins(NameSet & required_columns);
void collectJoinedColumns(NameSet & joined_columns, NamesAndTypesList & joined_columns_name_type);
/** Создать словарь алиасов.
*/
......@@ -196,7 +195,7 @@ private:
void getArrayJoinedColumnsImpl(ASTPtr ast);
void addMultipleArrayJoinAction(ExpressionActions & actions);
void addJoinAction(ExpressionActions & actions);
void addJoinAction(ExpressionActions & actions, bool only_types);
struct ScopeStack;
void getActionsImpl(ASTPtr ast, bool no_subqueries, bool only_consts, ScopeStack & actions_stack);
......@@ -209,7 +208,14 @@ private:
/// Установить has_aggregation = true, если есть хоть одна агрегатная функция.
void getAggregatesImpl(ASTPtr ast, ExpressionActions & actions);
void getRequiredColumnsImpl(ASTPtr ast, NameSet & required_columns, NameSet & ignored_names);
/** Получить множество нужных столбцов для чтения из таблицы.
* При этом, столбцы, указанные в ignored_names, считаются ненужными. И параметр ignored_names может модифицироваться.
* Множество столбцов available_joined_columns - столбцы, доступные из JOIN-а, они не нужны для чтения из основной таблицы.
* Положить в required_joined_columns множество столбцов, доступных из JOIN-а и востребованных.
*/
void getRequiredColumnsImpl(ASTPtr ast,
NameSet & required_columns, NameSet & ignored_names,
const NameSet & available_joined_columns, NameSet & required_joined_columns);
/// Получить таблицу, из которой идет запрос
StoragePtr getTable();
......
......@@ -314,7 +314,7 @@ std::string ExpressionAction::toString() const
break;
case COPY_COLUMN:
ss << "COPY " << result_name << " " << result_type->getName() << " = " << source_name;
ss << "COPY " << result_name << " = " << source_name;
break;
case APPLY_FUNCTION:
......
......@@ -104,7 +104,7 @@ void ExpressionAnalyzer::init()
if (select_query && select_query->join)
{
getRootActionsImpl(dynamic_cast<ASTJoin &>(*select_query->join).using_expr_list, true, false, temp_actions);
addJoinAction(temp_actions);
addJoinAction(temp_actions, true);
}
getAggregatesImpl(ast, temp_actions);
......@@ -1235,9 +1235,9 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
return true;
}
void ExpressionAnalyzer::addJoinAction(ExpressionActions & actions)
void ExpressionAnalyzer::addJoinAction(ExpressionActions & actions, bool only_types)
{
actions.add(ExpressionAction::ordinaryJoin(joins[0], columns_added_by_join));
actions.add(ExpressionAction::ordinaryJoin(only_types ? nullptr : joins[0], columns_added_by_join));
}
bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
......@@ -1252,7 +1252,39 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
getRootActionsImpl(dynamic_cast<ASTJoin &>(*select_query->join).using_expr_list, only_types, false, *step.actions);
addJoinAction(*step.actions);
{
Names join_key_names(join_key_names_set.begin(), join_key_names_set.end());
JoinPtr join = new Join(join_key_names, settings.limits);
/** Для подзапроса в секции JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
* TODO: отдельные ограничения для JOIN.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
Names required_joined_columns(join_key_names.begin(), join_key_names.end());
for (const auto & name_type : columns_added_by_join)
required_joined_columns.push_back(name_type.first);
InterpreterSelectQuery interpreter(
dynamic_cast<ASTJoin &>(*select_query->join).subquery->children[0], subquery_context,
required_joined_columns, true,
QueryProcessingStage::Complete, subquery_depth + 1);
Block right_table_sample = interpreter.getSampleBlock();
join->setSource(interpreter.execute());
joins.push_back(join);
}
addJoinAction(*step.actions, false);
return true;
}
......@@ -1533,15 +1565,33 @@ void ExpressionAnalyzer::removeUnusedColumns()
{
/// Для выражений в ARRAY JOIN ничего игнорировать не нужно.
NameSet empty;
getRequiredColumnsImpl(expressions[i], required, empty);
getRequiredColumnsImpl(expressions[i], required, empty, empty, empty);
}
ignored.insert(expressions[i]->getAlias());
}
}
getRequiredColumnsImpl(ast, required, ignored);
findJoins(required);
/** Также нужно не учитывать идентификаторы столбцов, получающихся путём JOIN-а.
* (Не считать, что они требуются для чтения из "левой" таблицы).
*/
NameSet available_joined_columns;
collectJoinedColumns(available_joined_columns, columns_added_by_join);
NameSet required_joined_columns;
getRequiredColumnsImpl(ast, required, ignored, available_joined_columns, required_joined_columns);
for (NamesAndTypesList::iterator it = columns_added_by_join.begin(); it != columns_added_by_join.end();)
{
if (required_joined_columns.count(it->first))
++it;
else
columns_added_by_join.erase(it++);
}
for (const auto & name_type : columns_added_by_join)
std::cerr << "JOINed column (required): " << name_type.first << std::endl;
std::cerr << std::endl;
/// Вставляем в список требуемых столбцов столбцы, нужные для вычисления ARRAY JOIN.
NameSet array_join_sources;
......@@ -1585,77 +1635,42 @@ void ExpressionAnalyzer::removeUnusedColumns()
}
}
void ExpressionAnalyzer::findJoins(NameSet & required_columns)
void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAndTypesList & joined_columns_name_type)
{
if (!select_query || !select_query->join)
return;
std::cerr << "Found JOIN" << std::endl;
std::cerr << "collectJoinedColumns" << std::endl;
auto & node = dynamic_cast<ASTJoin &>(*select_query->join);
auto & join_keys_expr_list = dynamic_cast<ASTExpressionList &>(*node.using_expr_list);
auto & keys = dynamic_cast<ASTExpressionList &>(*node.using_expr_list);
auto & select = node.subquery->children[0];
size_t num_join_keys = join_keys_expr_list.children.size();
Names join_key_names(num_join_keys);
NameSet join_key_names_set;
size_t num_join_keys = keys.children.size();
for (size_t i = 0; i < num_join_keys; ++i)
{
String name = join_keys_expr_list.children[i]->getColumnName();
join_key_names[i] = name;
std::cerr << "USING " << name << std::endl;
if (!join_key_names_set.insert(name).second)
if (!join_key_names_set.insert(keys.children[i]->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
JoinPtr join = new Join(join_key_names, settings.limits);
/** Для подзапроса в секции JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
* TODO: отдельные ограничения для JOIN.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
InterpreterSelectQuery interpreter(
node.subquery->children[0], subquery_context,
/// Будем использовать в подзапросе только столбцы, которые есть в required_columns.
Names(required_columns.begin(), required_columns.end()), true,
QueryProcessingStage::Complete, subquery_depth + 1);
Block right_table_sample = interpreter.getSampleBlock();
join->setSource(interpreter.execute());
Block nested_result_sample = ExpressionAnalyzer(select, context, subquery_depth + 1).getSelectSampleBlock();
joins.push_back(join);
std::cerr << right_table_sample.dumpNames() << std::endl;
for (const auto & x : required_columns)
std::cerr << "Required column: " << x << std::endl;
/// Удаляем из required_columns столбцы, которые есть в подзапросе, но нет в USING-е.
for (NameSet::iterator it = required_columns.begin(); it != required_columns.end();)
size_t nested_result_columns = nested_result_sample.columns();
for (size_t i = 0; i < nested_result_columns; ++i)
{
if (right_table_sample.has(*it) && !join_key_names_set.count(*it))
auto col = nested_result_sample.getByPosition(i);
if (!join_key_names_set.count(col.name))
{
ColumnWithNameAndType & added_col = right_table_sample.getByName(*it);
columns_added_by_join.emplace_back(added_col.name, added_col.type);
std::cerr << "Column added by JOIN: " << *it << std::endl;
required_columns.erase(it++);
joined_columns.insert(col.name);
joined_columns_name_type.emplace_back(col.name, col.type);
}
else
++it;
}
for (const auto & name : join_key_names_set)
std::cerr << "JOIN key: " << name << std::endl;
std::cerr << std::endl;
for (const auto & name : joined_columns)
std::cerr << "JOINed column: " << name << std::endl;
std::cerr << std::endl;
}
Names ExpressionAnalyzer::getRequiredColumns()
......@@ -1670,14 +1685,17 @@ Names ExpressionAnalyzer::getRequiredColumns()
return res;
}
void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_columns, NameSet & ignored_names)
void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast,
NameSet & required_columns, NameSet & ignored_names,
const NameSet & available_joined_columns, NameSet & required_joined_columns)
{
/** Найдём все идентификаторы в запросе.
* Будем искать их рекурсивно, обходя в глубину AST.
* При этом:
* - для лямбда функций не будем брать формальные параметры;
* - не опускаемся в подзапросы (там свои идентификаторы);
* - некоторое исключение для секции ARRAY JOIN (в ней идентификаторы немного другие).
* - некоторое исключение для секции ARRAY JOIN (в ней идентификаторы немного другие);
* - идентификаторы, доступные из JOIN-а, кладём в required_joined_columns.
*/
if (ASTIdentifier * node = dynamic_cast<ASTIdentifier *>(&*ast))
......@@ -1686,7 +1704,10 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
&& !ignored_names.count(node->name)
&& !ignored_names.count(DataTypeNested::extractNestedTableName(node->name)))
{
required_columns.insert(node->name);
if (!available_joined_columns.count(node->name))
required_columns.insert(node->name);
else
required_joined_columns.insert(node->name);
}
return;
......@@ -1711,7 +1732,8 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
ASTIdentifier * identifier = dynamic_cast<ASTIdentifier *>(&*lambda_args_tuple->arguments->children[i]);
if (!identifier)
throw Exception("lambda argument declarations must be identifiers", ErrorCodes::TYPE_MISMATCH);
std::string name = identifier->name;
String & name = identifier->name;
if (!ignored_names.count(name))
{
ignored_names.insert(name);
......@@ -1719,7 +1741,9 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
}
}
getRequiredColumnsImpl(node->arguments->children[1], required_columns, ignored_names);
getRequiredColumnsImpl(node->arguments->children[1],
required_columns, ignored_names,
available_joined_columns, required_joined_columns);
for (size_t i = 0; i < added_ignored.size(); ++i)
ignored_names.erase(added_ignored[i]);
......@@ -1738,7 +1762,7 @@ void ExpressionAnalyzer::getRequiredColumnsImpl(ASTPtr ast, NameSet & required_c
*/
if (!dynamic_cast<ASTSubquery *>(&*child) && !dynamic_cast<ASTSelectQuery *>(&*child) &&
!(select && child == select->array_join_expression_list))
getRequiredColumnsImpl(child, required_columns, ignored_names);
getRequiredColumnsImpl(child, required_columns, ignored_names, available_joined_columns, required_joined_columns);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册