提交 600f4bdd 编写于 作者: A Alexey Milovidov

dbms: development [#CONV-2944].

上级 ce4161b4
......@@ -24,7 +24,6 @@ public:
: required_columns(required_columns_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "AddingDefaultBlockInputStream"; }
......@@ -32,7 +31,7 @@ public:
String getID() const
{
std::stringstream res;
res << "AddingDefault(" << input->getID();
res << "AddingDefault(" << children.back()->getID();
for (NamesAndTypesList::const_iterator it = required_columns->begin(); it != required_columns->end(); ++it)
res << ", " << it->first << ", " << it->second->getName();
......@@ -44,7 +43,7 @@ public:
protected:
Block readImpl()
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......@@ -65,7 +64,6 @@ protected:
}
private:
IBlockInputStream * input;
NamesAndTypesListPtr required_columns;
};
......
......@@ -24,7 +24,6 @@ public:
: aggregator(new Aggregator(keys_, aggregates_, max_rows_to_group_by_, group_by_overflow_mode_)), has_been_read(false)
{
children.push_back(input_);
input = &*children.back();
}
/** keys берутся из GROUP BY части запроса
......@@ -39,18 +38,13 @@ public:
String getID() const
{
std::stringstream res;
res << "Aggregating(" << input->getID() << ", " << aggregator->getID() << ")";
res << "Aggregating(" << children.back()->getID() << ", " << aggregator->getID() << ")";
return res.str();
}
protected:
Block readImpl();
private:
AggregatingBlockInputStream(const AggregatingBlockInputStream & src)
: input(src.input), aggregator(src.aggregator), has_been_read(src.has_been_read) {}
IBlockInputStream * input;
SharedPtr<Aggregator> aggregator;
bool has_been_read;
};
......
......@@ -40,14 +40,12 @@ public:
: array_column(array_column_)
{
children.push_back(input_);
input = &*children.back();
}
ArrayJoiningBlockInputStream(BlockInputStreamPtr input_, const String & array_column_name_)
: array_column(-1), array_column_name(array_column_name_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "ArrayJoiningBlockInputStream"; }
......@@ -55,14 +53,14 @@ public:
String getID() const
{
std::stringstream res;
res << "ArrayJoining(" << input->getID() << ", " << array_column << ", " << array_column_name << ")";
res << "ArrayJoining(" << children.back()->getID() << ", " << array_column << ", " << array_column_name << ")";
return res.str();
}
protected:
Block readImpl()
{
Block block = input->read();
Block block = children.back()->read();
if (!block)
return block;
......@@ -98,7 +96,6 @@ protected:
}
private:
IBlockInputStream * input;
ssize_t array_column;
String array_column_name;
};
......
......@@ -23,7 +23,6 @@ public:
AsynchronousBlockInputStream(BlockInputStreamPtr in_) : pool(1), started(false)
{
children.push_back(in_);
in = &*children.back();
}
String getName() const { return "AsynchronousBlockInputStream"; }
......@@ -31,7 +30,7 @@ public:
String getID() const
{
std::stringstream res;
res << "Asynchronous(" << in->getID() << ")";
res << "Asynchronous(" << children.back()->getID() << ")";
return res.str();
}
......@@ -58,7 +57,6 @@ public:
}
protected:
IBlockInputStream * in;
boost::threadpool::pool pool;
Poco::Event ready;
bool started;
......@@ -105,7 +103,7 @@ protected:
{
try
{
block = in->read();
block = children.back()->read();
}
catch (const Exception & e)
{
......
......@@ -27,7 +27,6 @@ public:
: expression(expression_), part_id(part_id_), clear_temporaries(clear_temporaries_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "ExpressionBlockInputStream"; }
......@@ -35,14 +34,14 @@ public:
String getID() const
{
std::stringstream res;
res << "Expression(" << input->getID() << ", " << expression->getExecutionID(part_id) << ")";
res << "Expression(" << children.back()->getID() << ", " << expression->getExecutionID(part_id) << ")";
return res.str();
}
protected:
Block readImpl()
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......@@ -55,7 +54,6 @@ protected:
}
private:
IBlockInputStream * input;
ExpressionPtr expression;
unsigned part_id;
bool clear_temporaries;
......
......@@ -27,7 +27,7 @@ public:
String getID() const
{
std::stringstream res;
res << "Filter(" << input->getID() << ", " << filter_column << ", " << filter_column_name << ")";
res << "Filter(" << children.back()->getID() << ", " << filter_column << ", " << filter_column_name << ")";
return res.str();
}
......@@ -35,7 +35,6 @@ protected:
Block readImpl();
private:
IBlockInputStream * input;
ssize_t filter_column;
String filter_column_name;
};
......
......@@ -24,7 +24,6 @@ public:
: log(&Logger::get("FinalizingAggregatedBlockInputStream"))
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "FinalizingAggregatedBlockInputStream"; }
......@@ -32,14 +31,14 @@ public:
String getID() const
{
std::stringstream res;
res << "FinalizingAggregated(" << input->getID() << ")";
res << "FinalizingAggregated(" << children.back()->getID() << ")";
return res.str();
}
protected:
Block readImpl()
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......@@ -81,7 +80,6 @@ protected:
}
private:
IBlockInputStream * input;
Logger * log;
};
......
......@@ -72,11 +72,6 @@ public:
/// Получить количество строк и байт, прочитанных в листовых источниках.
void getLeafRowsBytes(size_t & rows, size_t & bytes);
/** Получить текст, который идентифицирует этот источник и всё поддерево.
* Обычно он содержит идентификатор источника и getTreeID от всех детей.
*/
String getTreeID() const;
/** Проверить глубину конвейера.
* Если задано max_depth и глубина больше - кинуть исключение.
*/
......@@ -84,13 +79,18 @@ public:
protected:
BlockInputStreams children;
StoragePtr owned_storage;
private:
void getLeavesImpl(BlockInputStreams & res, BlockInputStreamPtr this_shared_ptr = NULL);
size_t checkDepthImpl(size_t max_depth, size_t level) const;
/** Получить текст, который идентифицирует этот источник и всё поддерево.
* В отличие от getID - без учёта параметров.
*/
String getTreeID() const;
};
......
......@@ -23,7 +23,7 @@ public:
String getID() const
{
std::stringstream res;
res << "Limit(" << input->getID() << ", " << limit << ", " << offset << ")";
res << "Limit(" << children.back()->getID() << ", " << limit << ", " << offset << ")";
return res.str();
}
......@@ -31,7 +31,6 @@ protected:
Block readImpl();
private:
IBlockInputStream * input;
size_t limit;
size_t offset;
size_t pos;
......
......@@ -16,7 +16,6 @@ public:
MaterializingBlockInputStream(BlockInputStreamPtr input_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "MaterializingBlockInputStream"; }
......@@ -24,14 +23,14 @@ public:
String getID() const
{
std::stringstream res;
res << "Materializing(" << input->getID() << ")";
res << "Materializing(" << children.back()->getID() << ")";
return res.str();
}
protected:
Block readImpl()
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......@@ -46,9 +45,6 @@ protected:
return res;
}
private:
IBlockInputStream * input;
};
}
......@@ -19,7 +19,6 @@ public:
: description(description_), has_been_read(false), log(&Logger::get("MergeSortingBlockInputStream"))
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "MergeSortingBlockInputStream"; }
......@@ -27,7 +26,7 @@ public:
String getID() const
{
std::stringstream res;
res << "MergeSorting(" << input->getID();
res << "MergeSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
......@@ -40,7 +39,6 @@ protected:
Block readImpl();
private:
IBlockInputStream * input;
SortDescription description;
/// Всё было прочитано.
......
......@@ -22,7 +22,6 @@ public:
: aggregator(new Aggregator(keys_, aggregates_)), has_been_read(false)
{
children.push_back(input_);
input = &*children.back();
}
/** keys берутся из GROUP BY части запроса
......@@ -35,7 +34,7 @@ public:
String getID() const
{
std::stringstream res;
res << "MergingAggregated(" << input->getID() << ", " << aggregator->getID() << ")";
res << "MergingAggregated(" << children.back()->getID() << ", " << aggregator->getID() << ")";
return res.str();
}
......@@ -43,10 +42,6 @@ protected:
Block readImpl();
private:
MergingAggregatedBlockInputStream(const MergingAggregatedBlockInputStream & src)
: input(src.input), aggregator(src.aggregator), has_been_read(src.has_been_read) {}
IBlockInputStream * input;
SharedPtr<Aggregator> aggregator;
bool has_been_read;
};
......
......@@ -18,7 +18,6 @@ public:
: description(description_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "PartialSortingBlockInputStream"; }
......@@ -26,7 +25,7 @@ public:
String getID() const
{
std::stringstream res;
res << "PartialSorting(" << input->getID();
res << "PartialSorting(" << children.back()->getID();
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
......@@ -39,7 +38,6 @@ protected:
Block readImpl();
private:
IBlockInputStream * input;
SortDescription description;
};
......
......@@ -27,7 +27,6 @@ public:
: expression(expression_), without_duplicates_and_aliases(without_duplicates_and_aliases_), part_id(part_id_), subtree(subtree_)
{
children.push_back(input_);
input = &*children.back();
}
String getName() const { return "ProjectionBlockInputStream"; }
......@@ -35,14 +34,14 @@ public:
String getID() const
{
std::stringstream res;
res << "Projection(" << input->getID() << ", " << expression->getProjectionID(without_duplicates_and_aliases, part_id, subtree) << ")";
res << "Projection(" << children.back()->getID() << ", " << expression->getProjectionID(without_duplicates_and_aliases, part_id, subtree) << ")";
return res.str();
}
protected:
Block readImpl()
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......@@ -50,7 +49,6 @@ protected:
}
private:
IBlockInputStream * input;
ExpressionPtr expression;
bool without_duplicates_and_aliases;
unsigned part_id;
......
......@@ -12,7 +12,6 @@ AggregatingBlockInputStream::AggregatingBlockInputStream(BlockInputStreamPtr inp
: has_been_read(false)
{
children.push_back(input_);
input = &*children.back();
Names key_names;
AggregateDescriptions aggregates;
......@@ -28,9 +27,9 @@ Block AggregatingBlockInputStream::readImpl()
return Block();
has_been_read = true;
AggregatedDataVariants data_variants;
aggregator->execute(input, data_variants);
aggregator->execute(children.back(), data_variants);
if (isCancelled())
return Block();
......
......@@ -11,14 +11,12 @@ FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, ssize
: filter_column(filter_column_)
{
children.push_back(input_);
input = &*children.back();
}
FilterBlockInputStream::FilterBlockInputStream(BlockInputStreamPtr input_, const String & filter_column_name_)
: filter_column(-1), filter_column_name(filter_column_name_)
{
children.push_back(input_);
input = &*children.back();
}
......@@ -27,7 +25,7 @@ Block FilterBlockInputStream::readImpl()
/// Пока не встретится блок, после фильтрации которого что-нибудь останется, или поток не закончится.
while (1)
{
Block res = input->read();
Block res = children.back()->read();
if (!res)
return res;
......
......@@ -12,7 +12,6 @@ LimitBlockInputStream::LimitBlockInputStream(BlockInputStreamPtr input_, size_t
: limit(limit_), offset(offset_), pos(0)
{
children.push_back(input_);
input = &*children.back();
}
......@@ -28,7 +27,7 @@ Block LimitBlockInputStream::readImpl()
do
{
res = input->read();
res = children.back()->read();
if (!res)
return res;
rows = res.rows();
......
......@@ -22,7 +22,7 @@ Block MergeSortingBlockInputStream::readImpl()
has_been_read = true;
Blocks blocks;
while (Block block = input->read())
while (Block block = children.back()->read())
blocks.push_back(block);
if (isCancelled())
......
......@@ -11,7 +11,6 @@ MergingAggregatedBlockInputStream::MergingAggregatedBlockInputStream(BlockInputS
: has_been_read(false)
{
children.push_back(input_);
input = &*children.back();
Names key_names;
AggregateDescriptions aggregates;
......@@ -29,7 +28,7 @@ Block MergingAggregatedBlockInputStream::readImpl()
has_been_read = true;
AggregatedDataVariants data_variants;
aggregator->merge(input, data_variants);
aggregator->merge(children.back(), data_variants);
return aggregator->convertToBlock(data_variants);
}
......
......@@ -9,7 +9,7 @@ namespace DB
Block PartialSortingBlockInputStream::readImpl()
{
Block res = input->read();
Block res = children.back()->read();
sortBlock(res, description);
return res;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册