提交 abf49e93 编写于 作者: A Alexey Milovidov

dbms: disabled QuickLZ [#METR-17973].

上级 00637091
......@@ -3,7 +3,11 @@
#include <vector>
#include <city.h>
#include <quicklz/quicklz_level1.h>
#ifdef USE_QUICKLZ
#include <quicklz/quicklz_level1.h>
#endif
#include <lz4/lz4.h>
#include <zstd/zstd.h>
......@@ -34,10 +38,12 @@ protected:
ReadBuffer * compressed_in;
/// Если в буфере compressed_in помещается целый сжатый блок - используем его. Иначе - копируем данные по кусочкам в own_compressed_buffer.
PODArray<char> own_compressed_buffer{QUICKLZ_HEADER_SIZE};
PODArray<char> own_compressed_buffer{COMPRESSED_BLOCK_HEADER_SIZE};
char * compressed_buffer = nullptr;
#ifdef USE_QUICKLZ
qlz_state_decompress * qlz_state = nullptr;
#endif
/// Прочитать сжатые данные в compressed_buffer. Достать из их заголовка размер разжатых данных. Проверить чексумму.
/// Возвращает количество прочитанных байт.
......@@ -49,8 +55,8 @@ protected:
uint128 checksum;
compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
own_compressed_buffer.resize(QUICKLZ_HEADER_SIZE);
compressed_in->readStrict(&own_compressed_buffer[0], QUICKLZ_HEADER_SIZE);
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
compressed_in->readStrict(&own_compressed_buffer[0], COMPRESSED_BLOCK_HEADER_SIZE);
UInt8 method = own_compressed_buffer[0]; /// См. CompressedWriteBuffer.h
......@@ -58,8 +64,12 @@ protected:
if (method < 0x80)
{
#ifdef USE_QUICKLZ
size_compressed = qlz_size_compressed(&own_compressed_buffer[0]);
size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]);
#else
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
#endif
}
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{
......@@ -75,10 +85,10 @@ protected:
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
/// Находится ли сжатый блок целиком в буфере compressed_in?
if (compressed_in->offset() >= QUICKLZ_HEADER_SIZE &&
compressed_in->position() + size_compressed - QUICKLZ_HEADER_SIZE <= compressed_in->buffer().end())
if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
{
compressed_in->position() -= QUICKLZ_HEADER_SIZE;
compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE;
compressed_buffer = compressed_in->position();
compressed_in->position() += size_compressed;
}
......@@ -86,7 +96,7 @@ protected:
{
own_compressed_buffer.resize(size_compressed);
compressed_buffer = &own_compressed_buffer[0];
compressed_in->readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
compressed_in->readStrict(&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
......@@ -104,21 +114,25 @@ protected:
if (method < 0x80)
{
#ifdef USE_QUICKLZ
if (!qlz_state)
qlz_state = new qlz_state_decompress;
qlz_decompress(&compressed_buffer[0], to, qlz_state);
#else
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
#endif
}
else if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
{
if (LZ4_decompress_fast(&compressed_buffer[QUICKLZ_HEADER_SIZE], to, size_decompressed) < 0)
if (LZ4_decompress_fast(&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], to, size_decompressed) < 0)
throw Exception("Cannot LZ4_decompress_fast", ErrorCodes::CANNOT_DECOMPRESS);
}
else if (method == static_cast<UInt8>(CompressionMethodByte::ZSTD))
{
size_t res = ZSTD_decompress(
to, size_decompressed,
&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed_without_checksum - QUICKLZ_HEADER_SIZE);
&compressed_buffer[COMPRESSED_BLOCK_HEADER_SIZE], size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
......@@ -136,8 +150,10 @@ public:
~CompressedReadBufferBase()
{
#ifdef USE_QUICKLZ
if (qlz_state)
delete qlz_state;
#endif
}
};
......
......@@ -5,7 +5,7 @@
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB
#define QUICKLZ_ADDITIONAL_SPACE 400
#define QUICKLZ_HEADER_SIZE 9
#define COMPRESSED_BLOCK_HEADER_SIZE 9
namespace DB
......
......@@ -5,7 +5,11 @@
#include <vector>
#include <city.h>
#include <quicklz/quicklz_level1.h>
#ifdef USE_QUICKLZ
#include <quicklz/quicklz_level1.h>
#endif
#include <lz4/lz4.h>
#include <lz4/lz4hc.h>
#include <zstd/zstd.h>
......@@ -35,7 +39,9 @@ private:
CompressionMethod method;
PODArray<char> compressed_buffer;
#ifdef USE_QUICKLZ
qlz_state_compress * qlz_state;
#endif
void nextImpl()
{
......@@ -53,6 +59,7 @@ private:
{
case CompressionMethod::QuickLZ:
{
#ifdef USE_QUICKLZ
compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE);
compressed_size = qlz_compress(
......@@ -64,6 +71,9 @@ private:
compressed_buffer[0] &= 3;
compressed_buffer_ptr = &compressed_buffer[0];
break;
#else
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
#endif
}
case CompressionMethod::LZ4:
case CompressionMethod::LZ4HC:
......@@ -137,7 +147,12 @@ public:
WriteBuffer & out_,
CompressionMethod method_ = CompressionMethod::LZ4,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_), qlz_state(new qlz_state_compress) {}
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out_), method(method_)
#ifdef USE_QUICKLZ
, qlz_state(new qlz_state_compress)
#endif
{
}
/// Объём сжатых данных
size_t getCompressedBytes()
......@@ -170,7 +185,9 @@ public:
tryLogCurrentException(__PRETTY_FUNCTION__);
}
#ifdef USE_QUICKLZ
delete qlz_state;
#endif
}
};
......
......@@ -555,7 +555,13 @@ struct SettingCompressionMethod
static CompressionMethod getCompressionMethod(const String & s)
{
if (s == "quicklz")
{
#ifdef USE_QUICKLZ
return CompressionMethod::QuickLZ;
#else
throw Exception("QuickLZ compression method is disabled", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
#endif
}
if (s == "lz4")
return CompressionMethod::LZ4;
if (s == "lz4hc")
......
......@@ -44,7 +44,9 @@ int main(int argc, char ** argv)
("d,decompress", "decompress")
("block-size,b", boost::program_options::value<unsigned>()->default_value(DBMS_DEFAULT_BUFFER_SIZE), "compress in blocks of specified size")
("hc", "use LZ4HC instead of LZ4")
#ifdef USE_QUICKLZ
("qlz", "use QuickLZ (level 1) instead of LZ4")
#endif
("zstd", "use ZSTD instead of LZ4")
("stat", "print block statistics of compressed data")
;
......@@ -62,7 +64,13 @@ int main(int argc, char ** argv)
try
{
bool decompress = options.count("d");
#ifdef USE_QUICKLZ
bool use_qlz = options.count("qlz");
#else
bool use_qlz = false;
#endif
bool use_lz4hc = options.count("hc");
bool use_zstd = options.count("zstd");
bool stat_mode = options.count("stat");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册