提交 4148c6ce 编写于 作者: A Alexey Milovidov

Better semantic of sharing columns: development [#CLICKHOUSE-2].

上级 8926af2c
......@@ -21,7 +21,7 @@ Block AggregatingSortedBlockInputStream::readImpl()
return children[0]->read();
Block header;
MutableColumnRawPtrs merged_columns;
MutableColumns merged_columns;
init(header, merged_columns);
......@@ -66,10 +66,10 @@ Block AggregatingSortedBlockInputStream::readImpl()
columns_to_aggregate.resize(column_numbers_to_aggregate.size());
for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i)
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]]);
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]].get());
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......
......@@ -70,11 +70,11 @@ private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/** Extract all states of aggregate functions and merge them with the current group.
*/
void addRow(TSortCursor & cursor);
void addRow(SortCursor & cursor);
};
}
......@@ -133,7 +133,7 @@ Block CollapsingSortedBlockInputStream::readImpl()
}
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......
......@@ -104,7 +104,7 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
}
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......
......@@ -206,7 +206,7 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
{
size_t num_columns = blocks[0].columns();
MutableColumnPtrs merged_columns = blocks[0].cloneEmptyColumns();
MutableColumns merged_columns = blocks[0].cloneEmptyColumns();
/// TODO: reserve (in each column)
/// Take rows from queue in right order and push to 'merged'.
......@@ -229,18 +229,18 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
if (limit && total_merged_rows == limit)
{
blocks.clear();
return blocks[0].cloneWithColumns(merged_columns);
return blocks[0].cloneWithColumns(std::move(merged_columns));
}
++merged_rows;
if (merged_rows == max_merged_block_size)
return blocks[0].cloneWithColumns(merged_columns);
return blocks[0].cloneWithColumns(std::move(merged_columns));
}
if (merged_rows == 0)
return {};
return blocks[0].cloneWithColumns(merged_columns);
return blocks[0].cloneWithColumns(std::move(merged_columns));
}
......
......@@ -46,7 +46,7 @@ String MergingSortedBlockInputStream::getID() const
return res.str();
}
void MergingSortedBlockInputStream::init(Block & header, MutableColumnRawPtrs & merged_columns)
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns)
{
/// Read the first blocks, initialize the queue.
if (first)
......@@ -155,7 +155,7 @@ Block MergingSortedBlockInputStream::readImpl()
return children[0]->read();
Block header;
MutableColumnRawPtrs merged_columns;
MutableColumns merged_columns;
init(header, merged_columns);
if (merged_columns.empty())
......@@ -166,7 +166,7 @@ Block MergingSortedBlockInputStream::readImpl()
else
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......@@ -202,7 +202,7 @@ void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(cons
template <typename TSortCursor>
void MergingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
......
......@@ -214,7 +214,7 @@ private:
void initQueue(std::priority_queue<TSortCursor> & queue);
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<TSortCursor> & queue);
Logger * log = &Logger::get("MergingSortedBlockInputStream");
......
......@@ -114,11 +114,13 @@ Block NativeBlockInputStream::readImpl()
}
/// Data
column.column = column.type->createColumn();
MutableColumnPtr read_column = column.type->createColumn();
double avg_value_size_hint = avg_value_size_hints.empty() ? 0 : avg_value_size_hints[i];
if (rows) /// If no rows, nothing to read.
readData(*column.type, *column.column, istr, rows, avg_value_size_hint);
readData(*column.type, *read_column, istr, rows, avg_value_size_hint);
column.column = std::move(read_column);
res.insert(std::move(column));
......
......@@ -59,7 +59,7 @@ Block ReplacingSortedBlockInputStream::readImpl()
}
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......
......@@ -289,7 +289,7 @@ Block SummingSortedBlockInputStream::readImpl()
}
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
return header.cloneWithColumns(std::move(merged_columns));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册