提交 7fc40e18 编写于 作者: A Alexey Milovidov

dbms: development [#CONV-2944].

上级 42abc4ce
......@@ -100,6 +100,7 @@ namespace ErrorCodes
EMPTY_DATA_PASSED,
UNKNOWN_AGGREGATED_DATA_VARIANT,
CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS,
NO_STREAMS_RETURNED_FROM_TABLE,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,
......
#include <iomanip>
/*#include <Poco/Mutex.h>
#include <Poco/Ext/ThreadNumber.h>*/
#include <DB/DataStreams/IProfilingBlockInputStream.h>
......@@ -90,9 +93,12 @@ Block IProfilingBlockInputStream::read()
/* if (res)
{
static Poco::FastMutex mutex;
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << std::endl;
std::cerr << getName() << std::endl;
std::cerr << res.dumpNames() << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << getName() << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << res.dumpNames() << std::endl;
}*/
return res;
......
......@@ -490,7 +490,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_variants)
{
if (data_variants.empty())
throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED);
throw Exception("Empty data passed to Aggregator::merge().", ErrorCodes::EMPTY_DATA_PASSED);
AggregatedDataVariants & res = *data_variants[0];
......@@ -528,7 +528,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (!inserted)
{
size_t i = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_it->second[i]->merge(**jt);
......@@ -549,7 +549,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
AggregateFunctionsPlainPtrs & res_row = res_data[it->first];
if (!res_row.empty())
{
size_t i = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_row[i]->merge(**jt);
......@@ -573,7 +573,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
if (!inserted)
{
size_t i = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.second.begin(); jt != it->second.second.end(); ++jt, ++i)
{
res_it->second.second[i]->merge(**jt);
......@@ -594,7 +594,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
AggregateFunctionsPlainPtrs & res_row = res_data[it->first];
if (!res_row.empty())
{
size_t i = 1;
size_t i = 0;
for (AggregateFunctionsPlainPtrs::const_iterator jt = it->second.begin(); jt != it->second.end(); ++jt, ++i)
{
res_row[i]->merge(**jt);
......
......@@ -151,16 +151,16 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
* параллельный GROUP BY склеит потоки в один,
* затем выполним остальные операции с одним получившимся потоком.
*/
BlockInputStreams streams(max_threads);
BlockInputStreams streams;
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
streams = table->read(required_columns, query_ptr, block_size, max_threads);
else
{
streams[0] = new AsynchronousBlockInputStream(interpreter_subquery->execute());
streams.resize(1);
}
streams.push_back(new AsynchronousBlockInputStream(interpreter_subquery->execute()));
if (streams.empty())
throw Exception("No streams returned from table.", ErrorCodes::NO_STREAMS_RETURNED_FROM_TABLE);
/// Если есть условие WHERE - сначала выполним часть выражения, необходимую для его вычисления
if (query.where_expression)
......
......@@ -43,10 +43,10 @@ Block LogBlockInputStream::readImpl()
if (column.column->size())
res.insert(column);
rows_read += column.column->size();
}
rows_read += res.getByPosition(0).column->size();
return res;
}
......@@ -148,10 +148,10 @@ BlockInputStreams StorageLog::read(
for (size_t thread = 0; thread < max_threads; ++thread)
{
std::cerr << "Thread " << thread << ", mark " << thread * marks_size / max_threads
/* std::cerr << "Thread " << thread << ", mark " << thread * marks_size / max_threads
<< ", rows " << (thread == 0
? marks[marks_size / max_threads - 1].rows
: (marks[(thread + 1) * marks_size / max_threads - 1].rows - marks[thread * marks_size / max_threads - 1].rows)) << std::endl;
: (marks[(thread + 1) * marks_size / max_threads - 1].rows - marks[thread * marks_size / max_threads - 1].rows)) << std::endl;*/
res.push_back(new LogBlockInputStream(
max_block_size,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册