提交 d6d7b3e7 编写于 作者: A Alexey Milovidov

Header in every stream: development [#CLICKHOUSE-2]

上级 95d2d68c
......@@ -59,7 +59,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
if (name == "Native")
{
return std::make_shared<NativeBlockInputStream>(buf);
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
}
else if (name == "RowBinary")
{
......
......@@ -133,7 +133,7 @@ private:
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in)) {}
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
......
......@@ -22,21 +22,34 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_,
bool use_index_,
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
: istr(istr_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_)
: istr(istr_), server_revision(server_revision_),
use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
use_index(true), index_block_it(index_block_it_), index_block_end(index_block_end_)
{
if (use_index)
{
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
if (!istr_concrete)
throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);
index_column_it = index_block_it->columns.begin();
index_column_it = index_block_it->columns.begin();
/// Initialize header from the index.
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert({ type->createColumn(), type, column.name });
}
}
......@@ -53,17 +66,7 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
Block NativeBlockInputStream::getHeader()
{
/// Note: we may read first block and stash it for further use just to get header.
if (!use_index)
throw Exception("Method getHeader for NativeBlockInputStream requires index", ErrorCodes::NOT_IMPLEMENTED);
Block res;
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
res.insert({ type->createColumn(), type, column.name });
}
return res;
return header;
}
......
......@@ -60,15 +60,17 @@ struct IndexForNativeFormat
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
/** If a non-zero server_revision is specified, additional block information may be expected and read.
*
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
*/
NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_ = 0,
bool use_index_ = false,
IndexForNativeFormat::Blocks::const_iterator index_block_it_ = IndexForNativeFormat::Blocks::const_iterator{},
IndexForNativeFormat::Blocks::const_iterator index_block_end_ = IndexForNativeFormat::Blocks::const_iterator{});
/// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
/// For cases when data structure (header) is known in advance.
/// NOTE: The header could be used for data validation and/or type conversions, but this is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
/// For cases when we have an index. It allows skipping columns: only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
String getName() const override { return "Native"; }
......@@ -88,9 +90,10 @@ protected:
private:
ReadBuffer & istr;
Block header;
UInt64 server_revision;
bool use_index;
bool use_index = false;
IndexForNativeFormat::Blocks::const_iterator index_block_it;
IndexForNativeFormat::Blocks::const_iterator index_block_end;
IndexOfBlockForNativeFormat::Columns::const_iterator index_column_it;
......
......@@ -157,7 +157,7 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
{
ReadBufferFromFile backup_buf(file_path);
CompressedReadBuffer compressed_backup_buf(backup_buf);
NativeBlockInputStream backup_stream(compressed_backup_buf);
NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
backup_stream.readPrefix();
while (Block block = backup_stream.read())
......
......@@ -122,7 +122,7 @@ private:
storage.full_path() + "data.bin", 0, 0,
std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.full_path() + "data.bin").getSize()));
block_in.emplace(*data_in, 0, true, index_begin, index_end);
block_in.emplace(*data_in, 0, index_begin, index_end);
}
}
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册