提交 c4bbc1c9 编写于 作者: A Alexey Milovidov

Better semantic of sharing columns: development [#CLICKHOUSE-2].

上级 8ce60e3a
......@@ -333,7 +333,7 @@ Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
Columns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
}
......
#include "MergeTreeBaseBlockInputStream.h"
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
......@@ -21,7 +19,7 @@ namespace ErrorCodes
MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
MergeTreeData & storage,
const ExpressionActionsPtr & prewhere_actions,
const String & prewhere_column,
const String & prewhere_column_name,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
......@@ -33,7 +31,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
:
storage(storage),
prewhere_actions(prewhere_actions),
prewhere_column(prewhere_column),
prewhere_column_name(prewhere_column_name),
max_block_size_rows(max_block_size_rows),
preferred_block_size_bytes(preferred_block_size_bytes),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes),
......@@ -133,21 +131,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
{
auto & col = block.getByPosition(i);
if (task.column_name_set.count(col.name))
{
if (ColumnArray * column_array = typeid_cast<ColumnArray *>(col.column.get()))
{
/// ColumnArray columns in block could have common offset column, which is used while reading.
/// This is in case of nested data structures.
/// Have to call resize(0) instead of cloneEmpty to save structure.
/// (To keep offsets possibly shared between different arrays.)
column_array->getOffsets().resize(0);
/// It's ok until multidimensional arrays are not stored in MergeTree.
column_array->getDataPtr() = column_array->getDataPtr()->cloneEmpty();
}
else
col.column = col.column->cloneEmpty();
}
col.column = col.column->cloneEmpty();
}
}
};
......@@ -219,46 +203,38 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Compute the expression in PREWHERE.
prewhere_actions->execute(res);
ColumnPtr column = res.getByName(prewhere_column).column;
ColumnPtr prewhere_column = res.getByName(prewhere_column_name).column;
if (task->remove_prewhere_column)
res.erase(prewhere_column);
res.erase(prewhere_column_name);
const auto pre_bytes = res.bytes();
ColumnPtr observed_column;
if (column->isColumnNullable())
{
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(*column);
observed_column = &nullable_col.getNestedColumn();
}
else
observed_column = column;
ConstantFilterDescription constant_filter_description(*prewhere_column);
/** If the filter is a constant (for example, it says PREWHERE 1),
/** If the filter is a constant (for example, it says PREWHERE 0),
* then either return an empty block, or return the block unchanged.
*/
if (observed_column->isColumnConst())
if (constant_filter_description.always_false)
{
if (!static_cast<const ColumnConst &>(*observed_column).getValue<UInt8>())
if (pre_range_reader)
{
if (pre_range_reader)
/// Have to read rows from last partly read granula.
if (!ranges_to_read.empty())
{
/// Have to read rows from last partly read granula.
if (!ranges_to_read.empty())
{
auto & range = ranges_to_read.back();
task->current_range_reader = reader->readRange(range.begin, range.end);
}
/// But can just skip them.
task->number_of_rows_to_skip = rows_was_read_in_last_range;
auto & range = ranges_to_read.back();
task->current_range_reader = reader->readRange(range.begin, range.end);
}
else
task->current_range_reader.reset();
res.clear();
return res;
/// But can just skip them.
task->number_of_rows_to_skip = rows_was_read_in_last_range;
}
else
task->current_range_reader.reset();
res.clear();
return res;
}
else if (constant_filter_description.always_true)
{
if (task->current_range_reader)
{
if (task->number_of_rows_to_skip)
......@@ -283,9 +259,11 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
progressImpl({ 0, res.bytes() - pre_bytes });
}
else if (const auto column_vec = typeid_cast<const ColumnUInt8 *>(observed_column.get()))
else
{
const auto & pre_filter = column_vec->getData();
FilterDescription filter_and_holder(*prewhere_column);
const auto & pre_filter = *filter_and_holder.data;
auto & number_of_rows_to_skip = task->number_of_rows_to_skip;
if (!task->current_range_reader)
number_of_rows_to_skip = 0;
......@@ -408,7 +386,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
for (const auto i : ext::range(0, res.columns()))
{
auto & col = res.safeGetByPosition(i);
if (col.name == prewhere_column && res.columns() > 1)
if (col.name == prewhere_column_name && res.columns() > 1)
continue;
col.column =
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
......@@ -419,13 +397,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Replace column with condition value from PREWHERE to a constant.
if (!task->remove_prewhere_column)
res.getByName(prewhere_column).column = DataTypeUInt8().createColumnConst(rows, UInt64(1));
res.getByName(prewhere_column_name).column = DataTypeUInt8().createColumnConst(rows, UInt64(1));
}
else
throw Exception{
"Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER};
if (res)
{
......
......@@ -50,7 +50,7 @@ protected:
MergeTreeData & storage;
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
String prewhere_column_name;
size_t max_block_size_rows;
size_t preferred_block_size_bytes;
......
......@@ -113,7 +113,7 @@ try
const NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
/// If the expression in PREWHERE is not a column of the table, you do not need to output a column with it
/// (from storage expect to receive only the columns of the table).
remove_prewhere_column = !pre_name_set.count(prewhere_column);
remove_prewhere_column = !pre_name_set.count(prewhere_column_name);
Names post_column_names;
for (const auto & name : column_names)
......
......@@ -633,13 +633,13 @@ void MergeTreeDataPart::loadIndex()
if (key_size)
{
index.clear();
index.resize(key_size);
MutableColumns loaded_index;
loaded_index.resize(key_size);
for (size_t i = 0; i < key_size; ++i)
{
index[i] = storage.primary_key_data_types[i]->createColumn();
index[i]->reserve(marks_count);
loaded_index[i] = storage.primary_key_data_types[i]->createColumn();
loaded_index[i]->reserve(marks_count);
}
String index_path = getFullPath() + "primary.idx";
......@@ -647,16 +647,18 @@ void MergeTreeDataPart::loadIndex()
for (size_t i = 0; i < marks_count; ++i)
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*index[j].get(), index_file);
storage.primary_key_data_types[j]->deserializeBinary(*loaded_index[j].get(), index_file);
for (size_t i = 0; i < key_size; ++i)
if (index[i]->size() != marks_count)
if (loaded_index[i]->size() != marks_count)
throw Exception("Cannot read all data from index file " + index_path
+ "(expected size: " + toString(marks_count) + ", read: " + toString(index[i]->size()) + ")",
+ "(expected size: " + toString(marks_count) + ", read: " + toString(loaded_index[i]->size()) + ")",
ErrorCodes::CANNOT_READ_ALL_DATA);
if (!index_file.eof())
throw Exception("Index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
index.assign(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
size_in_bytes = calculateTotalSize(getFullPath());
......
......@@ -112,7 +112,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
for (size_t col = 0; col < block.columns(); ++col)
{
Columns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
MutableColumns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
for (size_t i = 0; i < partitions_count; ++i)
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
......
......@@ -96,7 +96,7 @@ void MergeTreePartition::serializeTextQuoted(const MergeTreeData & storage, Writ
writeCString(", ", out);
const DataTypePtr & type = storage.partition_expr_column_types[i];
ColumnPtr column = type->createColumn();
auto column = type->createColumn();
column->insert(value[i]);
type->serializeTextQuoted(*column, 0, out);
}
......
......@@ -345,7 +345,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->marks_count = marks_count;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
new_part->index.swap(index_columns);
new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
new_part->checksums = checksums;
new_part->size_in_bytes = MergeTreeData::DataPart::calculateTotalSize(new_part->getFullPath());
}
......
......@@ -140,7 +140,7 @@ private:
std::unique_ptr<WriteBufferFromFile> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
MergeTreeData::DataPart::Index index_columns;
MutableColumns index_columns;
};
......
......@@ -182,7 +182,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
if (!primary_key_data_types.empty())
{
size_t key_size = primary_key_data_types.size();
Columns tmp_columns(key_size);
MutableColumns tmp_columns(key_size);
for (size_t j = 0; j < key_size; ++j)
tmp_columns[j] = primary_key_data_types[j]->createColumn();
......@@ -277,7 +277,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
/// Read index_granularity rows from column.
/// NOTE Shared array sizes of Nested columns are read more than once. That's Ok.
ColumnPtr tmp_column = name_type.type->createColumn();
MutableColumnPtr tmp_column = name_type.type->createColumn();
name_type.type->deserializeBinaryBulkWithMultipleStreams(
*tmp_column,
[&](const IDataType::SubstreamPath & substream_path)
......
......@@ -97,10 +97,7 @@ protected:
return res;
for (const auto & name : column_names)
{
auto & col = buffer.data.getByName(name);
res.insert(ColumnWithTypeAndName(col.column->clone(), col.type, name));
}
res.insert(buffer.data.getByName(name));
return res;
}
......@@ -156,6 +153,10 @@ static void appendBlock(const Block & from, Block & to)
if (!to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
if (!blocksHaveEqualStructure(from, to))
throw Exception("Cannot append block to buffer: block has different structure. "
"Block: " + from.dumpStructure() + ", Buffer: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
from.checkNumberOfRows();
to.checkNumberOfRows();
......@@ -171,14 +172,12 @@ static void appendBlock(const Block & from, Block & to)
{
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
const IColumn & col_from = *from.safeGetByPosition(column_no).column.get();
IColumn & col_to = *to.safeGetByPosition(column_no).column.get();
const IColumn & col_from = *from.getByPosition(column_no).column.get();
MutableColumnPtr col_to = to.getByPosition(column_no).column->mutate();
if (col_from.getName() != col_to.getName())
throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no)
+ ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
col_to->insertRangeFrom(col_from, 0, rows);
col_to.insertRangeFrom(col_from, 0, rows);
to.getByPosition(column_no).column = std::move(col_to);
}
}
catch (...)
......@@ -191,9 +190,9 @@ static void appendBlock(const Block & from, Block & to)
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
ColumnPtr & col_to = to.safeGetByPosition(column_no).column;
ColumnPtr & col_to = to.getByPosition(column_no).column;
if (col_to->size() != old_rows)
col_to = col_to->cut(0, old_rows);
col_to = col_to->mutate()->cut(0, old_rows);
}
}
catch (...)
......
......@@ -210,30 +210,28 @@ Block LogBlockInputStream::readImpl()
{
const auto & name = column_names[i];
ColumnWithTypeAndName column;
column.name = name;
column.type = column_types[i];
MutableColumnPtr column;
bool read_offsets = true;
/// For nested structures, remember pointers to columns with offsets
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column.type.get()))
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column_types[i].get()))
{
String name = DataTypeNested::extractNestedTableName(column.name);
String name = DataTypeNested::extractNestedTableName(name);
if (offset_columns.count(name) == 0)
offset_columns[name] = std::make_shared<ColumnArray::ColumnOffsets_t>();
offset_columns[name] = ColumnArray::ColumnOffsets_t::create();
else
read_offsets = false; /// on previous iterations the offsets were already read by `readData`
column.column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[name]);
column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[name]);
}
else
column.column = column.type->createColumn();
column = column_types[i]->createColumn();
try
{
readData(name, *column.type, *column.column, max_rows_to_read, read_offsets);
readData(name, *column_types[i], *column, max_rows_to_read, read_offsets);
}
catch (Exception & e)
{
......@@ -241,8 +239,8 @@ Block LogBlockInputStream::readImpl()
throw;
}
if (column.column->size())
res.insert(std::move(column));
if (column->size())
res.insert(ColumnWithTypeAndName(std::move(column), column_types[i], name));
}
if (res)
......
......@@ -302,14 +302,12 @@ BlockInputStreams StorageMerge::read(
/// Construct a block consisting only of possible values of virtual columns
Block StorageMerge::getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const
{
Block res;
ColumnWithTypeAndName _table(ColumnString::create(), std::make_shared<DataTypeString>(), "_table");
auto column = ColumnString::create();
for (const auto & elem : selected_tables)
_table.column->insert(elem.first->getTableName());
column->insert(elem.first->getTableName());
res.insert(_table);
return res;
return Block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "_table")};
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const
......
......@@ -205,7 +205,7 @@ Block TinyLogBlockInputStream::readImpl()
String nested_name = DataTypeNested::extractNestedTableName(name);
if (offset_columns.count(nested_name) == 0)
offset_columns[nested_name] = ColumnArray::ColumnOffsets_t::create());
offset_columns[nested_name] = ColumnArray::ColumnOffsets_t::create();
else
read_offsets = false; /// on previous iterations, the offsets were already calculated by `readData`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册