Commit 43717781 authored by Alexey Milovidov

Unification of Nested: development [#CLICKHOUSE-2].

Parent: fffad2bb
......@@ -51,11 +51,10 @@ class LogBlockInputStream final : public IProfilingBlockInputStream
{
public:
LogBlockInputStream(
size_t block_size_, const Names & column_names_, StorageLog & storage_,
size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_),
column_names(column_names_),
column_types(column_names.size()),
columns(columns_),
storage(storage_),
mark_number(mark_number_),
rows_limit(rows_limit_),
......@@ -70,8 +69,8 @@ public:
std::stringstream res;
res << "Log(" << storage.getTableName() << ", " << &storage << ", " << mark_number << ", " << rows_limit;
for (const auto & name : column_names)
res << ", " << name;
for (const auto & name_type : columns)
res << ", " << name_type.name;
res << ")";
return res.str();
......@@ -82,8 +81,7 @@ protected:
private:
size_t block_size;
Names column_names;
DataTypes column_types;
NamesAndTypesList columns;
StorageLog & storage;
size_t mark_number; /// from what mark to read data
size_t rows_limit; /// The maximum number of rows that can be read
......@@ -191,11 +189,6 @@ Block LogBlockInputStream::readImpl()
if (Poco::DirectoryIterator(storage.getFullPath()) == Poco::DirectoryIterator())
return res;
/// If the files are not open, then open them.
if (streams.empty())
for (size_t i = 0, size = column_names.size(); i < size; ++i)
column_types[i] = storage.getDataTypeByName(column_names[i]);
/// How many rows to read for the next block.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
......@@ -203,18 +196,16 @@ Block LogBlockInputStream::readImpl()
using OffsetColumns = std::map<std::string, ColumnPtr>;
OffsetColumns offset_columns;
for (size_t i = 0, size = column_names.size(); i < size; ++i)
for (const auto & name_type : columns)
{
const auto & name = column_names[i];
MutableColumnPtr column;
bool read_offsets = true;
/// For nested structures, remember pointers to columns with offsets
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column_types[i].get()))
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(name_type.type.get()))
{
String nested_name = Nested::extractTableName(name);
String nested_name = Nested::extractTableName(name_type.name);
if (offset_columns.count(nested_name) == 0)
offset_columns[nested_name] = ColumnArray::ColumnOffsets::create();
......@@ -224,20 +215,20 @@ Block LogBlockInputStream::readImpl()
column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[nested_name]);
}
else
column = column_types[i]->createColumn();
column = name_type.type->createColumn();
try
{
readData(name, *column_types[i], *column, max_rows_to_read, read_offsets);
readData(name_type.name, *name_type.type, *column, max_rows_to_read, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + name + " at " + storage.path + escapeForFileName(storage.name));
e.addMessage("while reading column " + name_type.name + " at " + storage.path + escapeForFileName(storage.name));
throw;
}
if (column->size())
res.insert(ColumnWithTypeAndName(std::move(column), column_types[i], name));
res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
}
if (res)
......@@ -538,6 +529,8 @@ BlockInputStreams StorageLog::read(
processed_stage = QueryProcessingStage::FetchColumns;
loadMarks();
NamesAndTypesList columns = getColumnsList().addTypes(column_names);
std::shared_lock<std::shared_mutex> lock(rwlock);
BlockInputStreams res;
......@@ -560,7 +553,7 @@ BlockInputStreams StorageLog::read(
res.emplace_back(std::make_shared<LogBlockInputStream>(
max_block_size,
column_names,
columns,
*this,
mark_begin,
rows_end - rows_begin,
......
......@@ -55,8 +55,8 @@ namespace ErrorCodes
class TinyLogBlockInputStream final : public IProfilingBlockInputStream
{
public:
TinyLogBlockInputStream(size_t block_size_, const Names & column_names_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
: block_size(block_size_), column_names(column_names_), column_types(column_names.size()),
TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
: block_size(block_size_), columns(columns_),
storage(storage_), max_read_buffer_size(max_read_buffer_size_) {}
String getName() const override { return "TinyLog"; }
......@@ -67,8 +67,7 @@ protected:
Block readImpl() override;
private:
size_t block_size;
Names column_names;
DataTypes column_types;
NamesAndTypesList columns;
StorageTinyLog & storage;
bool finished = false;
size_t max_read_buffer_size;
......@@ -151,8 +150,8 @@ String TinyLogBlockInputStream::getID() const
std::stringstream res;
res << "TinyLog(" << storage.getTableName() << ", " << &storage;
for (const auto & name : column_names)
res << ", " << name;
for (const auto & name_type : columns)
res << ", " << name_type.name;
res << ")";
return res.str();
......@@ -180,27 +179,20 @@ Block TinyLogBlockInputStream::readImpl()
return res;
}
/// If the files are not open, then open them.
if (streams.empty())
for (size_t i = 0, size = column_names.size(); i < size; ++i)
column_types[i] = storage.getDataTypeByName(column_names[i]);
/// Pointers to offset columns, shared for columns from nested data structures
using OffsetColumns = std::map<std::string, ColumnPtr>;
OffsetColumns offset_columns;
for (size_t i = 0, size = column_names.size(); i < size; ++i)
for (const auto & name_type : columns)
{
const auto & name = column_names[i];
MutableColumnPtr column;
bool read_offsets = true;
/// For nested structures, remember pointers to columns with offsets
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(column_types[i].get()))
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(name_type.type.get()))
{
String nested_name = Nested::extractTableName(name);
String nested_name = Nested::extractTableName(name_type.name);
if (offset_columns.count(nested_name) == 0)
offset_columns[nested_name] = ColumnArray::ColumnOffsets::create();
......@@ -210,20 +202,20 @@ Block TinyLogBlockInputStream::readImpl()
column = ColumnArray::create(type_arr->getNestedType()->createColumn(), offset_columns[nested_name]);
}
else
column = column_types[i]->createColumn();
column = name_type.type->createColumn();
try
{
readData(name, *column_types[i], *column, block_size, read_offsets);
readData(name_type.name, *name_type.type, *column, block_size, read_offsets);
}
catch (Exception & e)
{
e.addMessage("while reading column " + name + " at " + storage.full_path());
e.addMessage("while reading column " + name_type.name + " at " + storage.full_path());
throw;
}
if (column->size())
res.insert(ColumnWithTypeAndName(std::move(column), column_types[i], name));
res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
}
if (!res || streams.begin()->second->compressed.eof())
......@@ -386,7 +378,7 @@ BlockInputStreams StorageTinyLog::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
max_block_size, column_names, *this, context.getSettingsRef().max_read_buffer_size));
max_block_size, getColumnsList().addTypes(column_names), *this, context.getSettingsRef().max_read_buffer_size));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
To comment, please register.