提交 532945a5 编写于 作者: M Michael Kolupaev

dbms: refactored compressed read buffers. [#METR-9633]

上级 589ed293
#pragma once
#include <vector>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/CompressedReadBufferBase.h>
#include <DB/IO/UncompressedCache.h>
......@@ -16,156 +14,104 @@ namespace DB
* Недостатки:
* - в случае, если нужно читать много данных подряд, но из них только часть закэширована, приходится делать seek-и.
*/
class CachedCompressedReadBuffer : public ReadBuffer
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
private:
const std::string path;
size_t cur_begin_offset; /// Смещение в сжатом файле, соответствующее working_buffer.begin().
size_t cur_end_offset; /// Смещение в сжатом файле, соответствующее working_buffer.end().
UncompressedCache * cache;
size_t buf_size;
/// SharedPtr - для ленивой инициализации (только в случае кэш-промаха).
Poco::SharedPtr<ReadBufferFromFile> in;
Poco::SharedPtr<CompressedReadBuffer> compressed_in;
Poco::SharedPtr<ReadBufferFromFile> file_in;
size_t file_pos;
/// Кусок данных из кэша, или кусок считанных данных, который мы положим в кэш.
UncompressedCache::CellPtr owned_cell;
void initInput()
{
if (!compressed_in)
if (!file_in)
{
in = new ReadBufferFromFile(path, buf_size);
compressed_in = new CompressedReadBuffer(*in);
file_in = new ReadBufferFromFile(path, buf_size);
compressed_in = &*file_in;
}
}
bool nextImpl()
{
if (cache)
{
/// Проверим наличие разжатого блока в кэше, захватим владение этим блоком, если он есть.
/// Проверим наличие разжатого блока в кэше, захватим владение этим блоком, если он есть.
cur_begin_offset = cur_end_offset;
UInt128 key = {0, 0};
UInt128 key = {0, 0};
key = cache->hash(path, cur_begin_offset);
owned_cell = cache->get(key);
key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
if (!owned_cell)
{
/// Если нет - надо прочитать его из файла.
initInput();
in->seek(cur_begin_offset);
if (!owned_cell)
{
/// Если нет - надо прочитать его из файла.
initInput();
file_in->seek(file_pos);
owned_cell = new UncompressedCache::Cell;
owned_cell->key = key;
owned_cell = new UncompressedCache::Cell;
owned_cell->key = key;
/// Разжимать будем в кусок памяти, который будет в кэше.
compressed_in->setMemory(owned_cell->data);
size_t size_decompressed;
owned_cell->compressed_size = readCompressedData(size_decompressed);
size_t old_count = in->count();
compressed_in->next();
owned_cell->compressed_size = in->count() - old_count;
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed);
decompress(owned_cell->data.m_data, size_decompressed);
/// Положим данные в кэш.
cache->set(owned_cell);
}
if (owned_cell->data.m_size == 0)
return false;
internal_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
working_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
pos = working_buffer.begin();
cur_end_offset += owned_cell->compressed_size;
}
else
{
initInput();
in->seek(cur_end_offset);
if (!compressed_in->next())
return false;
syncWithCompressedInput();
if (owned_cell->data.m_size == 0)
{
owned_cell = NULL;
return false;
}
return true;
}
working_buffer = Buffer(owned_cell->data.m_data, owned_cell->data.m_data + owned_cell->data.m_size);
void syncWithCompressedInput()
{
internal_buffer = compressed_in->buffer();
working_buffer = compressed_in->buffer();
pos = compressed_in->position();
cur_end_offset = in->getPositionInFile();
cur_begin_offset = cur_end_offset - compressed_in->getCurrentBlockCompressedSize();
file_pos += owned_cell->compressed_size;
return true;
}
public:
/// Если cache_ == NULL, работает без кеша - как CompressedReadBuffer.
CachedCompressedReadBuffer(const std::string & path_, UncompressedCache * cache_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), path(path_), cur_begin_offset(0), cur_end_offset(0), cache(cache_), buf_size(buf_size_)
: ReadBuffer(NULL, 0), path(path_), cache(cache_), buf_size(buf_size_), file_pos(0)
{
}
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (offset_in_compressed_file == cur_begin_offset && offset_in_decompressed_block < working_buffer.size())
if (owned_cell &&
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
else
{
/// Как будто только что дочитали до нужного места.
cur_end_offset = offset_in_compressed_file;
file_pos = offset_in_compressed_file;
pos = working_buffer.end();
next();
bytes += offset();
nextImpl();
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (unlikely(offset_in_decompressed_block == working_buffer.size()))
{
/** Если убрать эту ветку, то будет неправильно работать функция readBig в CompressedReadBuffer
* (курсор будет находиться в конце буфера, но данные не прочитаны)
*/
pos = working_buffer.end();
next();
}
else
pos += offset_in_decompressed_block;
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
}
/*
size_t readBig(char * to, size_t n)
{
/// Если кэш используется, то будем читать через него.
if (cache)
{
return read(to, n);
}
else
{
/// Иначе - вызываем метод CompressedReadBuffer. К сожалению, сложная обёртка.
initInput();
in->seek(cur_end_offset);
compressed_in->position() = pos;
size_t res = compressed_in->readBig(to, n);
syncWithCompressedInput();
return res;
}
}*/
};
}
#pragma once
#include <vector>
#include <city.h>
#include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h>
#include <DB/Common/PODArray.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/CompressedStream.h>
#include <DB/IO/CompressedReadBufferBase.h>
namespace DB
{
class CompressedReadBuffer : public BufferWithOwnMemory<ReadBuffer>
class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
{
private:
ReadBuffer & in;
/// Если в буфере in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
PODArray<char> own_compressed_buffer;
char * compressed_buffer;
size_t size_compressed;
qlz_state_decompress * qlz_state;
/** Указатель на кусок памяти, куда будут разжиматься блоки.
* Это может быть либо свой кусок памяти из BufferWithOwnMemory (по-умолчанию),
* либо пользователь может попросить разжимать данные в свой кусок памяти (метод setMemory).
*/
Memory * maybe_own_memory;
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
bool readCompressedData(size_t & size_decompressed)
{
if (in.eof())
return false;
uint128 checksum;
in.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
own_compressed_buffer.resize(QUICKLZ_HEADER_SIZE);
in.readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
/// Находится ли сжатый блок целиком в буфере in?
if (in.offset() >= QUICKLZ_HEADER_SIZE && in.position() + size_compressed - QUICKLZ_HEADER_SIZE <= in.buffer().end())
{
in.position() -= QUICKLZ_HEADER_SIZE;
compressed_buffer = in.position();
in.position() += size_compressed;
}
else
{
own_compressed_buffer.resize(size_compressed);
compressed_buffer = &own_compressed_buffer[0];
in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
}
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
throw Exception("Checksum doesn't match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
return true;
}
void decompress(char * to, size_t size_decompressed)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
/// Старший бит первого байта определяет использованный метод сжатия.
if ((compressed_buffer[0] & 0x80) == 0)
{
if (!qlz_state)
qlz_state = new qlz_state_decompress;
qlz_decompress(&compressed_buffer[0], to, qlz_state);
}
else
LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed);
}
bool nextImpl()
{
size_t size_decompressed = 0;
size_t size_decompressed;
if (!readCompressedData(size_decompressed))
return false;
maybe_own_memory->resize(size_decompressed);
internal_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
working_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed);
......@@ -112,29 +25,10 @@ private:
public:
CompressedReadBuffer(ReadBuffer & in_)
: BufferWithOwnMemory<ReadBuffer>(0),
in(in_),
own_compressed_buffer(QUICKLZ_HEADER_SIZE),
compressed_buffer(NULL),
qlz_state(NULL),
maybe_own_memory(&memory)
{
}
~CompressedReadBuffer()
: CompressedReadBufferBase(&in_), BufferWithOwnMemory<ReadBuffer>(0)
{
if (qlz_state)
delete qlz_state;
}
/// Использовать предоставленный пользователем кусок памяти для разжатия. (Для реализации кэша разжатых блоков.)
void setMemory(Memory & memory_)
{
maybe_own_memory = &memory_;
}
size_t readBig(char * to, size_t n)
{
size_t bytes_read = 0;
......@@ -143,20 +37,13 @@ public:
if (pos < working_buffer.end())
bytes_read += read(to, std::min(static_cast<size_t>(working_buffer.end() - pos), n));
if (bytes_read < n)
bytes += offset();
/// Если надо ещё прочитать - будем, по возможности, разжимать сразу в to.
while (bytes_read < n)
{
size_t size_decompressed = 0;
size_t size_decompressed;
if (!readCompressedData(size_decompressed))
{
working_buffer.resize(0);
pos = working_buffer.begin();
return bytes_read;
}
/// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read)
......@@ -167,9 +54,9 @@ public:
}
else
{
maybe_own_memory->resize(size_decompressed);
internal_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
working_buffer = Buffer(&(*maybe_own_memory)[0], &(*maybe_own_memory)[size_decompressed]);
bytes += offset();
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed);
......@@ -181,12 +68,6 @@ public:
return bytes_read;
}
/// Для CachedCompressedReadBuffer.
size_t getCurrentBlockCompressedSize() const
{
return size_compressed;
}
};
}
#pragma once
#include <vector>
#include <city.h>
#include <quicklz/quicklz_level1.h>
#include <lz4/lz4.h>
#include <DB/Common/PODArray.h>
#include <DB/Common/ProfileEvents.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/BufferWithOwnMemory.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
class CompressedReadBufferBase
{
protected:
ReadBuffer * compressed_in;
/// Если в буфере compressed_in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
PODArray<char> own_compressed_buffer;
char * compressed_buffer;
size_t size_compressed;
qlz_state_decompress * qlz_state;
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
/// Возвращает количество прочитанных байт.
size_t readCompressedData(size_t & size_decompressed)
{
if (compressed_in->eof())
return 0;
uint128 checksum;
compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
own_compressed_buffer.resize(QUICKLZ_HEADER_SIZE);
compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_t size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
/// Находится ли сжатый блок целиком в буфере compressed_in?
if (compressed_in->offset() >= QUICKLZ_HEADER_SIZE &&
compressed_in->position() + size_compressed - QUICKLZ_HEADER_SIZE <= compressed_in->buffer().end())
{
compressed_in->position() -= QUICKLZ_HEADER_SIZE;
compressed_buffer = compressed_in->position();
compressed_in->position() += size_compressed;
}
else
{
own_compressed_buffer.resize(size_compressed);
compressed_buffer = &own_compressed_buffer[0];
compressed_in->readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
}
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
throw Exception("Checksum doesn't match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
return size_compressed + sizeof(checksum);
}
void decompress(char * to, size_t size_decompressed)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
/// Старший бит первого байта определяет использованный метод сжатия.
if ((compressed_buffer[0] & 0x80) == 0)
{
if (!qlz_state)
qlz_state = new qlz_state_decompress;
qlz_decompress(&compressed_buffer[0], to, qlz_state);
}
else
LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed);
}
public:
/// compressed_in можно инициализировать отложенно, но до первого вызова readCompressedData.
CompressedReadBufferBase(ReadBuffer * in = NULL)
:
compressed_in(in),
own_compressed_buffer(QUICKLZ_HEADER_SIZE),
compressed_buffer(NULL),
qlz_state(NULL)
{
}
~CompressedReadBufferBase()
{
if (qlz_state)
delete qlz_state;
}
};
}
#pragma once
#include <DB/IO/CompressedReadBufferBase.h>
#include "ReadBufferFromFile.h"
namespace DB
{
/// В отличие от CompressedReadBuffer, умеет делать seek.
class CompressedReadBufferFromFile : public CompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
{
private:
/** В любой момент выполняется одно из двух:
* a) size_compressed = 0
* b)
* - working_buffer содержит целиком один блок.
* - file_in смотрит в конец этого блока.
* - size_compressed содержит сжатый размер этого блока.
*/
ReadBufferFromFile file_in;
size_t size_compressed;
bool nextImpl()
{
size_t size_decompressed;
size_compressed = readCompressedData(size_decompressed);
if (!size_compressed)
return false;
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed);
return true;
}
public:
CompressedReadBufferFromFile(const std::string & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<ReadBuffer>(0), file_in(path, buf_size)
{
compressed_in = &file_in;
}
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (size_compressed &&
offset_in_compressed_file == file_in.getPositionInFile() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// bytes может переполниться и получиться отрицательным, но в count() все переполнится обратно и получится правильно.
bytes -= offset();
}
else
{
file_in.seek(offset_in_compressed_file);
bytes += offset();
nextImpl();
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
}
size_t readBig(char * to, size_t n)
{
size_t bytes_read = 0;
/// Если в буфере есть непрочитанные байты, то скопируем сколько надо в to.
if (pos < working_buffer.end())
bytes_read += read(to, std::min(static_cast<size_t>(working_buffer.end() - pos), n));
/// Если надо ещё прочитать - будем, по возможности, разжимать сразу в to.
while (bytes_read < n)
{
size_t size_decompressed = 0;
size_t new_size_compressed = readCompressedData(size_decompressed);
size_compressed = 0; /// file_in больше не указывает на конец блока в working_buffer.
if (!new_size_compressed)
return bytes_read;
/// Если разжатый блок помещается целиком туда, куда его надо скопировать.
if (size_decompressed <= n - bytes_read)
{
decompress(to + bytes_read, size_decompressed);
bytes_read += size_decompressed;
bytes += size_decompressed;
}
else
{
size_compressed = new_size_compressed;
bytes += offset();
memory.resize(size_decompressed);
working_buffer = Buffer(&memory[0], &memory[size_decompressed]);
pos = working_buffer.begin();
decompress(working_buffer.begin(), size_decompressed);
bytes_read += read(to + bytes_read, n - bytes_read);
break;
}
}
return bytes_read;
}
};
}
......@@ -6,6 +6,7 @@
#include <DB/Core/NamesAndTypes.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/CachedCompressedReadBuffer.h>
#include <DB/IO/CompressedReadBufferFromFile.h>
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
......@@ -139,13 +140,26 @@ private:
struct Stream
{
Poco::SharedPtr<ReadBufferFromFile> marks_buffer;
Poco::SharedPtr<CachedCompressedReadBuffer> data_buffer;
ReadBuffer * data_buffer;
Poco::SharedPtr<CachedCompressedReadBuffer> cached_buffer;
Poco::SharedPtr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
Stream(const String & path_prefix, UncompressedCache * uncompressed_cache)
: marks_buffer(new ReadBufferFromFile(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE)),
data_buffer(new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache)),
path_prefix(path_prefix) {}
path_prefix(path_prefix)
{
if (uncompressed_cache)
{
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache);
data_buffer = &*cached_buffer;
}
else
{
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin");
data_buffer = &*non_cached_buffer;
}
}
void seekToMark(size_t index)
{
......@@ -163,7 +177,10 @@ private:
try
{
data_buffer->seek(offset_in_compressed_file, offset_in_decompressed_block);
if (cached_buffer)
cached_buffer->seek(offset_in_compressed_file, offset_in_decompressed_block);
if (non_cached_buffer)
non_cached_buffer->seek(offset_in_compressed_file, offset_in_decompressed_block);
}
catch (const Exception & e)
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册