From c27ca2364417c96f4e56a1902cb4b800fee46f18 Mon Sep 17 00:00:00 2001 From: Siddhartha Roychowdhury Date: Wed, 26 Jan 2022 13:57:30 -0800 Subject: [PATCH] Add option for WAL compression algorithm (#9432) Summary: Add an option to set the WAL compression algorithm - wal_compression. TODO: WAL compression is not implemented and will only support zstd initially. Will be added in subsequent diffs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9432 Reviewed By: pdillinger Differential Revision: D33797275 Pulled By: sidroyc fbshipit-source-id: 8db81d9c9cea5e2e4f1445d3aecad8106137b8e7 --- db/c.cc | 8 ++++++++ db/c_test.c | 3 +++ db/db_impl/db_impl_open.cc | 8 ++++++++ include/rocksdb/c.h | 4 ++++ include/rocksdb/options.h | 5 +++++ options/db_options.cc | 7 +++++++ options/db_options.h | 1 + options/options_helper.cc | 1 + options/options_settable_test.cc | 1 + tools/db_bench_tool.cc | 9 +++++++++ 10 files changed, 47 insertions(+) diff --git a/db/c.cc b/db/c.cc index 03d422dce..29a635830 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3533,6 +3533,14 @@ unsigned char rocksdb_options_get_manual_wal_flush(rocksdb_options_t* opt) { return opt->rep.manual_wal_flush; } +void rocksdb_options_set_wal_compression(rocksdb_options_t* opt, int val) { + opt->rep.wal_compression = static_cast(val); +} + +int rocksdb_options_get_wal_compression(rocksdb_options_t* opt) { + return opt->rep.wal_compression; +} + rocksdb_ratelimiter_t* rocksdb_ratelimiter_create( int64_t rate_bytes_per_sec, int64_t refill_period_us, diff --git a/db/c_test.c b/db/c_test.c index 4a657ead3..adc70ede4 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -1777,6 +1777,9 @@ int main(int argc, char** argv) { rocksdb_options_set_manual_wal_flush(o, 1); CheckCondition(1 == rocksdb_options_get_manual_wal_flush(o)); + rocksdb_options_set_wal_compression(o, 1); + CheckCondition(1 == rocksdb_options_get_wal_compression(o)); + /* Blob Options */ rocksdb_options_set_enable_blob_files(o, 1); CheckCondition(1 == rocksdb_options_get_enable_blob_files(o)); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index f73060a9e..f9be6f425 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -194,6 +194,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, } #endif // !ROCKSDB_LITE + // Supported wal compression types + if (result.wal_compression != kNoCompression && + result.wal_compression != kZSTD) { + result.wal_compression = kNoCompression; + ROCKS_LOG_WARN(result.info_log, + "wal_compression is disabled since only zstd is supported"); + } + if (!result.paranoid_checks) { result.skip_checking_sst_file_sizes_on_db_open = true; ROCKS_LOG_INFO(result.info_log, diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 94f096209..5b12449b7 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1445,6 +1445,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_manual_wal_flush( rocksdb_options_t* opt, unsigned char); extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_manual_wal_flush( rocksdb_options_t* opt); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_wal_compression( + rocksdb_options_t* opt, int); +extern ROCKSDB_LIBRARY_API int rocksdb_options_get_wal_compression( + rocksdb_options_t* opt); /* RateLimiter */ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e14b3253e..6aa89c833 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1228,6 +1228,11 @@ struct DBOptions { // file. bool manual_wal_flush = false; + // This feature is WORK IN PROGRESS + // If enabled WAL records will be compressed before they are written. + // Only zstd is supported. + CompressionType wal_compression = kNoCompression; + // If true, RocksDB supports flushing multiple column families and committing // their results atomically to MANIFEST. Note that it is not // necessary to set atomic_flush to true if WAL is always enabled since WAL diff --git a/options/db_options.cc b/options/db_options.cc index 0fd23a9e9..f69cff036 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -384,6 +384,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, manual_wal_flush), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"wal_compression", + {offsetof(struct ImmutableDBOptions, wal_compression), + OptionType::kCompressionType, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"seq_per_batch", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, @@ -725,6 +729,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) preserve_deletes(options.preserve_deletes), two_write_queues(options.two_write_queues), manual_wal_flush(options.manual_wal_flush), + wal_compression(options.wal_compression), atomic_flush(options.atomic_flush), avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io), persist_stats_to_disk(options.persist_stats_to_disk), @@ -891,6 +896,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { two_write_queues); ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", manual_wal_flush); + ROCKS_LOG_HEADER(log, " Options.wal_compression: %d", + wal_compression); ROCKS_LOG_HEADER(log, " Options.atomic_flush: %d", atomic_flush); ROCKS_LOG_HEADER(log, " Options.avoid_unnecessary_blocking_io: %d", diff --git a/options/db_options.h b/options/db_options.h index d44331b06..49188abba 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -87,6 +87,7 @@ struct ImmutableDBOptions { bool preserve_deletes; bool two_write_queues; bool manual_wal_flush; + CompressionType wal_compression; bool atomic_flush; bool avoid_unnecessary_blocking_io; bool persist_stats_to_disk; diff --git a/options/options_helper.cc b/options/options_helper.cc index 7f9a991bf..aa421b8b8 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -172,6 +172,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.preserve_deletes; options.two_write_queues = immutable_db_options.two_write_queues; options.manual_wal_flush = immutable_db_options.manual_wal_flush; + options.wal_compression = immutable_db_options.wal_compression; options.atomic_flush = immutable_db_options.atomic_flush; options.avoid_unnecessary_blocking_io = immutable_db_options.avoid_unnecessary_blocking_io; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 3628409cf..e9ea696ba 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -334,6 +334,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "concurrent_prepare=false;" "two_write_queues=false;" "manual_wal_flush=false;" + "wal_compression=kZSTD;" "seq_per_batch=false;" "atomic_flush=false;" "avoid_unnecessary_blocking_io=false;" diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 92e2da123..0c7af7ed5 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -762,6 +762,11 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); DEFINE_bool(manual_wal_flush, false, "If true, buffer WAL until buffer is full or a manual FlushWAL()."); +DEFINE_string(wal_compression, "string", + "Algorithm to use for WAL compression. none to disable."); +static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e = + ROCKSDB_NAMESPACE::kNoCompression; + DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL"); DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench", @@ -3998,6 +4003,7 @@ class Benchmark { options.use_direct_io_for_flush_and_compaction = FLAGS_use_direct_io_for_flush_and_compaction; options.manual_wal_flush = FLAGS_manual_wal_flush; + options.wal_compression = FLAGS_wal_compression_e; #ifndef ROCKSDB_LITE options.ttl = FLAGS_fifo_compaction_ttl; options.compaction_options_fifo = CompactionOptionsFIFO( @@ -8129,6 +8135,9 @@ int db_bench_tool(int argc, char** argv) { FLAGS_compression_type_e = StringToCompressionType(FLAGS_compression_type.c_str()); + FLAGS_wal_compression_e = + StringToCompressionType(FLAGS_wal_compression.c_str()); + #ifndef ROCKSDB_LITE // Stacked BlobDB FLAGS_blob_db_compression_type_e = -- GitLab