提交 8643f02a 编写于 作者: A Alexey Milovidov

dbms: reworked GLOBAL subqueries (probably doesn't work) [#METR-11370].

上级 0ee947c6
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Interpreters/Set.h>
#include <DB/Interpreters/Join.h>
......@@ -13,14 +14,17 @@ namespace DB
class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
{
public:
CreatingSetsBlockInputStream(BlockInputStreamPtr input, const Sets & sets_, const Joins & joins_)
: sets(sets_), joins(joins_)
CreatingSetsBlockInputStream(
BlockInputStreamPtr input,
SubqueriesForSets & subqueries_for_sets_,
const Limits & limits)
: subqueries_for_sets(subqueries_for_sets_),
max_rows_to_transfer(limits.max_rows_to_transfer),
max_bytes_to_transfer(limits.max_bytes_to_transfer),
transfer_overflow_mode(limits.transfer_overflow_mode)
{
for (auto & set : sets)
children.push_back(set->getSource());
for (auto & join : joins)
children.push_back(join->getSource());
for (auto & elem : subqueries_for_sets)
children.push_back(elem.second.source);
children.push_back(input);
}
......@@ -50,15 +54,19 @@ protected:
Block readImpl();
private:
Sets sets;
Joins joins;
SubqueriesForSets subqueries_for_sets;
bool created = false;
size_t max_rows_to_transfer;
size_t max_bytes_to_transfer;
OverflowMode transfer_overflow_mode;
size_t rows_to_transfer = 0;
size_t bytes_to_transfer = 0;
Logger * log = &Logger::get("CreatingSetsBlockInputStream");
void createSet(SetPtr & set);
void createJoin(JoinPtr & join);
void logProfileInfo(Stopwatch & watch, IBlockInputStream & in, size_t entries);
void create(SubqueryForSet & subquery);
};
}
......@@ -15,6 +15,29 @@
namespace DB
{
/** Информация о том, что делать при выполнении подзапроса в секции [GLOBAL] IN/JOIN.
*/
struct SubqueryForSet
{
/// Источник - получен с помощью InterpreterSelectQuery подзапроса.
BlockInputStreamPtr source;
/// Если задано - создать из результата Set.
SetPtr set;
/// Если задано - создать из результата Join.
JoinPtr join;
/// Если задано - положить результат в таблицу.
/// Это - временная таблица для передачи на удалённые серверы при распределённой обработке запроса.
StoragePtr table;
};
/// ID подзапроса -> что с ним делать.
typedef std::unordered_map<String, SubqueryForSet> SubqueriesForSets;
/** Превращает выражение из синтаксического дерева в последовательность действий для его выполнения.
*
* NOTE: если ast - запрос SELECT из таблицы, структура этой таблицы не должна меняться во все время жизни ExpressionAnalyzer-а.
......@@ -65,7 +88,7 @@ public:
* analyzer.appendOrderBy(chain);
* chain.finalize();
*
* Если указано only_types=true, не выполняет подзапросы в соответствующих частях запроса. Полученные таким
* Если указано only_types = true, не выполняет подзапросы в соответствующих частях запроса. Полученные таким
* образом действия не следует выполнять, они нужны только чтобы получить список столбцов с их типами.
*/
......@@ -94,17 +117,19 @@ public:
/** Множества, для создания которых нужно будет выполнить подзапрос.
* Только множества, нужные для выполнения действий, возвращенных из уже вызванных append* или getActions.
* То есть, нужно вызвать getSubquerySets после всех вызовов append* или getActions и создать все возвращенные множества перед выполнением действий.
* То есть, нужно вызвать getSetsWithSubqueries после всех вызовов append* или getActions
* и создать все возвращенные множества перед выполнением действий.
*/
SubqueriesForSets getSubqueriesForSets();
/** Таблицы, которые надо будет отправить на удалённые серверы при распределённой обработке запроса.
*/
Sets getSetsWithSubqueries();
Joins getJoinsWithSubqueries();
const Tables & getExternalTables() { return external_tables; }
/// Если ast - запрос SELECT, получает имена (алиасы) и типы столбцов из секции SELECT.
Block getSelectSampleBlock();
/// Создаем какие сможем Set из секции In для использования индекса по ним
/// Создаем какие сможем Set из секции IN для использования индекса по ним.
void makeSetsForIndex();
private:
......@@ -132,8 +157,7 @@ private:
NamesAndTypesList aggregation_keys;
AggregateDescriptions aggregate_descriptions;
std::unordered_map<String, SetPtr> sets_with_subqueries;
Joins joins;
SubqueriesForSets subqueries_for_sets;
/// NOTE: Пока поддерживается только один JOIN на запрос.
......@@ -168,7 +192,6 @@ private:
/// Все новые временные таблицы, полученные при выполнении подзапросов GLOBAL IN/JOIN.
Tables external_tables;
std::unordered_map<String, BlockInputStreamPtr> external_data;
size_t external_table_id = 1;
......@@ -200,7 +223,7 @@ private:
/// Превратить перечисление значений или подзапрос в ASTSet. node - функция in или notIn.
void makeSet(ASTFunction * node, const Block & sample_block);
/// Находит глобальные подзапросы в секциях GLOBAL IN/JOIN. Заполняет external_tables, external_data.
/// Находит глобальные подзапросы в секциях GLOBAL IN/JOIN. Заполняет external_tables.
void initGlobalSubqueriesAndExternalTables();
void initGlobalSubqueries(ASTPtr & ast);
......@@ -208,7 +231,7 @@ private:
void findExternalTables(ASTPtr & ast);
/** Инициализировать InterpreterSelectQuery для подзапроса в секции GLOBAL IN/JOIN,
* создать временную таблицу типа Memory и запомнить это в словаре external_tables, external_data.
* создать временную таблицу типа Memory и запомнить это в словаре external_tables.
*/
void addExternalStorage(ASTPtr & subquery_or_table_name);
......
......@@ -101,7 +101,7 @@ private:
void executeLimit( BlockInputStreams & streams);
void executeProjection( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeDistinct( BlockInputStreams & streams, bool before_order, Names columns);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, const Sets & sets, const Joins & joins);
void executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets);
ASTPtr query_ptr;
ASTSelectQuery & query;
......
......@@ -62,28 +62,14 @@ public:
: kind(kind_), strictness(strictness_),
key_names_left(key_names_left_),
key_names_right(key_names_right_),
max_bytes_to_transfer(limits.max_bytes_to_transfer),
max_rows_to_transfer(limits.max_rows_to_transfer),
transfer_overflow_mode(limits.transfer_overflow_mode),
bytes_in_external_table(0),
rows_in_external_table(0),
only_external(false),
log(&Logger::get("Join")),
max_rows(limits.max_rows_in_set),
max_bytes(limits.max_bytes_in_set),
overflow_mode(limits.set_overflow_mode)
{
}
bool empty() { return type == Set::EMPTY; }
/** Запомнить поток блоков, чтобы потом его можно было прочитать и создать отображение.
*/
void setSource(BlockInputStreamPtr stream) { source = stream; }
BlockInputStreamPtr getSource() { return source; }
void setExternalOutput(StoragePtr storage) { external_table = storage; }
void setOnlyExternal(bool flag) { only_external = flag; }
bool empty() { return type == Set::EMPTY; }
/** Добавить в отображение для соединения блок "правой" таблицы.
* Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять.
......@@ -164,17 +150,6 @@ private:
*/
Blocks blocks;
BlockInputStreamPtr source;
/// Информация о внешней таблице, заполняемой этим классом
StoragePtr external_table;
size_t max_bytes_to_transfer;
size_t max_rows_to_transfer;
OverflowMode transfer_overflow_mode;
size_t bytes_in_external_table;
size_t rows_in_external_table;
bool only_external;
MapsAny maps_any;
MapsAll maps_all;
......@@ -182,12 +157,12 @@ private:
Arena pool;
Set::Type type = Set::EMPTY;
bool keys_fit_128_bits;
Sizes key_sizes;
Logger * log;
/// Ограничения на максимальный размер множества
size_t max_rows;
size_t max_bytes;
......@@ -203,12 +178,10 @@ private:
/// Проверить не превышены ли допустимые размеры множества
bool checkSizeLimits() const;
/// Проверить не превышены ли допустимые размеры внешней таблицы для передачи данных
bool checkExternalSizeLimits() const;
/// Считает суммарное число ключей во всех Set'ах
/// Считает суммарное число ключей во всех Join'ах
size_t getTotalRowCount() const;
/// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а
/// Считает суммарный размер в байтах буфферов всех Join'ов + размер string_pool'а
size_t getTotalByteCount() const;
};
......
......@@ -31,20 +31,14 @@ namespace DB
class Set
{
public:
Set(const Limits & limits)
: max_bytes_to_transfer(limits.max_bytes_to_transfer),
max_rows_to_transfer(limits.max_rows_to_transfer),
transfer_overflow_mode(limits.transfer_overflow_mode),
bytes_in_external_table(0),
rows_in_external_table(0),
only_external(false),
Set(const Limits & limits) :
log(&Logger::get("Set")),
max_rows(limits.max_rows_in_set),
max_bytes(limits.max_bytes_in_set),
overflow_mode(limits.set_overflow_mode)
{
}
bool empty() { return type == EMPTY; }
/** Создать множество по выражению (для перечисления в самом запросе).
......@@ -54,15 +48,6 @@ public:
*/
void createFromAST(DataTypes & types, ASTPtr node, bool create_ordered_set);
/** Запомнить поток блоков (для подзапросов), чтобы потом его можно было прочитать и создать множество.
*/
void setSource(BlockInputStreamPtr stream) { source = stream; }
void setExternalOutput(StoragePtr storage) { external_table = storage; }
void setOnlyExternal(bool flag) { only_external = flag; }
BlockInputStreamPtr getSource() { return source; }
// Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять.
bool insertFromBlock(Block & block, bool create_ordered_set = false);
......@@ -77,10 +62,10 @@ public:
{
if (!ordered_set_elements)
return "{}";
bool first = true;
std::stringstream ss;
ss << "{";
for (const Field & f : *ordered_set_elements)
{
......@@ -112,17 +97,6 @@ private:
typedef HashSetWithSavedHash<StringRef> SetString;
typedef HashSet<UInt128, UInt128Hash> SetHashed;
BlockInputStreamPtr source;
/// Информация о внешней таблице, заполняемой этим классом
StoragePtr external_table;
size_t max_bytes_to_transfer;
size_t max_rows_to_transfer;
OverflowMode transfer_overflow_mode;
size_t bytes_in_external_table;
size_t rows_in_external_table;
bool only_external;
/// Специализация для случая, когда есть один числовой ключ.
std::unique_ptr<SetUInt64> key64;
......@@ -138,7 +112,7 @@ private:
std::unique_ptr<SetHashed> hashed;
Type type = EMPTY;
bool keys_fit_128_bits;
Sizes key_sizes;
......@@ -146,14 +120,14 @@ private:
* При проверке на принадлежность множеству, типы проверяемых столбцов должны с ними совпадать.
*/
DataTypes data_types;
Logger * log;
/// Ограничения на максимальный размер множества
size_t max_rows;
size_t max_bytes;
OverflowMode overflow_mode;
void init(Type type_)
{
type = type_;
......@@ -173,14 +147,12 @@ private:
/// Если в левой части IN стоит массив. Проверяем, что хоть один элемент массива лежит в множестве.
void executeConstArray(const ColumnConstArray * key_column, ColumnUInt8::Container_t & vec_res, bool negative) const;
void executeArray(const ColumnArray * key_column, ColumnUInt8::Container_t & vec_res, bool negative) const;
/// Если в левой части набор столбцов тех же типов, что элементы множества.
void executeOrdinary(const ConstColumnPlainPtrs & key_columns, ColumnUInt8::Container_t & vec_res, bool negative) const;
/// Проверить не превышены ли допустимые размеры множества ключей
bool checkSetSizeLimits() const;
/// Проверить не превышены ли допустимые размеры внешней таблицы для передачи данных
bool checkExternalSizeLimits() const;
/// Считает суммарное число ключей во всех Set'ах
size_t getTotalRowCount() const;
......
......@@ -15,7 +15,8 @@ class ASTSet : public IAST
public:
SetPtr set;
String column_name;
bool is_explicit = false;
ASTSet(const String & column_name_) : column_name(column_name_) {}
ASTSet(StringRange range_, const String & column_name_) : IAST(range_), column_name(column_name_) {}
String getID() const { return "Set_" + getColumnName(); }
......
......@@ -10,16 +10,10 @@ Block CreatingSetsBlockInputStream::readImpl()
if (!created)
{
for (auto & set : sets)
/// Заполнение временных таблиц идёт первым - потому что эти таблицы могут затем использоваться для создания Set/Join.
for (auto & elem : subqueries_for_sets)
{
createSet(set);
if (isCancelled())
return res;
}
for (auto & join : joins)
{
createJoin(join);
create(elem.second);
if (isCancelled())
return res;
}
......@@ -33,73 +27,117 @@ Block CreatingSetsBlockInputStream::readImpl()
return children.back()->read();
}
void CreatingSetsBlockInputStream::createSet(SetPtr & set)
void CreatingSetsBlockInputStream::create(SubqueryForSet & subquery)
{
LOG_TRACE(log, "Creating set");
LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "")
<< (subquery.table ? "Filling temporary table. " : ""));
Stopwatch watch;
while (Block block = set->getSource()->read())
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write(ASTPtr());
bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
if (done_with_set && done_with_join && done_with_table)
throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
subquery.source->readPrefix();
if (table_out)
table_out->writePrefix();
while (Block block = subquery.source->read())
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set creation");
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
return;
}
if (!set->insertFromBlock(block))
if (!done_with_set)
{
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*set->getSource()))
profiling_in->cancel();
break;
if (!subquery.set->insertFromBlock(block))
done_with_set = true;
}
}
logProfileInfo(watch, *set->getSource(), set->size());
set->setSource(nullptr);
}
void CreatingSetsBlockInputStream::createJoin(JoinPtr & join)
{
LOG_TRACE(log, "Creating join");
Stopwatch watch;
if (!done_with_join)
{
if (!subquery.join->insertFromBlock(block))
done_with_join = true;
}
while (Block block = join->getSource()->read())
{
if (isCancelled())
if (!done_with_table)
{
LOG_DEBUG(log, "Query was cancelled during join creation");
return;
table_out->write(block);
rows_to_transfer += block.rows();
bytes_to_transfer += block.bytes();
if ((max_rows_to_transfer && rows_to_transfer > max_rows_to_transfer)
|| (max_bytes_to_transfer && bytes_to_transfer > max_bytes_to_transfer))
{
if (transfer_overflow_mode == OverflowMode::THROW)
throw Exception("IN/JOIN external table size limit exceeded."
" Rows: " + toString(rows_to_transfer)
+ ", limit: " + toString(max_rows_to_transfer)
+ ". Bytes: " + toString(bytes_to_transfer)
+ ", limit: " + toString(max_bytes_to_transfer) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (transfer_overflow_mode == OverflowMode::BREAK)
done_with_table = true;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
if (!join->insertFromBlock(block))
if (done_with_set && done_with_join && done_with_table)
{
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*join->getSource()))
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
profiling_in->cancel();
break;
}
}
logProfileInfo(watch, *join->getSource(), join->size());
join->setSource(nullptr);
}
subquery.source->readSuffix();
if (table_out)
table_out->writeSuffix();
void CreatingSetsBlockInputStream::logProfileInfo(Stopwatch & watch, IBlockInputStream & in, size_t entries)
{
/// Выведем информацию о том, сколько считано строк и байт.
size_t rows = 0;
size_t bytes = 0;
in.getLeafRowsBytes(rows, bytes);
subquery.source->getLeafRowsBytes(rows, bytes);
size_t head_rows = 0;
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&in))
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
head_rows = profiling_in->getInfo().rows;
if (rows != 0)
{
LOG_DEBUG(log, std::fixed << std::setprecision(3)
<< "Created with " << entries << " entries from " << head_rows << " rows."
<< " Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.");
std::stringstream msg;
msg << std::fixed << std::setprecision(3);
msg << "Created. ";
if (subquery.set)
msg << "Set with " << subquery.set->size() << " entries from " << head_rows << " rows.";
if (subquery.join)
msg << "Join with " << subquery.join->size() << " entries from " << head_rows << " rows.";
if (subquery.table)
msg << "Table with " << head_rows << " rows. ";
msg << " Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.";
LOG_DEBUG(log, msg.rdbuf());
}
else
{
LOG_DEBUG(log, "Subquery has empty result.");
}
}
......
......@@ -56,7 +56,7 @@ void ExpressionAnalyzer::init()
/// has_aggregation, aggregation_keys, aggregate_descriptions, aggregated_columns.
analyzeAggregation();
/// external_tables, external_data.
/// external_tables, subqueries_for_sets для глобальных подзапросов.
/// Заменяет глобальные подзапросы на сгенерированные имена временных таблиц, которые будут отправлены на удалённые серверы.
initGlobalSubqueriesAndExternalTables();
}
......@@ -432,14 +432,9 @@ void ExpressionAnalyzer::makeSetsForIndexImpl(ASTPtr & node, const Block & sampl
}
void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
static SharedPtr<InterpreterSelectQuery> interpretSubquery(
ASTPtr & subquery_or_table_name, const Context & context, size_t subquery_depth, const Names & required_columns = Names())
{
/// Сгенерируем имя для внешней таблицы.
while (context.tryGetExternalTable("_data" + toString(external_table_id)))
++external_table_id;
StoragePtr external_storage;
/// Подзапрос или имя таблицы. Имя таблицы аналогично подзапросу SELECT * FROM t.
const ASTSubquery * subquery = typeid_cast<const ASTSubquery *>(&*subquery_or_table_name);
const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name);
......@@ -464,24 +459,13 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
ASTPtr query;
if (table)
{
ParserSelectQuery parser;
StoragePtr existing_storage;
/// Если это уже внешняя таблица, ничего заполять не нужно. Просто запоминаем ее наличие.
if ((existing_storage = context.tryGetExternalTable(table->name)))
{
external_tables[table->name] = existing_storage;
return;
}
String query_str = "SELECT * FROM " + table->name;
String query_str = "SELECT * FROM " + backQuoteIfNeed(table->name);
const char * begin = query_str.data();
const char * end = begin + query_str.size();
const char * pos = begin;
Expected expected = "";
bool parse_res = parser.parse(pos, end, query, expected);
bool parse_res = ParserSelectQuery().parse(pos, end, query, expected);
if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
......@@ -489,28 +473,54 @@ void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
else
query = subquery->children.at(0);
InterpreterSelectQuery interpreter(query, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
if (required_columns.empty())
return new InterpreterSelectQuery(query, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
else
return new InterpreterSelectQuery(query, subquery_context, required_columns, QueryProcessingStage::Complete, subquery_depth + 1);
}
Block sample = interpreter.getSampleBlock();
void ExpressionAnalyzer::addExternalStorage(ASTPtr & subquery_or_table_name)
{
/// Сгенерируем имя для внешней таблицы.
while (context.tryGetExternalTable("_data" + toString(external_table_id)))
++external_table_id;
if (const ASTIdentifier * table = typeid_cast<const ASTIdentifier *>(&*subquery_or_table_name))
{
/// Если это уже внешняя таблица, ничего заполять не нужно. Просто запоминаем ее наличие.
if (StoragePtr existing_storage = context.tryGetExternalTable(table->name))
{
external_tables[table->name] = existing_storage;
return;
}
}
SharedPtr<InterpreterSelectQuery> interpreter = interpretSubquery(subquery_or_table_name, context, subquery_depth);
Block sample = interpreter->getSampleBlock();
NamesAndTypesListPtr columns = new NamesAndTypesList(sample.getColumnsList());
String external_table_name = "_data" + toString(external_table_id);
++external_table_id;
external_storage = StorageMemory::create(external_table_name, columns);
ASTIdentifier * ast_ident = new ASTIdentifier;
ast_ident->kind = ASTIdentifier::Table;
ast_ident->name = external_storage->getTableName();
subquery_or_table_name = ast_ident;
/** Заменяем подзапрос на имя временной таблицы.
* Именно в таком виде, запрос отправится на удалённый сервер.
* На удалённый сервер отправится эта временная таблица, и на его стороне,
* вместо выполнения подзапроса, надо будет просто из неё прочитать.
*/
subquery_or_table_name = new ASTIdentifier(StringRange(), external_table_name, ASTIdentifier::Table);
StoragePtr external_storage = StorageMemory::create(external_table_name, columns);
external_tables[external_table_name] = external_storage;
external_data[external_table_name] = interpreter.execute();
/// Добавляем множество, при обработке которого будет заполнена внешняя таблица. // TODO JOIN
SetPtr set = new Set(settings.limits);
set->setSource(external_data[external_table_name]);
set->setExternalOutput(external_tables[external_table_name]);
set->setOnlyExternal(true);
sets_with_subqueries["external_" + subquery_or_table_name->getColumnName()] = set;
subqueries_for_sets[external_table_name].source = interpreter->execute();
/** NOTE Если было написано IN tmp_table - существующая временная (но не внешняя) таблица,
* то здесь будет создана новая временная таблица (например, _data1),
* и данные будут затем в неё скопированы.
* Может быть, этого можно избежать.
*/
}
......@@ -523,85 +533,32 @@ void ExpressionAnalyzer::makeSet(ASTFunction * node, const Block & sample_block)
IAST & args = *node->arguments;
ASTPtr & arg = args.children.at(1);
/// Уже преобразовали.
if (typeid_cast<ASTSet *>(&*arg))
return;
/// Если подзапрос или имя таблицы для SELECT.
if (typeid_cast<ASTSubquery *>(&*arg) || typeid_cast<ASTIdentifier *>(&*arg))
{
/// Получаем поток блоков для подзапроса, отдаем его множеству, и кладём это множество на место подзапроса.
ASTSet * ast_set = new ASTSet(arg->getColumnName());
/// Получаем поток блоков для подзапроса. Создаём Set и кладём на место подзапроса.
String set_id = arg->getColumnName();
ASTSet * ast_set = new ASTSet(set_id);
ASTPtr ast_set_ptr = ast_set;
String set_id = ast_set->getColumnName();
/// Удаляем множество, которое могло быть создано, чтобы заполнить внешнюю таблицу
/// Вместо него будет добавлено множество, так же заполняющее себя и помогающее отвечать на зарос.
sets_with_subqueries.erase("external_" + set_id);
SubqueryForSet & subquery_for_set = subqueries_for_sets[set_id];
if (sets_with_subqueries.count(set_id))
/// Если уже создали Set с таким же подзапросом.
if (subquery_for_set.set)
{
ast_set->set = sets_with_subqueries[set_id];
ast_set->set = subquery_for_set.set;
arg = ast_set_ptr;
return;
}
else
{
ast_set->set = new Set(settings.limits);
ASTPtr subquery;
bool external = false;
/** В правой части IN-а может стоять подзапрос или имя таблицы.
* Во втором случае, это эквивалентно подзапросу (SELECT * FROM t).
*/
if (ASTIdentifier * table = typeid_cast<ASTIdentifier *>(&*arg))
{
if (external_data.count(table->name))
{
external = true;
ast_set->set->setExternalOutput(external_tables[table->name]);
ast_set->set->setSource(external_data[table->name]);
}
else
{
ParserSelectQuery parser;
String query = "SELECT * FROM " + table->name;
const char * begin = query.data();
const char * end = begin + query.size();
const char * pos = begin;
Expected expected = "";
bool parse_res = parser.parse(pos, end, subquery, expected);
if (!parse_res)
throw Exception("Error in parsing select query while creating set for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
}
}
else
subquery = arg->children.at(0);
/// Если чтение из внешней таблицы, то источник данных уже вычислен.
if (!external)
{
/** Для подзапроса в секции IN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
InterpreterSelectQuery interpreter(subquery, subquery_context, QueryProcessingStage::Complete, subquery_depth + 1);
ast_set->set->setSource(interpreter.execute());
}
sets_with_subqueries[set_id] = ast_set->set;
}
ast_set->set = new Set(settings.limits);
subquery_for_set.source = interpretSubquery(arg, context, subquery_depth)->execute();
subquery_for_set.set = ast_set->set;
arg = ast_set_ptr;
}
else
......@@ -675,6 +632,7 @@ void ExpressionAnalyzer::makeExplicitSet(ASTFunction * node, const Block & sampl
ASTSet * ast_set = new ASTSet(arg->getColumnName());
ASTPtr ast_set_ptr = ast_set;
ast_set->set = new Set(settings.limits);
ast_set->is_explicit = true;
ast_set->set->createFromAST(set_element_types, elements_ast, create_ordered_set);
arg = ast_set_ptr;
}
......@@ -1000,7 +958,7 @@ void ExpressionAnalyzer::getActionsImpl(ASTPtr ast, bool no_subqueries, bool onl
/// Если аргумент - множество, заданное перечислением значений, дадим ему уникальное имя,
/// чтобы множества с одинаковой записью не склеивались (у них может быть разный тип).
if (!set->set->getSource())
if (set->is_explicit)
column.name = getUniqueName(actions_stack.getSampleBlock(), "__set");
else
column.name = set->getColumnName();
......@@ -1248,7 +1206,9 @@ bool ExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool on
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, bool only_types)
{
actions->add(ExpressionAction::ordinaryJoin(only_types ? nullptr : joins[0], columns_added_by_join));
for (auto & subquery_for_set : subqueries_for_sets)
if (subquery_for_set.second.join)
actions->add(ExpressionAction::ordinaryJoin(only_types ? nullptr : subquery_for_set.second.join, columns_added_by_join));
}
bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_types)
......@@ -1264,24 +1224,16 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
ASTJoin & ast_join = typeid_cast<ASTJoin &>(*select_query->join);
getRootActions(ast_join.using_expr_list, only_types, false, step.actions);
/// Не поддерживается два JOIN-а с одинаковым подзапросом, но разными USING-ами.
String join_id = ast_join.table->getColumnName();
SubqueryForSet & subquery_for_set = subqueries_for_sets[join_id];
if (!subquery_for_set.join)
{
Names join_key_names_left(join_key_names_left_set.begin(), join_key_names_left_set.end());
Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end());
JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness);
/** Для подзапроса в секции JOIN не действуют ограничения на максимальный размер результата.
* Так как результат этого поздапроса - ещё не результат всего запроса.
* Вместо этого работают ограничения max_rows_in_set, max_bytes_in_set, set_overflow_mode.
* TODO: отдельные ограничения для JOIN.
*/
Context subquery_context = context;
Settings subquery_settings = context.getSettings();
subquery_settings.limits.max_result_rows = 0;
subquery_settings.limits.max_result_bytes = 0;
/// Вычисление extremes не имеет смысла и не нужно (если его делать, то в результате всего запроса могут взяться extremes подзапроса).
subquery_settings.extremes = 0;
subquery_context.setSettings(subquery_settings);
for (const auto & name_type : columns_added_by_join)
std::cerr << "! Column added by JOIN: " << name_type.first << std::endl;
......@@ -1289,16 +1241,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
for (const auto & name_type : columns_added_by_join)
required_joined_columns.push_back(name_type.first);
/// TODO: поддержка идентификаторов вместо подзапросов.
InterpreterSelectQuery interpreter(
typeid_cast<ASTJoin &>(*select_query->join).table->children.at(0), subquery_context,
required_joined_columns,
QueryProcessingStage::Complete, subquery_depth + 1);
Block right_table_sample = interpreter.getSampleBlock();
join->setSource(interpreter.execute());
joins.push_back(join);
subquery_for_set.source = interpretSubquery(ast_join.table, context, subquery_depth, required_joined_columns)->execute();
subquery_for_set.join = join;
}
addJoinAction(step.actions, false);
......@@ -1443,20 +1387,6 @@ void ExpressionAnalyzer::appendProjectResult(DB::ExpressionActionsChain & chain,
}
Sets ExpressionAnalyzer::getSetsWithSubqueries()
{
Sets res;
for (auto & s : sets_with_subqueries)
res.push_back(s.second);
return res;
}
Joins ExpressionAnalyzer::getJoinsWithSubqueries()
{
return joins;
}
Block ExpressionAnalyzer::getSelectSampleBlock()
{
assertSelect();
......
......@@ -376,10 +376,9 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
executeUnion(streams);
Sets sets_with_subqueries = query_analyzer->getSetsWithSubqueries();
Joins joins_with_subqueries = query_analyzer->getJoinsWithSubqueries();
if (!sets_with_subqueries.empty() || !joins_with_subqueries.empty())
executeSubqueriesInSetsAndJoins(streams, sets_with_subqueries, joins_with_subqueries);
SubqueriesForSets subqueries_for_sets = query_analyzer->getSubqueriesForSets();
if (!subqueries_for_sets.empty())
executeSubqueriesInSetsAndJoins(streams, subqueries_for_sets);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
......@@ -501,16 +500,20 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
query_analyzer->makeSetsForIndex();
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!interpreter_subquery)
{
/** При распределённой обработке запроса, на все удалённые серверы отправляются временные таблицы,
* полученные из глобальных подзапросов - GLOBAL IN/JOIN.
*/
if (storage->isRemote())
storage->storeExternalTables(query_analyzer->getExternalTables());
streams = storage->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads);
for (auto stream : streams)
{
streams = storage->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads);
for (auto & stream : streams)
stream->addTableLock(table_lock);
}
}
else
{
......@@ -809,9 +812,9 @@ void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams)
}
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, const Sets & sets, const Joins & joins)
void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(BlockInputStreams & streams, SubqueriesForSets & subqueries_for_sets)
{
streams[0] = new CreatingSetsBlockInputStream(streams[0], sets, joins);
streams[0] = new CreatingSetsBlockInputStream(streams[0], subqueries_for_sets, settings.limits);
}
......
......@@ -92,16 +92,6 @@ bool Join::checkSizeLimits() const
}
bool Join::checkExternalSizeLimits() const
{
if (max_rows_to_transfer && rows_in_external_table > max_rows_to_transfer)
return false;
if (max_bytes_to_transfer && bytes_in_external_table > max_bytes_to_transfer)
return false;
return true;
}
/// Вставка элемента в хэш-таблицу вида ключ -> ссылка на строку, которая затем будет использоваться при JOIN-е.
template <ASTJoin::Strictness STRICTNESS, typename Map>
struct Inserter
......@@ -271,33 +261,6 @@ void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainP
bool Join::insertFromBlock(const Block & block)
{
if (external_table)
{
BlockOutputStreamPtr output = external_table->write(ASTPtr());
output->write(block);
bytes_in_external_table += block.bytes();
rows_in_external_table += block.rows();
if (!checkExternalSizeLimits())
{
if (transfer_overflow_mode == OverflowMode::THROW)
throw Exception("JOIN external table size limit exceeded."
" Rows: " + toString(rows_in_external_table) +
", limit: " + toString(max_rows_to_transfer) +
". Bytes: " + toString(bytes_in_external_table) +
", limit: " + toString(max_bytes_to_transfer) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (transfer_overflow_mode == OverflowMode::BREAK)
return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
if (only_external)
return true;
size_t keys_size = key_names_right.size();
ConstColumnPlainPtrs key_columns(keys_size);
......
......@@ -58,16 +58,6 @@ bool Set::checkSetSizeLimits() const
}
bool Set::checkExternalSizeLimits() const
{
if (max_rows_to_transfer && rows_in_external_table > max_rows_to_transfer)
return false;
if (max_bytes_to_transfer && bytes_in_external_table > max_bytes_to_transfer)
return false;
return true;
}
Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & keys_fit_128_bits, Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
......@@ -105,33 +95,6 @@ Set::Type Set::chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & key
bool Set::insertFromBlock(Block & block, bool create_ordered_set)
{
if (external_table)
{
BlockOutputStreamPtr output = external_table->write(ASTPtr());
output->write(block);
bytes_in_external_table += block.bytes();
rows_in_external_table += block.rows();
if (!checkExternalSizeLimits())
{
if (transfer_overflow_mode == OverflowMode::THROW)
throw Exception("IN external table size limit exceeded."
" Rows: " + toString(rows_in_external_table) +
", limit: " + toString(max_rows_to_transfer) +
". Bytes: " + toString(bytes_in_external_table) +
", limit: " + toString(max_bytes_to_transfer) + ".",
ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
if (transfer_overflow_mode == OverflowMode::BREAK)
return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
if (only_external)
return true;
size_t keys_size = block.columns();
ConstColumnPlainPtrs key_columns(keys_size);
data_types.resize(keys_size);
......@@ -311,9 +274,6 @@ void Set::createFromAST(DataTypes & types, ASTPtr node, bool create_ordered_set)
void Set::execute(Block & block, const ColumnNumbers & arguments, size_t result, bool negative) const
{
if (source)
throw Exception("Using uninitialized set.", ErrorCodes::LOGICAL_ERROR);
ColumnUInt8 * c_res = new ColumnUInt8;
block.getByPosition(result).column = c_res;
ColumnUInt8::Container_t & vec_res = c_res->getData();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册