提交 e07d4761 编写于 作者: A Alexey Milovidov

dbms: development [#CONV-2944].

上级 8d0b8479
......@@ -5,6 +5,8 @@
#include <map>
#include <tr1/unordered_map>
#include <Poco/Mutex.h>
#include <DB/Core/ColumnNumbers.h>
#include <DB/Core/Names.h>
#include <DB/DataStreams/IBlockInputStream.h>
......@@ -104,15 +106,15 @@ typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
/** Агрегирует поток блоков.
/** Агрегирует источник блоков.
*/
class Aggregator
{
public:
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_) : keys(keys_), aggregates(aggregates_) {};
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_) : key_names(key_names_), aggregates(aggregates_) {};
Aggregator(const ColumnNumbers & keys_, AggregateDescriptions & aggregates_) : keys(keys_), aggregates(aggregates_), initialized(false) {};
Aggregator(const Names & key_names_, AggregateDescriptions & aggregates_) : key_names(key_names_), aggregates(aggregates_), initialized(false) {};
/// Агрегировать поток. Получить результат в виде одной из структур данных.
/// Агрегировать источник. Получить результат в виде одной из структур данных.
void execute(BlockInputStreamPtr stream, AggregatedDataVariants & result);
/// Получить пример блока, описывающего результат. Следует вызывать только после execute.
......@@ -129,7 +131,16 @@ private:
Names key_names;
AggregateDescriptions aggregates;
/// Для инициализации от первого блока при конкуррентном использовании.
bool initialized;
Poco::FastMutex mutex;
Block sample;
/** Если заданы только имена столбцов (key_names, а также aggregates[i].column_name), то вычислить номера столбцов.
* Сформировать блок - пример результата.
*/
void initialize(Block & block);
};
......
......@@ -88,18 +88,27 @@ Block IProfilingBlockInputStream::read()
Block res = readImpl();
info.work_stopwatch.stop();
if (res)
info.update(res);
/* if (res)
{
static Poco::FastMutex mutex;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << getName() << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << res.dumpNames() << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t";
for (size_t i = 0; i < res.columns(); ++i)
{
if (i != 0)
std::cerr << ", ";
std::cerr << res.getByPosition(i).name << " (" << res.getByPosition(i).column->size() << ")";
}
std::cerr << std::endl;
}*/
if (res)
info.update(res);
return res;
}
......
......@@ -108,6 +108,50 @@ public:
};
void Aggregator::initialize(Block & block)
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (initialized)
return;
initialized = true;
/// Преобразуем имена столбцов в номера, если номера не заданы
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
/// Создадим пример блока, описывающего результат
if (!sample)
{
for (size_t i = 0, size = keys.size(); i < size; ++i)
sample.insert(block.getByPosition(keys[i]).cloneEmpty());
for (size_t i = 0, size = aggregates.size(); i < size; ++i)
{
ColumnWithNameAndType col;
col.name = aggregates[i].column_name;
col.type = new DataTypeAggregateFunction;
col.column = new ColumnAggregateFunction;
sample.insert(col);
}
/// Вставим в блок результата все столбцы-константы из исходного блока, так как они могут ещё пригодиться.
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
if (block.getByPosition(i).column->isConst())
sample.insert(block.getByPosition(i).cloneEmpty());
}
}
/** Результат хранится в оперативке и должен полностью помещаться в оперативку.
*/
void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result)
......@@ -126,15 +170,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
/// Читаем все данные
while (Block block = stream->read())
{
/// Преобразуем имена столбцов в номера, если номера не заданы
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
initialize(block);
for (size_t i = 0; i < aggregates_size; ++i)
{
......@@ -150,29 +186,6 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
aggregate_columns[i][j] = block.getByPosition(aggregates[i].arguments[j]).column;
/// Создадим пример блока, описывающего результат
if (!sample)
{
for (size_t i = 0, size = keys_size; i < size; ++i)
sample.insert(block.getByPosition(keys[i]).cloneEmpty());
for (size_t i = 0; i < aggregates_size; ++i)
{
ColumnWithNameAndType col;
col.name = aggregates[i].column_name;
col.type = new DataTypeAggregateFunction;
col.column = new ColumnAggregateFunction;
sample.insert(col);
}
/// Вставим в блок результата все столбцы-константы из исходного блока, так как они могут ещё пригодиться.
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
if (block.getByPosition(i).column->isConst())
sample.insert(block.getByPosition(i).cloneEmpty());
}
size_t rows = block.rows();
/// Каким способом выполнять агрегацию?
......
......@@ -198,8 +198,8 @@ int main(int argc, char ** argv)
if (query_plan)
{
std::cerr << std::endl;
query_plan->dumpTreeWithProfile(std::cerr);
/* std::cerr << std::endl;
query_plan->dumpTreeWithProfile(std::cerr);*/
std::cerr << std::endl;
query_plan->dumpTree(std::cerr);
std::cerr << std::endl;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册