From d4225117ee0c765f08572f85c163a18e6e0fe4df Mon Sep 17 00:00:00 2001 From: yah01 Date: Mon, 14 Mar 2022 16:00:01 +0800 Subject: [PATCH] Enable Zstd compression for binlog (#15840) Signed-off-by: yah01 --- internal/storage/cwrapper/CMakeLists.txt | 3 ++- internal/storage/cwrapper/ParquetWrapper.cpp | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/storage/cwrapper/CMakeLists.txt b/internal/storage/cwrapper/CMakeLists.txt index ef18d29a1..00c5cd59d 100644 --- a/internal/storage/cwrapper/CMakeLists.txt +++ b/internal/storage/cwrapper/CMakeLists.txt @@ -42,7 +42,8 @@ macro( build_arrow ) set( ARROW_CMAKE_ARGS "-DARROW_WITH_LZ4=OFF" - "-DARROW_WITH_ZSTD=OFF" + "-DARROW_WITH_ZSTD=ON" + "-Dzstd_SOURCE=BUNDLED" "-DARROW_WITH_BROTLI=OFF" "-DARROW_WITH_SNAPPY=OFF" "-DARROW_WITH_ZLIB=OFF" diff --git a/internal/storage/cwrapper/ParquetWrapper.cpp b/internal/storage/cwrapper/ParquetWrapper.cpp index c9e4ccdd7..cc73d7b9a 100644 --- a/internal/storage/cwrapper/ParquetWrapper.cpp +++ b/internal/storage/cwrapper/ParquetWrapper.cpp @@ -302,6 +302,7 @@ CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { st.error_msg = ErrorMsg("arrow builder is nullptr"); return st; } + if (p->output == nullptr) { std::shared_ptr array; auto ast = p->builder->Finish(&array); @@ -310,10 +311,14 @@ CStatus FinishPayloadWriter(CPayloadWriter payloadWriter) { st.error_msg = ErrorMsg(ast.message()); return st; } + auto table = arrow::Table::Make(p->schema, {array}); p->output = std::make_shared(); auto mem_pool = arrow::default_memory_pool(); - ast = parquet::arrow::WriteTable(*table, mem_pool, p->output, 1024 * 1024 * 1024); + ast = parquet::arrow::WriteTable(*table, mem_pool, p->output, 1024 * 1024 * 1024, + parquet::WriterProperties::Builder().compression(arrow::Compression::ZSTD) + ->compression_level(3)->build()); + if (!ast.ok()) { st.error_code = static_cast(ErrorCode::UNEXPECTED_ERROR); st.error_msg = ErrorMsg(ast.message()); -- GitLab