提交 42abc4ce 编写于 作者: A Alexey Milovidov

dbms: development [#CONV-2944].

上级 fd8556bf
......@@ -3,3 +3,4 @@
#define DEFAULT_BLOCK_SIZE 1048576
#define DEFAULT_MAX_QUERY_SIZE 1048576
#define SHOW_CHARS_ON_SYNTAX_ERROR 160L
#define DEFAULT_MAX_THREADS 8
......@@ -97,6 +97,9 @@ namespace ErrorCodes
UNKNOWN_COMPRESSION_METHOD,
EMPTY_LIST_OF_COLUMNS_PASSED,
SIZES_OF_MARKS_FILES_ARE_INCONSISTENT,
EMPTY_DATA_PASSED,
UNKNOWN_AGGREGATED_DATA_VARIANT,
CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,
......
#pragma once
#include <vector>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <DB/Core/StackTrace.h>
......@@ -28,4 +32,9 @@ private:
StackTrace trace;
};
using Poco::SharedPtr;
typedef SharedPtr<Exception> ExceptionPtr;
typedef std::vector<ExceptionPtr> Exceptions;
}
#pragma once
#include <statdaemons/threadpool.hpp>
#include <DB/Interpreters/Aggregator.h>
#include <DB/Interpreters/Expression.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** Агрегирует несколько источников параллельно.
* Запускает агрегацию отдельных источников в отдельных потоках, затем объединяет результаты.
* Агрегатные функции не финализируются, то есть, не заменяются на своё значение, а содержат промежуточное состояние вычислений.
* Это необходимо, чтобы можно было продолжить агрегацию (например, объединяя потоки частично агрегированных данных).
*/
class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_, unsigned max_threads_ = 1)
: inputs(inputs_), aggregator(new Aggregator(keys_, aggregates_)), has_been_read(false), max_threads(max_threads_), pool(max_threads)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
/** keys берутся из GROUP BY части запроса
* Агрегатные функции ищутся везде в выражении.
* Столбцы, соответствующие keys и аргументам агрегатных функций, уже должны быть вычислены.
*/
ParallelAggregatingBlockInputStream(BlockInputStreams inputs_, SharedPtr<Expression> expression, unsigned max_threads_ = 1)
: inputs(inputs_), has_been_read(false), max_threads(max_threads_), pool(max_threads)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
Names key_names;
AggregateDescriptions aggregates;
expression->getAggregateInfo(key_names, aggregates);
aggregator = new Aggregator(key_names, aggregates);
}
Block readImpl()
{
if (has_been_read)
return Block();
has_been_read = true;
ManyAggregatedDataVariants many_data(inputs.size());
Exceptions exceptions(inputs.size());
for (size_t i = 0, size = many_data.size(); i < size; ++i)
{
many_data[i] = new AggregatedDataVariants;
pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, boost::ref(inputs[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i])));
}
pool.wait();
AggregatedDataVariantsPtr res = aggregator->merge(many_data);
return aggregator->convertToBlock(*res);
}
String getName() const { return "ParallelAggregatingBlockInputStream"; }
BlockInputStreamPtr clone() { return new ParallelAggregatingBlockInputStream(*this); }
private:
ParallelAggregatingBlockInputStream(const ParallelAggregatingBlockInputStream & src)
: inputs(src.inputs), aggregator(src.aggregator), has_been_read(src.has_been_read) {}
BlockInputStreams inputs;
SharedPtr<Aggregator> aggregator;
bool has_been_read;
size_t max_threads;
boost::threadpool::pool pool;
/// Вычисления, которые выполняться в отдельном потоке
void calculate(BlockInputStreamPtr & input, AggregatedDataVariants & data, ExceptionPtr & exception)
{
try
{
aggregator->execute(input, data);
}
catch (const Exception & e)
{
exception = new Exception(e);
}
catch (const Poco::Exception & e)
{
exception = new Exception(e.message(), ErrorCodes::POCO_EXCEPTION);
}
catch (const std::exception & e)
{
exception = new Exception(e.what(), ErrorCodes::STD_EXCEPTION);
}
catch (...)
{
exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
};
}
#pragma once
#include <DB/IO/ReadBuffer.h>
namespace DB
{
/** Позволяет считать из другого ReadBuffer не более указанного количества байт.
*/
class LimitReadBuffer : public ReadBuffer
{
private:
ReadBuffer & in;
size_t limit;
bool nextImpl()
{
if (count() >= limit || !in.next())
return false;
working_buffer = in.buffer();
if (limit - count() < working_buffer.size())
working_buffer.resize(limit - count());
return true;
}
public:
LimitReadBuffer(ReadBuffer & in_, size_t limit_) : ReadBuffer(NULL, 0), in(in_), limit(limit_) {}
};
}
......@@ -88,8 +88,21 @@ struct AggregatedDataVariants
* (При этом, строки, содержащие нули посередине, могут склеиться.)
*/
AggregatedDataHashed hashed;
enum Type
{
GENERIC,
WITHOUT_KEY,
KEY_64,
KEY_STRING,
HASHED,
};
Type type;
};
typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
/** Агрегирует поток блоков.
*/
......@@ -99,11 +112,18 @@ public:
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_) : keys(keys_), aggregates(aggregates_) {};
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_) : key_names(key_names_), aggregates(aggregates_) {};
/// Агрегировать поток. Получить результат в виде одной из структур данных.
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
/// Получить пример блока, описывающего результат. Следует вызывать только после execute.
Block getSampleBlock() { return sample; }
/// Преобразовать структуру данных агрегации в блок.
Block convertToBlock(AggregatedDataVariants & data_variants);
/// Объединить несколько структуру данных агрегации в одну. (В первый элемент массива.) Все варианты агрегации должны быть одинаковыми!
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants);
private:
ColumnNumbers keys;
Names key_names;
......
......@@ -13,7 +13,7 @@ namespace DB
class InterpreterCreateQuery
{
public:
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/** В случае таблицы: добавляет созданную таблицу в контекст, а также возвращает её.
* В случае БД: добавляет созданную БД в контекст и возвращает NULL.
......@@ -23,6 +23,7 @@ public:
private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};
......
......@@ -12,7 +12,7 @@ namespace DB
class InterpreterInsertQuery
{
public:
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/** Выполнить запрос.
* remaining_data_istr, если не NULL, может содержать нераспарсенные данные для вставки.
......@@ -25,6 +25,7 @@ private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};
......
......@@ -12,7 +12,7 @@ namespace DB
class InterpreterQuery
{
public:
InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/** Выполнить запрос.
*
......@@ -31,6 +31,7 @@ public:
private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};
......
......@@ -13,7 +13,7 @@ namespace DB
class InterpreterSelectQuery
{
public:
InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_ = DEFAULT_MAX_THREADS, size_t max_block_size_ = DEFAULT_BLOCK_SIZE);
/// Выполнить запрос, получить поток блоков для чтения
BlockInputStreamPtr execute();
......@@ -51,6 +51,7 @@ private:
ASTPtr query_ptr;
Context context;
size_t max_threads;
size_t max_block_size;
};
......
......@@ -16,6 +16,7 @@ void executeQuery(
Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции...
BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос
size_t max_query_size = DEFAULT_MAX_QUERY_SIZE, /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже)
size_t max_threads = DEFAULT_MAX_THREADS, /// Максимальное количество потоков выполнения запроса
size_t max_block_size = DEFAULT_BLOCK_SIZE); /// Максимальный размер блока при чтении или вставке данных
}
......@@ -29,91 +29,7 @@ Block AggregatingBlockInputStream::readImpl()
AggregatedDataVariants data_variants;
aggregator->execute(input, data_variants);
Block res = aggregator->getSampleBlock();
size_t rows = 0;
/// В какой структуре данных агрегированы данные?
if (!data_variants.without_key.empty())
{
AggregatedDataWithoutKey & data = data_variants.without_key;
rows = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = data.begin(); jt != data.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
else if (!data_variants.key64.empty())
{
AggregatedDataWithUInt64Key & data = data_variants.key64;
rows = data.size();
IColumn & first_column = *res.getByPosition(0).column;
bool is_signed = dynamic_cast<ColumnInt8 *>(&first_column) || dynamic_cast<ColumnInt16 *>(&first_column)
|| dynamic_cast<ColumnInt32 *>(&first_column) || dynamic_cast<ColumnInt64 *>(&first_column);
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it)
{
if (is_signed)
first_column.insert(static_cast<Int64>(it->first));
else
first_column.insert(it->first);
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (!data_variants.key_string.empty())
{
AggregatedDataWithStringKey & data = data_variants.key_string;
rows = data.size();
IColumn & first_column = *res.getByPosition(0).column;
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it)
{
first_column.insert(it->first);
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (!data_variants.hashed.empty())
{
AggregatedDataHashed & data = data_variants.hashed;
rows = data.size();
for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->second.first.begin(); jt != it->second.first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.second.begin(); jt != it->second.second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else
{
AggregatedData & data = data_variants.generic;
rows = data.size();
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->first.begin(); jt != it->first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
/// Изменяем размер столбцов-констант в блоке.
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
if (res.getByPosition(i).column->isConst())
res.getByPosition(i).column->cut(0, rows);
return res;
return aggregator->convertToBlock(data_variants);
}
......
......@@ -201,6 +201,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
if (keys_size == 0)
{
/// Если ключей нет
result.type = AggregatedDataVariants::WITHOUT_KEY;
AggregatedDataWithoutKey & res = result.without_key;
if (res.empty())
{
......@@ -225,6 +226,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
&& !dynamic_cast<ColumnFloat32 *>(&*key_columns[0]) && !dynamic_cast<ColumnFloat64 *>(&*key_columns[0]))
{
/// Если есть один ключ, который помещается в 64 бита, и это не число с плавающей запятой
result.type = AggregatedDataVariants::KEY_64;
AggregatedDataWithUInt64Key & res = result.key64;
const FieldVisitorToUInt64 visitor;
IColumn & column = *key_columns[0];
......@@ -262,6 +264,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
&& (dynamic_cast<ColumnString *>(&*key_columns[0]) || dynamic_cast<ColumnFixedString *>(&*key_columns[0])))
{
/// Если есть один строковый ключ, то используем хэш-таблицу с ним
result.type = AggregatedDataVariants::KEY_STRING;
AggregatedDataWithStringKey & res = result.key_string;
IColumn & column = *key_columns[0];
......@@ -293,6 +296,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
else
{
/// Если много ключей - будем агрегировать по хэшу от них
result.type = AggregatedDataVariants::HASHED;
AggregatedDataHashed & res = result.hashed;
const FieldVisitorToUInt64 to_uint64_visitor;
......@@ -358,6 +362,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
/* else
{
/// Общий способ
result.type = AggregatedDataVariants::GENERIC;
AggregatedData & res = result.generic;
/// Для всех строчек
......@@ -390,4 +395,222 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
}
Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
{
Block res = getSampleBlock();
size_t rows = 0;
/// В какой структуре данных агрегированы данные?
if (data_variants.type == AggregatedDataVariants::WITHOUT_KEY)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
rows = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = data.begin(); jt != data.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
else if (data_variants.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & data = data_variants.key64;
rows = data.size();
IColumn & first_column = *res.getByPosition(0).column;
bool is_signed = dynamic_cast<ColumnInt8 *>(&first_column) || dynamic_cast<ColumnInt16 *>(&first_column)
|| dynamic_cast<ColumnInt32 *>(&first_column) || dynamic_cast<ColumnInt64 *>(&first_column);
for (AggregatedDataWithUInt64Key::const_iterator it = data.begin(); it != data.end(); ++it)
{
if (is_signed)
first_column.insert(static_cast<Int64>(it->first));
else
first_column.insert(it->first);
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & data = data_variants.key_string;
rows = data.size();
IColumn & first_column = *res.getByPosition(0).column;
for (AggregatedDataWithStringKey::const_iterator it = data.begin(); it != data.end(); ++it)
{
first_column.insert(it->first);
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (data_variants.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & data = data_variants.hashed;
rows = data.size();
for (AggregatedDataHashed::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->second.first.begin(); jt != it->second.first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.second.begin(); jt != it->second.second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else if (data_variants.type == AggregatedDataVariants::GENERIC)
{
AggregatedData & data = data_variants.generic;
rows = data.size();
for (AggregatedData::const_iterator it = data.begin(); it != data.end(); ++it)
{
size_t i = 0;
for (Row::const_iterator jt = it->first.begin(); jt != it->first.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
res.getByPosition(i).column->insert(*jt);
}
}
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// Изменяем размер столбцов-констант в блоке.
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
if (res.getByPosition(i).column->isConst())
res.getByPosition(i).column->cut(0, rows);
return res;
}
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants)
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED);
AggregatedDataVariants & res = *data_variants[0];
/// Все результаты агрегации соединяем с первым.
for (size_t i = 1, size = data_variants.size(); i < size; ++i)
{
AggregatedDataVariants & current = *data_variants[i];
if (res.type != current.type)
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
/// В какой структуре данных агрегированы данные?
if (res.type == AggregatedDataVariants::WITHOUT_KEY)
{
AggregatedDataWithoutKey & res_data = res.without_key;
AggregatedDataWithoutKey & current_data = current.without_key;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = current_data.begin(); jt != current_data.end(); ++jt, ++i)
{
res_data[i]->merge(**jt);
delete *jt;
}
}
else if (res.type == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res_data = res.key64;
AggregatedDataWithUInt64Key & current_data = current.key64;
for (typename AggregatedDataWithUInt64Key::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataWithUInt64Key::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_it->second[i]->merge(**jt);
delete *jt;
}
}
else
res_it->second = it->second;
}
}
else if (res.type == AggregatedDataVariants::KEY_STRING)
{
AggregatedDataWithStringKey & res_data = res.key_string;
AggregatedDataWithStringKey & current_data = current.key_string;
for (typename AggregatedDataWithStringKey::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregateFunctionsPlainPtrs & res_row = res_data[it->first];
if (!res_row.empty())
{
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_row[i]->merge(**jt);
delete *jt;
}
}
else
res_row = it->second;
}
}
else if (res.type == AggregatedDataVariants::HASHED)
{
AggregatedDataHashed & res_data = res.hashed;
AggregatedDataHashed & current_data = current.hashed;
for (typename AggregatedDataHashed::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregatedDataHashed::iterator res_it;
bool inserted;
res_data.emplace(it->first, res_it, inserted);
if (!inserted)
{
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.second.begin(); jt != it->second.second.end(); ++jt, ++i)
{
res_it->second.second[i]->merge(**jt);
delete *jt;
}
}
else
res_it->second = it->second;
}
}
else if (res.type == AggregatedDataVariants::GENERIC)
{
AggregatedData & res_data = res.generic;
AggregatedData & current_data = current.generic;
for (typename AggregatedData::const_iterator it = current_data.begin(); it != current_data.end(); ++it)
{
AggregateFunctionsPlainPtrs & res_row = res_data[it->first];
if (!res_row.empty())
{
size_t i = 1;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_row[i]->merge(**jt);
delete *jt;
}
}
else
res_row = it->second;
}
}
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
return data_variants[0];
}
}
......@@ -23,8 +23,8 @@ namespace DB
{
InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
{
}
......@@ -94,7 +94,7 @@ StoragePtr InterpreterCreateQuery::execute()
SharedPtr<InterpreterSelectQuery> interpreter_select;
if (create.select)
interpreter_select = new InterpreterSelectQuery(create.select, context, max_block_size);
interpreter_select = new InterpreterSelectQuery(create.select, context, max_threads, max_block_size);
/// Получаем список столбцов
if (create.columns)
......
......@@ -16,8 +16,8 @@ namespace DB
{
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
{
}
......@@ -78,7 +78,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
}
else
{
InterpreterSelectQuery interpreter_select(query.select, context, max_block_size);
InterpreterSelectQuery interpreter_select(query.select, context, max_threads, max_block_size);
in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out);
......
......@@ -14,8 +14,8 @@ namespace DB
{
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
{
}
......@@ -24,17 +24,17 @@ void InterpreterQuery::execute(WriteBuffer & ostr, ReadBuffer * remaining_data_i
{
if (dynamic_cast<ASTSelectQuery *>(&*query_ptr))
{
InterpreterSelectQuery interpreter(query_ptr, context, max_block_size);
InterpreterSelectQuery interpreter(query_ptr, context, max_threads, max_block_size);
query_plan = interpreter.executeAndFormat(ostr);
}
else if (dynamic_cast<ASTInsertQuery *>(&*query_ptr))
{
InterpreterInsertQuery interpreter(query_ptr, context, max_block_size);
InterpreterInsertQuery interpreter(query_ptr, context, max_threads, max_block_size);
interpreter.execute(remaining_data_istr);
}
else if (dynamic_cast<ASTCreateQuery *>(&*query_ptr))
{
InterpreterCreateQuery interpreter(query_ptr, context, max_block_size);
InterpreterCreateQuery interpreter(query_ptr, context, max_threads, max_block_size);
interpreter.execute();
}
else if (dynamic_cast<ASTDropQuery *>(&*query_ptr))
......
......@@ -8,6 +8,7 @@
#include <DB/DataStreams/FinalizingAggregatedBlockInputStream.h>
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
......@@ -25,8 +26,8 @@ namespace DB
{
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_)
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, Context & context_, size_t max_threads_, size_t max_block_size_)
: query_ptr(query_ptr_), context(context_), max_threads(max_threads_), max_block_size(max_block_size_)
{
}
......@@ -70,7 +71,7 @@ void InterpreterSelectQuery::setColumns()
context.columns = !query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table)
? getTable()->getColumnsList()
: InterpreterSelectQuery(query.table, context, max_block_size).getSampleBlock().getColumnsList();
: InterpreterSelectQuery(query.table, context, max_threads, max_block_size).getSampleBlock().getColumnsList();
if (context.columns.empty())
throw Exception("There is no available columns", ErrorCodes::THERE_IS_NO_COLUMN);
......@@ -108,7 +109,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
table = getTable();
else
interpreter_subquery = new InterpreterSelectQuery(query.table, context, max_block_size);
interpreter_subquery = new InterpreterSelectQuery(query.table, context, max_threads, max_block_size);
/// Объект, с помощью которого анализируется запрос.
Poco::SharedPtr<Expression> expression = new Expression(query_ptr, context);
......@@ -141,21 +142,37 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
block_size = limit_length + limit_offset;
}
/// Поток данных.
BlockInputStreamPtr stream;
/** Потоки данных. При параллельном выполнении запроса, имеем несколько потоков данных.
* Если нет GROUP BY, то выполним все операции до ORDER BY и LIMIT параллельно, затем
* если есть ORDER BY, то склеим потоки с помощью UnionBlockInputStream, а затем MergеSortingBlockInputStream,
* если нет, то склеим с помощью UnionBlockInputStream,
* затем применим LIMIT.
* Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно;
* параллельный GROUP BY склеит потоки в один,
* затем выполним остальные операции с одним получившимся потоком.
*/
BlockInputStreams streams(max_threads);
/// Инициализируем изначальный поток данных, на который накладываются преобразования запроса. Таблица или подзапрос?
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
stream = new AsynchronousBlockInputStream(new UnionBlockInputStream(table->read(required_columns, query_ptr, block_size, 10), 10));
streams = table->read(required_columns, query_ptr, block_size, max_threads);
else
stream = new AsynchronousBlockInputStream(interpreter_subquery->execute());
{
streams[0] = new AsynchronousBlockInputStream(interpreter_subquery->execute());
streams.resize(1);
}
/// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления
if (query.where_expression)
{
setPartID(query.where_expression, PART_WHERE);
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_WHERE));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_WHERE));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
}
}
/// Если есть GROUP BY - сначала выполним часть выражения, необходимую для его вычисления
......@@ -165,9 +182,25 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING));
stream = new AsynchronousBlockInputStream(new AggregatingBlockInputStream(stream, expression));
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_GROUP | PART_BEFORE_AGGREGATING));
}
BlockInputStreamPtr & stream = streams[0];
/// Если потоков несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
{
stream = new AsynchronousBlockInputStream(new ParallelAggregatingBlockInputStream(streams, expression, max_threads));
streams.resize(1);
}
else
stream = new AsynchronousBlockInputStream(new AggregatingBlockInputStream(stream, expression));
/// Финализируем агрегатные функции - заменяем их состояния вычислений на готовые значения
stream = new AsynchronousBlockInputStream(new FinalizingAggregatedBlockInputStream(stream));
}
......@@ -175,23 +208,33 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (query.having_expression)
{
setPartID(query.having_expression, PART_HAVING);
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_HAVING));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_HAVING));
stream = new AsynchronousBlockInputStream(new FilterBlockInputStream(stream));
}
}
/// Выполним оставшуюся часть выражения
setPartID(query.select_expression_list, PART_SELECT);
if (query.order_expression_list)
setPartID(query.order_expression_list, PART_ORDER);
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER));
/** Оставим только столбцы, нужные для SELECT и ORDER BY части.
* Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части.
*/
stream = new ProjectionBlockInputStream(stream, expression,
query.order_expression_list ? true : false,
PART_SELECT | PART_ORDER,
query.order_expression_list ? NULL : query.select_expression_list);
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new ExpressionBlockInputStream(stream, expression, PART_SELECT | PART_ORDER));
/** Оставим только столбцы, нужные для SELECT и ORDER BY части.
* Если нет ORDER BY - то это последняя проекция, и нужно брать только столбцы из SELECT части.
*/
stream = new ProjectionBlockInputStream(stream, expression,
query.order_expression_list ? true : false,
PART_SELECT | PART_ORDER,
query.order_expression_list ? NULL : query.select_expression_list);
}
/// Если есть ORDER BY
if (query.order_expression_list)
......@@ -206,12 +249,36 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
order_descr.push_back(SortColumnDescription(name, dynamic_cast<ASTOrderByElement &>(**it).direction));
}
stream = new AsynchronousBlockInputStream(new PartialSortingBlockInputStream(stream, order_descr));
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new AsynchronousBlockInputStream(new PartialSortingBlockInputStream(stream, order_descr));
}
BlockInputStreamPtr & stream = streams[0];
/// Если потоков несколько, то объединяем их в один
if (streams.size() > 1)
{
stream = new UnionBlockInputStream(streams, max_threads);
streams.resize(1);
}
/// Сливаем сортированные блоки
stream = new AsynchronousBlockInputStream(new MergeSortingBlockInputStream(stream, order_descr));
/// Оставим только столбцы, нужные для SELECT части
stream = new ProjectionBlockInputStream(stream, expression, false, PART_SELECT, query.select_expression_list);
}
/// Если до сих пор есть несколько потоков, то объединяем их в один
if (streams.size() > 1)
{
streams[0] = new UnionBlockInputStream(streams, max_threads);
streams.resize(1);
}
BlockInputStreamPtr & stream = streams[0];
/// Если есть LIMIT
if (query.limit_length)
......
......@@ -13,6 +13,7 @@ void executeQuery(
Context & context,
BlockInputStreamPtr & query_plan,
size_t max_query_size,
size_t max_threads,
size_t max_block_size)
{
DB::ParserQuery parser;
......@@ -58,7 +59,7 @@ void executeQuery(
formatAST(*ast, std::cerr);
std::cerr << std::endl;
InterpreterQuery interpreter(ast, context, max_block_size);
InterpreterQuery interpreter(ast, context, max_threads, max_block_size);
interpreter.execute(ostr, &istr, query_plan);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册